This is an automated email from the ASF dual-hosted git repository.

frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a76915e662 Implement dynamic capacity for kubernetes task runner (#18591)
4a76915e662 is described below

commit 4a76915e6620127423210e4ccecaee43d9ee3bb6
Author: Gabriel Chang <[email protected]>
AuthorDate: Mon Dec 8 10:27:40 2025 +0800

    Implement dynamic capacity for kubernetes task runner (#18591)
    
    * Implement dynamic capacity for kubernetes task runner
    
    * Update docs
    
    * Refactor to use getIfNull
    
    * Update docs wording
    
    * Fix based on comments
    
    * Update wording for docs
    
    * Update static config java doc
    
    * Undo removal of constructor
    
    * Initial config observer implementation
    
    * Use StringUtils.format
    
    * Add missing import
    
    * Update listener operations to be atomic
    
    * Fix config manager initialisation in tests
    
    * Add tests for effective config
    
    * Test syncCapacityWithDynamicConfig
    
    * Fix checkstyle
    
    * Update KubernetesOverlordModuleTest to setup ConfigManager
    
    * Update docs
    
    * Use AtomicInteger for currentCapacity
---
 docs/development/extensions-core/k8s-jobs.md       |  21 +-
 .../k8s/overlord/KubernetesOverlordModule.java     |  18 +-
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  49 ++-
 .../k8s/overlord/KubernetesTaskRunnerConfig.java   | 331 ++-------------------
 .../KubernetesTaskRunnerEffectiveConfig.java       | 188 ++++++++++++
 .../k8s/overlord/KubernetesTaskRunnerFactory.java  |  13 +-
 ....java => KubernetesTaskRunnerStaticConfig.java} | 220 ++------------
 .../DefaultKubernetesTaskRunnerDynamicConfig.java  |  42 ++-
 .../KubernetesTaskExecutionConfigResource.java     |  20 +-
 .../KubernetesTaskRunnerDynamicConfig.java         |  22 ++
 .../DynamicConfigPodTemplateSelector.java          |  19 +-
 .../taskadapter/PodTemplateTaskAdapter.java        |   2 +-
 .../k8s/overlord/KubernetesOverlordModuleTest.java |  25 ++
 .../KubernetesTaskRunnerEffectiveConfigTest.java   |  90 ++++++
 .../overlord/KubernetesTaskRunnerFactoryTest.java  |  13 +-
 ...a => KubernetesTaskRunnerStaticConfigTest.java} |  10 +-
 .../k8s/overlord/KubernetesTaskRunnerTest.java     | 100 ++++++-
 ...faultKubernetesTaskRunnerDynamicConfigTest.java |  13 +-
 .../KubernetesTaskExecutionConfigResourceTest.java | 121 ++++++++
 .../KubernetesTaskRunnerDynamicConfigTest.java     |  13 +-
 .../DruidPeonClientIntegrationTest.java            |   7 +-
 .../DynamicConfigPodTemplateSelectorTest.java      |  47 +--
 .../overlord/taskadapter/K8sTaskAdapterTest.java   |  71 ++---
 .../taskadapter/MultiContainerTaskAdapterTest.java |  25 +-
 .../taskadapter/PodTemplateTaskAdapterTest.java    |   3 +-
 .../SingleContainerTaskAdapterTest.java            |   7 +-
 .../apache/druid/common/config/ConfigManager.java  |  51 ++++
 27 files changed, 905 insertions(+), 636 deletions(-)

diff --git a/docs/development/extensions-core/k8s-jobs.md b/docs/development/extensions-core/k8s-jobs.md
index 289934646af..6dc7bdd70ea 100644
--- a/docs/development/extensions-core/k8s-jobs.md
+++ b/docs/development/extensions-core/k8s-jobs.md
@@ -48,9 +48,9 @@ Other configurations required are:
 Druid operators can dynamically tune certain features within this extension. You don't need to restart the Overlord
 service for these changes to take effect.
 
-Druid can dynamically tune [pod template selection](#pod-template-selection), which allows you to configure the pod 
-template based on the task to be run. To enable dynamic pod template selection, first configure the 
-[custom template pod adapter](#custom-template-pod-adapter).
+Druid can dynamically tune [pod template selection](#pod-template-selection) and [capacity](#properties). Where capacity refers to `druid.indexer.runner.capacity`.
+
+Pod template selection allows you to configure the pod template based on the task to be run. To enable dynamic pod template selection, first configure the [custom template pod adapter](#custom-template-pod-adapter).
 
 Use the following APIs to view and update the dynamic configuration for the Kubernetes task runner.
 
@@ -126,7 +126,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
         "type": ["index_kafka"]
       }
     ]
-  }
+  },
+  "capacity": 12
 }
 ```
 </details>
@@ -135,6 +136,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 Updates the dynamic configuration for the Kubernetes Task Runner
 
+Note: Both `podTemplateSelectStrategy` and `capacity` are optional fields. A POST request may include either, both, or neither.
+
 ##### URL
 
 `POST` `/druid/indexer/v1/k8s/taskrunner/executionconfig`
@@ -193,7 +196,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconf
         "type": ["index_kafka"]
       }
     ]
-  }
+  },
+  "capacity": 6
 }'
 ```
 
@@ -225,7 +229,8 @@ Content-Type: application/json
         "type": ["index_kafka"]
       }
     ]
-  }
+  },
+  "capacity": 6
 }
 ```
 
@@ -309,7 +314,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
       "comment": "",
       "ip": "127.0.0.1"
     },
-    "payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"}",
+    "payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"},\"capacity\":6",
     "auditTime": "2024-06-13T20:59:51.622Z"
   }
 ]
@@ -790,7 +795,7 @@ Should you require the needed permissions for interacting across Kubernetes name
 | `druid.indexer.runner.annotations` | `JsonObject` | Additional annotations you want to add to peon pod. | `{}` | No |
 | `druid.indexer.runner.peonMonitors` | `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. | `[]` | No |
 | `druid.indexer.runner.graceTerminationPeriodSeconds` | `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. | `PT30S` (K8s default) | No |
-| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` | No |
+| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. Value will be overridden if a dynamic config value has been set. | `2147483647` | No |
 | `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No |
 | `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | NO |
 
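The documentation change above says a POST to the execution config endpoint may carry `capacity` on its own. A minimal Java sketch of building such a request follows. The class name, the helper method, and the `http://localhost:8888` router address are assumptions made for illustration; the endpoint path and the `"type": "default"` field are taken from the documentation samples in this diff.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of the Druid code base.
final class CapacityUpdateRequestSketch
{
  // Endpoint path as documented in k8s-jobs.md.
  private static final String EXECUTION_CONFIG_PATH = "/druid/indexer/v1/k8s/taskrunner/executionconfig";

  /** Builds (but does not send) a POST that sets only the dynamic capacity. */
  static HttpRequest buildCapacityUpdate(String routerBaseUrl, int capacity)
  {
    // podTemplateSelectStrategy is omitted because the docs describe it as optional.
    String body = "{\"type\": \"default\", \"capacity\": " + capacity + "}";
    return HttpRequest.newBuilder(URI.create(routerBaseUrl + EXECUTION_CONFIG_PATH))
                      .header("Content-Type", "application/json")
                      .POST(HttpRequest.BodyPublishers.ofString(body))
                      .build();
  }

  public static void main(String[] args)
  {
    // Assumed router address; replace with ROUTER_IP:ROUTER_PORT of a real cluster.
    HttpRequest request = buildCapacityUpdate("http://localhost:8888", 6);
    System.out.println(request.method() + " " + request.uri());
  }
}
```

To actually submit the request, it could be passed to `java.net.http.HttpClient.newHttpClient().send(...)` (Java 11 or later) against a running Router.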
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index 021abef0828..936e86c888c 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -95,7 +95,7 @@ public class KubernetesOverlordModule implements DruidModule
   public void configure(Binder binder)
   {
     // druid.indexer.runner.type=k8s
-    JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
+    JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerStaticConfig.class);
     JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class);
     JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
     JacksonConfigProvider.bind(binder, KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class, null);
@@ -150,10 +150,20 @@ public class KubernetesOverlordModule implements DruidModule
     JsonConfigProvider.bind(binder, JDK_HTTPCLIENT_PROPERITES_PREFIX, DruidKubernetesJdkHttpClientConfig.class);
   }
 
+  @Provides
+  @LazySingleton
+  public KubernetesTaskRunnerEffectiveConfig provideEffectiveConfig(
+      KubernetesTaskRunnerStaticConfig staticConfig,
+      Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier
+  )
+  {
+    return new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigSupplier);
+  }
+
   @Provides
   @LazySingleton
   public DruidKubernetesClient makeKubernetesClient(
-      KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
+      KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig,
       DruidKubernetesHttpClientFactory httpClientFactory,
       Lifecycle lifecycle
   )
@@ -217,7 +227,7 @@ public class KubernetesOverlordModule implements DruidModule
   TaskAdapter provideTaskAdapter(
       DruidKubernetesClient client,
       Properties properties,
-      KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
+      KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig,
       TaskConfig taskConfig,
       StartupLoggingConfig startupLoggingConfig,
       @Self DruidNode druidNode,
@@ -260,7 +270,7 @@ public class KubernetesOverlordModule implements DruidModule
           druidNode,
           smileMapper,
           taskLogs,
-          new DynamicConfigPodTemplateSelector(properties, dynamicConfigRef)
+          new DynamicConfigPodTemplateSelector(properties, kubernetesTaskRunnerConfig)
       );
     } else {
       return new SingleContainerTaskAdapter(
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 51913d13a71..48d709d1f84 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.RunnerTaskState;
@@ -44,6 +45,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -56,6 +58,7 @@ import org.apache.druid.java.util.http.client.response.InputStreamResponseHandle
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
 import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -76,8 +79,11 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -100,6 +106,7 @@ import java.util.stream.Collectors;
 public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
 {
   private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
+  private static final String OBSERVER_KEY = "k8s-task-runner-capacity-%s";
   private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
 
   // to cleanup old jobs that might not have been deleted.
@@ -111,19 +118,23 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   private final KubernetesPeonClient client;
   private final KubernetesTaskRunnerConfig config;
   private final ListeningExecutorService exec;
+  private final ThreadPoolExecutor tpe;
   private final HttpClient httpClient;
   private final PeonLifecycleFactory peonLifecycleFactory;
   private final ServiceEmitter emitter;
   // currently worker categories aren't supported, so it's hardcoded.
   protected static final String WORKER_CATEGORY = "_k8s_worker_category";
 
+  private final AtomicInteger currentCapacity;
+
   public KubernetesTaskRunner(
       TaskAdapter adapter,
       KubernetesTaskRunnerConfig config,
       KubernetesPeonClient client,
       HttpClient httpClient,
       PeonLifecycleFactory peonLifecycleFactory,
-      ServiceEmitter emitter
+      ServiceEmitter emitter,
+      ConfigManager configManager
   )
   {
     this.adapter = adapter;
@@ -132,10 +143,12 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
     this.httpClient = httpClient;
     this.peonLifecycleFactory = peonLifecycleFactory;
     this.cleanupExecutor = Executors.newScheduledThreadPool(1);
-    this.exec = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
-    );
     this.emitter = emitter;
+
+    this.currentCapacity = new AtomicInteger(config.getCapacity());
+    this.tpe = new ThreadPoolExecutor(currentCapacity.get(), currentCapacity.get(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Execs.makeThreadFactory("k8s-task-runner-%d", null));
+    this.exec = MoreExecutors.listeningDecorator(this.tpe);
+    configManager.addListener(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()), this::syncCapacityWithDynamicConfig);
   }
 
   @Override
@@ -179,6 +192,24 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
     }
   }
 
+  private void syncCapacityWithDynamicConfig(KubernetesTaskRunnerDynamicConfig config)
+  {
+    int newCapacity = config.getCapacity();
+    if (newCapacity == currentCapacity.get()) {
+      return;
+    }
+    log.info("Adjusting k8s task runner capacity from [%d] to [%d]", currentCapacity.get(), newCapacity);
+    // maximum pool size must always be greater than or equal to the core pool size
+    if (newCapacity < currentCapacity.get()) {
+      tpe.setCorePoolSize(newCapacity);
+      tpe.setMaximumPoolSize(newCapacity);
+    } else {
+      tpe.setMaximumPoolSize(newCapacity);
+      tpe.setCorePoolSize(newCapacity);
+    }
+    currentCapacity.set(newCapacity);
+  }
+
   private TaskStatus runTask(Task task)
   {
     return doTask(task, true);
@@ -294,7 +325,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
     synchronized (tasks) {
       tasks.remove(taskid);
     }
-    
+
   }
 
   @Override
@@ -420,7 +451,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @Override
   public Map<String, Long> getTotalTaskSlotCount()
   {
-    return ImmutableMap.of(WORKER_CATEGORY, (long) config.getCapacity());
+    return ImmutableMap.of(WORKER_CATEGORY, (long) currentCapacity.get());
   }
 
   @Override
@@ -438,13 +469,13 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @Override
   public Map<String, Long> getIdleTaskSlotCount()
   {
-    return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, config.getCapacity() - tasks.size()));
+    return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, currentCapacity.get() - tasks.size()));
   }
 
   @Override
   public Map<String, Long> getUsedTaskSlotCount()
   {
-    return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(config.getCapacity(), tasks.size()));
+    return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(currentCapacity.get(), tasks.size()));
   }
 
   @Override
@@ -535,7 +566,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   @Override
   public int getTotalCapacity()
   {
-    return config.getCapacity();
+    return currentCapacity.get();
   }
 
   @Override
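The `syncCapacityWithDynamicConfig` change above resizes the pool in a different order depending on whether capacity shrinks or grows, because `ThreadPoolExecutor` rejects a maximum pool size below the core size (and, on recent JDKs, a core size above the maximum). A minimal standalone sketch of that ordering, using only `java.util.concurrent`, is shown here. The class and method names are hypothetical and this is not the Druid implementation.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of resizing a fixed-size pool at runtime.
final class PoolResizeSketch
{
  /** Resizes the pool so that core size <= maximum size holds after every setter call. */
  static void resize(ThreadPoolExecutor pool, int newSize)
  {
    if (newSize < pool.getCorePoolSize()) {
      // Shrinking: lower the core size first, then the maximum.
      pool.setCorePoolSize(newSize);
      pool.setMaximumPoolSize(newSize);
    } else {
      // Growing (or unchanged): raise the maximum first, then the core size.
      pool.setMaximumPoolSize(newSize);
      pool.setCorePoolSize(newSize);
    }
  }

  public static void main(String[] args)
  {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()
    );
    resize(pool, 8);
    System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
    resize(pool, 2);
    System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
    pool.shutdown();
  }
}
```

Shrinking the pool does not interrupt running work. Per the `ThreadPoolExecutor` documentation, excess threads are terminated when they next become idle.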
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index 663126ba2e6..e7b9eb2b804 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -19,339 +19,60 @@
 
 package org.apache.druid.k8s.overlord;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.ObjectUtils;
 import org.joda.time.Period;
 
-import javax.annotation.Nonnull;
 import javax.validation.constraints.Max;
 import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
+
 import java.util.List;
 import java.util.Map;
 
-public class KubernetesTaskRunnerConfig
+public interface KubernetesTaskRunnerConfig
 {
-  @JsonProperty
-  @NotNull
-  private String namespace;
-
-  @JsonProperty
-  private String k8sTaskPodNamePrefix = "";
-
-  // This property is the namespace that the Overlord is running in.
-  // For cases where we want task pods to run on different namespace from the overlord, we need to specify the namespace of the overlord here.
-  // Else, we can simply leave this field alone.
-  @JsonProperty
-  private String overlordNamespace = "";
-
-  @JsonProperty
-  private boolean debugJobs = false;
-
-  /**
-   * Deprecated, please specify adapter type runtime property instead
-   * <p>
-   * I.E `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer`
-   */
-  @Deprecated
-  @JsonProperty
-  private boolean sidecarSupport = false;
-
-  @JsonProperty
-  // if this is not set, then the first container in your pod spec is assumed to be the overlord container.
-  // usually this is fine, but when you are dynamically adding sidecars like istio, the service mesh could
-  // in fact place the istio-proxy container as the first container.  Thus, you would specify this value to
-  // the name of your primary container.  e.g. druid-overlord
-  private String primaryContainerName = null;
-
-  @JsonProperty
-  // for multi-container jobs, we need this image to shut down sidecars after the main container
-  // has completed
-  private String kubexitImage = "karlkfi/kubexit:v0.3.2";
-
-  // how much time to wait for preStop hooks to execute
-  // lower number speeds up pod termination time to release locks
-  // faster, defaults to your k8s setup, usually 30 seconds.
-  private Long graceTerminationPeriodSeconds = null;
-
-  @JsonProperty
-  // disable using http / https proxy environment variables
-  private boolean disableClientProxy;
-
-  @JsonProperty
-  @NotNull
-  private Period maxTaskDuration = new Period("PT4H");
-
-  @JsonProperty
-  @NotNull
-  // how long to wait for the jobs to be cleaned up.
-  private Period taskCleanupDelay = new Period("P2D");
-
-  @JsonProperty
-  @NotNull
-  // interval for k8s job cleanup to run
-  private Period taskCleanupInterval = new Period("PT10m");
-
-  @JsonProperty
-  @NotNull
-  // how long to wait to join peon k8s jobs on startup
-  private Period taskJoinTimeout = new Period("PT1M");
-
-
-  @JsonProperty
-  @NotNull
-  // how long to wait for the peon k8s job to launch
-  private Period k8sjobLaunchTimeout = new Period("PT1H");
-
-  @JsonProperty
-  @NotNull
-  // how long to wait for log saving operations to complete
-  private Period logSaveTimeout = new Period("PT300S");
-
-  @JsonProperty
-  // ForkingTaskRunner inherits the monitors from the MM, in k8s mode
-  // the peon inherits the monitors from the overlord, so if someone specifies
-  // a TaskCountStatsMonitor in the overlord for example, the peon process
-  // fails because it can not inject this monitor in the peon process.
-  private List<String> peonMonitors = ImmutableList.of();
-
-  @JsonProperty
-  @NotNull
-  private List<String> javaOptsArray = ImmutableList.of();
-
-  @JsonProperty
-  @NotNull
-  private int cpuCoreInMicro = 0;
-
-  @JsonProperty
-  @NotNull
-  private Map<String, String> labels = ImmutableMap.of();
-
-  @JsonProperty
-  @NotNull
-  private Map<String, String> annotations = ImmutableMap.of();
-
-  @JsonProperty
-  @Min(1)
-  @Max(Integer.MAX_VALUE)
-  @NotNull
-  private Integer capacity = Integer.MAX_VALUE;
-
-  public KubernetesTaskRunnerConfig()
-  {
-  }
+  String getNamespace();
 
-  private KubernetesTaskRunnerConfig(
-      @Nonnull String namespace,
-      String overlordNamespace,
-      String k8sTaskPodNamePrefix,
-      boolean debugJobs,
-      boolean sidecarSupport,
-      String primaryContainerName,
-      String kubexitImage,
-      Long graceTerminationPeriodSeconds,
-      boolean disableClientProxy,
-      Period maxTaskDuration,
-      Period taskCleanupDelay,
-      Period taskCleanupInterval,
-      Period k8sjobLaunchTimeout,
-      Period logSaveTimeout,
-      List<String> peonMonitors,
-      List<String> javaOptsArray,
-      int cpuCoreInMicro,
-      Map<String, String> labels,
-      Map<String, String> annotations,
-      Integer capacity,
-      Period taskJoinTimeout
-  )
-  {
-    this.namespace = namespace;
-    this.overlordNamespace = ObjectUtils.getIfNull(
-        overlordNamespace,
-        this.overlordNamespace
-    );
-    this.k8sTaskPodNamePrefix = k8sTaskPodNamePrefix;
-    this.debugJobs = ObjectUtils.getIfNull(
-        debugJobs,
-        this.debugJobs
-    );
-    this.sidecarSupport = ObjectUtils.getIfNull(
-        sidecarSupport,
-        this.sidecarSupport
-    );
-    this.primaryContainerName = ObjectUtils.getIfNull(
-        primaryContainerName,
-        this.primaryContainerName
-    );
-    this.kubexitImage = ObjectUtils.getIfNull(
-        kubexitImage,
-        this.kubexitImage
-    );
-    this.graceTerminationPeriodSeconds = ObjectUtils.getIfNull(
-        graceTerminationPeriodSeconds,
-        this.graceTerminationPeriodSeconds
-    );
-    this.disableClientProxy = disableClientProxy;
-    this.maxTaskDuration = ObjectUtils.getIfNull(
-        maxTaskDuration,
-        this.maxTaskDuration
-    );
-    this.taskCleanupDelay = ObjectUtils.getIfNull(
-        taskCleanupDelay,
-        this.taskCleanupDelay
-    );
-    this.taskCleanupInterval = ObjectUtils.getIfNull(
-        taskCleanupInterval,
-        this.taskCleanupInterval
-    );
-    this.k8sjobLaunchTimeout = ObjectUtils.getIfNull(
-        k8sjobLaunchTimeout,
-        this.k8sjobLaunchTimeout
-    );
-    this.taskJoinTimeout = ObjectUtils.getIfNull(
-        taskJoinTimeout,
-        this.taskJoinTimeout
-    );
-    this.logSaveTimeout = ObjectUtils.getIfNull(
-        logSaveTimeout,
-        this.logSaveTimeout
-    );
-    this.peonMonitors = ObjectUtils.getIfNull(
-        peonMonitors,
-        this.peonMonitors
-    );
-    this.javaOptsArray = ObjectUtils.getIfNull(
-        javaOptsArray,
-        this.javaOptsArray
-    );
-    this.cpuCoreInMicro = ObjectUtils.getIfNull(
-        cpuCoreInMicro,
-        this.cpuCoreInMicro
-    );
-    this.labels = ObjectUtils.getIfNull(
-        labels,
-        this.labels
-    );
-    this.annotations = ObjectUtils.getIfNull(
-        annotations,
-        this.annotations
-    );
-    this.capacity = ObjectUtils.getIfNull(
-        capacity,
-        this.capacity
-    );
-  }
+  String getOverlordNamespace();
 
-  public String getNamespace()
-  {
-    return namespace;
-  }
-
-  public String getOverlordNamespace()
-  {
-    return overlordNamespace;
-  }
-
-  public String getK8sTaskPodNamePrefix()
-  {
-    return k8sTaskPodNamePrefix;
-  }
+  String getK8sTaskPodNamePrefix();
 
-  public boolean isDebugJobs()
-  {
-    return debugJobs;
-  }
+  boolean isDebugJobs();
 
   @Deprecated
-  public boolean isSidecarSupport()
-  {
-    return sidecarSupport;
-  }
+  boolean isSidecarSupport();
 
-  public String getPrimaryContainerName()
-  {
-    return primaryContainerName;
-  }
+  String getPrimaryContainerName();
 
-  public String getKubexitImage()
-  {
-    return kubexitImage;
-  }
+  String getKubexitImage();
 
-  public Long getGraceTerminationPeriodSeconds()
-  {
-    return graceTerminationPeriodSeconds;
-  }
+  Long getGraceTerminationPeriodSeconds();
 
-  public boolean isDisableClientProxy()
-  {
-    return disableClientProxy;
-  }
+  boolean isDisableClientProxy();
 
-  public Period getTaskTimeout()
-  {
-    return maxTaskDuration;
-  }
+  Period getTaskTimeout();
 
-  public Period getTaskJoinTimeout()
-  {
-    return taskJoinTimeout;
-  }
+  Period getTaskJoinTimeout();
 
+  Period getTaskCleanupDelay();
 
-  public Period getTaskCleanupDelay()
-  {
-    return taskCleanupDelay;
-  }
+  Period getTaskCleanupInterval();
 
-  public Period getTaskCleanupInterval()
-  {
-    return taskCleanupInterval;
-  }
+  Period getTaskLaunchTimeout();
 
-  public Period getTaskLaunchTimeout()
-  {
-    return k8sjobLaunchTimeout;
-  }
+  Period getLogSaveTimeout();
 
-  public Period getLogSaveTimeout()
-  {
-    return logSaveTimeout;
-  }
+  List<String> getPeonMonitors();
 
-  public List<String> getPeonMonitors()
-  {
-    return peonMonitors;
-  }
-
-  public List<String> getJavaOptsArray()
-  {
-    return javaOptsArray;
-  }
+  List<String> getJavaOptsArray();
 
-  public int getCpuCoreInMicro()
-  {
-    return cpuCoreInMicro;
-  }
+  int getCpuCoreInMicro();
 
-  public Map<String, String> getLabels()
-  {
-    return labels;
-  }
+  Map<String, String> getLabels();
 
-  public Map<String, String> getAnnotations()
-  {
-    return annotations;
-  }
+  Map<String, String> getAnnotations();
 
-  public Integer getCapacity()
-  {
-    return capacity;
-  }
+  Integer getCapacity();
 
-  public static Builder builder()
+  static Builder builder()
   {
     return new Builder();
   }
@@ -511,9 +232,9 @@ public class KubernetesTaskRunnerConfig
       return this;
     }
 
-    public KubernetesTaskRunnerConfig build()
+    public KubernetesTaskRunnerStaticConfig build()
     {
-      return new KubernetesTaskRunnerConfig(
+      return new KubernetesTaskRunnerStaticConfig(
           this.namespace,
           this.overlordNamespace,
           this.k8sTaskPodNamePrefix,
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
new file mode 100644
index 00000000000..c1593c3577d
--- /dev/null
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Supplier;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
+import org.joda.time.Period;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides a flexible mechanism to configure Kubernetes task pods, 
+ * by merging the static base settings from {@link KubernetesTaskRunnerConfig} 
+ * with dynamic overrides from {@link KubernetesTaskRunnerDynamicConfig}.
+ * <p>
+ * Kubernetes will always use this effective config to run new tasks.
+ */
+public class KubernetesTaskRunnerEffectiveConfig implements KubernetesTaskRunnerConfig
+{
+  private final KubernetesTaskRunnerStaticConfig staticConfig;
+  private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier;
+
+  public KubernetesTaskRunnerEffectiveConfig(
+      KubernetesTaskRunnerStaticConfig staticConfig,
+      Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier
+  )
+  {
+    this.staticConfig = staticConfig;
+    this.dynamicConfigSupplier = dynamicConfigSupplier;
+  }
+
+  @Override
+  public String getNamespace()
+  {
+    return staticConfig.getNamespace();
+  }
+
+  @Override
+  public String getOverlordNamespace()
+  {
+    return staticConfig.getOverlordNamespace();
+  }
+
+  @Override
+  public String getK8sTaskPodNamePrefix()
+  {
+    return staticConfig.getK8sTaskPodNamePrefix();
+  }
+
+  @Override
+  public boolean isDebugJobs()
+  {
+    return staticConfig.isDebugJobs();
+  }
+
+  @Override
+  public boolean isSidecarSupport()
+  {
+    return staticConfig.isSidecarSupport();
+  }
+
+  @Override
+  public String getPrimaryContainerName()
+  {
+    return staticConfig.getPrimaryContainerName();
+  }
+
+  @Override
+  public String getKubexitImage()
+  {
+    return staticConfig.getKubexitImage();
+  }
+
+  @Override
+  public Long getGraceTerminationPeriodSeconds()
+  {
+    return staticConfig.getGraceTerminationPeriodSeconds();
+  }
+
+  @Override
+  public boolean isDisableClientProxy()
+  {
+    return staticConfig.isDisableClientProxy();
+  }
+
+  @Override
+  public Period getTaskTimeout()
+  {
+    return staticConfig.getTaskTimeout();
+  }
+
+  @Override
+  public Period getTaskJoinTimeout()
+  {
+    return staticConfig.getTaskJoinTimeout();
+  }
+
+  @Override
+  public Period getTaskCleanupDelay()
+  {
+    return staticConfig.getTaskCleanupDelay();
+  }
+
+  @Override
+  public Period getTaskCleanupInterval()
+  {
+    return staticConfig.getTaskCleanupInterval();
+  }
+
+  @Override
+  public Period getTaskLaunchTimeout()
+  {
+    return staticConfig.getTaskLaunchTimeout();
+  }
+
+  @Override
+  public Period getLogSaveTimeout()
+  {
+    return staticConfig.getLogSaveTimeout();
+  }
+
+  @Override
+  public List<String> getPeonMonitors()
+  {
+    return staticConfig.getPeonMonitors();
+  }
+
+  @Override
+  public List<String> getJavaOptsArray()
+  {
+    return staticConfig.getJavaOptsArray();
+  }
+
+  @Override
+  public int getCpuCoreInMicro()
+  {
+    return staticConfig.getCpuCoreInMicro();
+  }
+
+  @Override
+  public Map<String, String> getLabels()
+  {
+    return staticConfig.getLabels();
+  }
+
+  @Override
+  public Map<String, String> getAnnotations()
+  {
+    return staticConfig.getAnnotations();
+  }
+
+  @Override
+  public Integer getCapacity()
+  {
+    if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getCapacity() == null) {
+      return staticConfig.getCapacity();
+    }
+    return dynamicConfigSupplier.get().getCapacity();
+  }
+
+  public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
+  {
+    if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getPodTemplateSelectStrategy() == null) {
+      return KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY;
+    }
+    return dynamicConfigSupplier.get().getPodTemplateSelectStrategy();
+  }
+}
+
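The capacity fallback in `getCapacity()` above can be sketched in isolation. This is a minimal sketch under stated assumptions, not the committed code: `StaticConfigSketch`, `DynamicConfigSketch` and `EffectiveConfigSketch` are hypothetical stand-ins for the classes in the diff, and `java.util.function.Supplier` is used in place of the Guava `Supplier`. Unlike the diff, which may call `get()` up to three times, the sketch reads the supplier once so that the null checks and the returned value come from the same snapshot.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the static (runtime properties) config.
final class StaticConfigSketch
{
  private final int capacity;

  StaticConfigSketch(int capacity)
  {
    this.capacity = capacity;
  }

  int getCapacity()
  {
    return capacity;
  }
}

// Hypothetical stand-in for the dynamic config; a null capacity means "not set".
final class DynamicConfigSketch
{
  private final Integer capacity;

  DynamicConfigSketch(Integer capacity)
  {
    this.capacity = capacity;
  }

  Integer getCapacity()
  {
    return capacity;
  }
}

// Hypothetical stand-in for the effective config: dynamic value wins when present.
final class EffectiveConfigSketch
{
  private final StaticConfigSketch staticConfig;
  private final Supplier<DynamicConfigSketch> dynamicConfigSupplier;

  EffectiveConfigSketch(StaticConfigSketch staticConfig, Supplier<DynamicConfigSketch> dynamicConfigSupplier)
  {
    this.staticConfig = staticConfig;
    this.dynamicConfigSupplier = dynamicConfigSupplier;
  }

  int getCapacity()
  {
    // Read the supplier once; later checks use this snapshot only.
    DynamicConfigSketch dynamic = dynamicConfigSupplier == null ? null : dynamicConfigSupplier.get();
    Integer dynamicCapacity = dynamic == null ? null : dynamic.getCapacity();
    // Fall back to the static capacity when no dynamic capacity is set.
    return dynamicCapacity != null ? dynamicCapacity : staticConfig.getCapacity();
  }
}
```

The three cases mirror the tests added in this commit: no dynamic config, a dynamic config with a capacity, and a dynamic config whose capacity is null.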
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index dd8111ed49e..516d229c891 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -21,6 +21,7 @@ package org.apache.druid.k8s.overlord;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
+import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
@@ -39,23 +40,25 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
   public static final String TYPE_NAME = "k8s";
   private final ObjectMapper smileMapper;
   private final HttpClient httpClient;
-  private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
+  private final KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig;
   private final TaskLogs taskLogs;
   private final DruidKubernetesClient druidKubernetesClient;
   private final ServiceEmitter emitter;
   private KubernetesTaskRunner runner;
   private final TaskAdapter taskAdapter;
+  private final ConfigManager configManager;
   private final Set<String> adapterTypeAllowingTasksInDifferentNamespaces = Set.of(PodTemplateTaskAdapter.TYPE);
 
   @Inject
   public KubernetesTaskRunnerFactory(
       @Smile ObjectMapper smileMapper,
       @EscalatedGlobal final HttpClient httpClient,
-      KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
+      KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig,
       TaskLogs taskLogs,
       DruidKubernetesClient druidKubernetesClient,
       ServiceEmitter emitter,
-      TaskAdapter taskAdapter
+      TaskAdapter taskAdapter,
+      ConfigManager configManager
   )
   {
     this.smileMapper = smileMapper;
@@ -65,6 +68,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
     this.druidKubernetesClient = druidKubernetesClient;
     this.emitter = emitter;
     this.taskAdapter = taskAdapter;
+    this.configManager = configManager;
   }
 
   @Override
@@ -99,7 +103,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
             smileMapper,
             kubernetesTaskRunnerConfig.getLogSaveTimeout().toStandardDuration().getMillis()
         ),
-        emitter
+        emitter,
+        configManager
     );
     return runner;
   }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
similarity index 65%
copy from extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
copy to extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
index 663126ba2e6..b68e70075db 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
@@ -32,7 +32,12 @@ import javax.validation.constraints.NotNull;
 import java.util.List;
 import java.util.Map;
 
-public class KubernetesTaskRunnerConfig
+/**
+ * This configuration is populated from runtime properties with the prefix
+ * {@code druid.indexer.runner}. It is the base configuration that
+ * {@link KubernetesTaskRunnerEffectiveConfig} will use if no dynamic config is provided.
+ */
+public class KubernetesTaskRunnerStaticConfig implements KubernetesTaskRunnerConfig
 {
   @JsonProperty
   @NotNull
@@ -139,11 +144,11 @@ public class KubernetesTaskRunnerConfig
   @NotNull
   private Integer capacity = Integer.MAX_VALUE;
 
-  public KubernetesTaskRunnerConfig()
+  public KubernetesTaskRunnerStaticConfig()
   {
   }
 
-  private KubernetesTaskRunnerConfig(
+  public KubernetesTaskRunnerStaticConfig(
       @Nonnull String namespace,
       String overlordNamespace,
       String k8sTaskPodNamePrefix,
@@ -244,298 +249,131 @@ public class KubernetesTaskRunnerConfig
     );
   }
 
+  @Override
   public String getNamespace()
   {
     return namespace;
   }
 
+  @Override
   public String getOverlordNamespace()
   {
     return overlordNamespace;
   }
 
+  @Override
   public String getK8sTaskPodNamePrefix()
   {
     return k8sTaskPodNamePrefix;
   }
 
+  @Override
   public boolean isDebugJobs()
   {
     return debugJobs;
   }
 
+  @Override
   @Deprecated
   public boolean isSidecarSupport()
   {
     return sidecarSupport;
   }
 
+  @Override
   public String getPrimaryContainerName()
   {
     return primaryContainerName;
   }
 
+  @Override
   public String getKubexitImage()
   {
     return kubexitImage;
   }
 
+  @Override
   public Long getGraceTerminationPeriodSeconds()
   {
     return graceTerminationPeriodSeconds;
   }
 
+  @Override
   public boolean isDisableClientProxy()
   {
     return disableClientProxy;
   }
 
+  @Override
   public Period getTaskTimeout()
   {
     return maxTaskDuration;
   }
 
+  @Override
   public Period getTaskJoinTimeout()
   {
     return taskJoinTimeout;
   }
 
 
+  @Override
   public Period getTaskCleanupDelay()
   {
     return taskCleanupDelay;
   }
 
+  @Override
   public Period getTaskCleanupInterval()
   {
     return taskCleanupInterval;
   }
 
+  @Override
   public Period getTaskLaunchTimeout()
   {
     return k8sjobLaunchTimeout;
   }
 
+  @Override
   public Period getLogSaveTimeout()
   {
     return logSaveTimeout;
   }
 
+  @Override
   public List<String> getPeonMonitors()
   {
     return peonMonitors;
   }
 
+  @Override
   public List<String> getJavaOptsArray()
   {
     return javaOptsArray;
   }
 
+  @Override
   public int getCpuCoreInMicro()
   {
     return cpuCoreInMicro;
   }
 
+  @Override
   public Map<String, String> getLabels()
   {
     return labels;
   }
 
+  @Override
   public Map<String, String> getAnnotations()
   {
     return annotations;
   }
 
+  @Override
   public Integer getCapacity()
   {
     return capacity;
   }
-
-  public static Builder builder()
-  {
-    return new Builder();
-  }
-
-  public static class Builder
-  {
-    private String namespace;
-    private String overlordNamespace;
-    private String k8sTaskPodNamePrefix;
-    private boolean debugJob;
-    private boolean sidecarSupport;
-    private String primaryContainerName;
-    private String kubexitImage;
-    private Long graceTerminationPeriodSeconds;
-    private boolean disableClientProxy;
-    private Period maxTaskDuration;
-    private Period taskCleanupDelay;
-    private Period taskCleanupInterval;
-    private Period k8sjobLaunchTimeout;
-    private List<String> peonMonitors;
-    private List<String> javaOptsArray;
-    private int cpuCoreInMicro;
-    private Map<String, String> labels;
-    private Map<String, String> annotations;
-    private Integer capacity;
-    private Period taskJoinTimeout;
-    private Period logSaveTimeout;
-
-    public Builder()
-    {
-    }
-
-    public Builder withNamespace(String namespace)
-    {
-      this.namespace = namespace;
-      return this;
-    }
-
-    public Builder withOverlordNamespace(String overlordNamespace)
-    {
-      this.overlordNamespace = overlordNamespace;
-      return this;
-    }
-
-    public Builder withK8sTaskPodNamePrefix(String k8sTaskPodNamePrefix)
-    {
-      this.k8sTaskPodNamePrefix = k8sTaskPodNamePrefix;
-      return this;
-    }
-
-    public Builder withDebugJob(boolean debugJob)
-    {
-      this.debugJob = debugJob;
-      return this;
-    }
-
-    public Builder withSidecarSupport(boolean sidecarSupport)
-    {
-      this.sidecarSupport = sidecarSupport;
-      return this;
-    }
-
-    public Builder withPrimaryContainerName(String primaryContainerName)
-    {
-      this.primaryContainerName = primaryContainerName;
-      return this;
-    }
-
-    public Builder withKubexitImage(String kubexitImage)
-    {
-      this.kubexitImage = kubexitImage;
-      return this;
-    }
-
-    public Builder withGraceTerminationPeriodSeconds(Long graceTerminationPeriodSeconds)
-    {
-      this.graceTerminationPeriodSeconds = graceTerminationPeriodSeconds;
-      return this;
-    }
-
-    public Builder withDisableClientProxy(boolean disableClientProxy)
-    {
-      this.disableClientProxy = disableClientProxy;
-      return this;
-    }
-
-    public Builder withTaskTimeout(Period taskTimeout)
-    {
-      this.maxTaskDuration = taskTimeout;
-      return this;
-    }
-
-    public Builder withTaskCleanupDelay(Period taskCleanupDelay)
-    {
-      this.taskCleanupDelay = taskCleanupDelay;
-      return this;
-    }
-
-    public Builder withTaskCleanupInterval(Period taskCleanupInterval)
-    {
-      this.taskCleanupInterval = taskCleanupInterval;
-      return this;
-    }
-
-    public Builder withK8sJobLaunchTimeout(Period k8sjobLaunchTimeout)
-    {
-      this.k8sjobLaunchTimeout = k8sjobLaunchTimeout;
-      return this;
-    }
-
-    public Builder withPeonMonitors(List<String> peonMonitors)
-    {
-      this.peonMonitors = peonMonitors;
-      return this;
-    }
-
-    public Builder withCpuCore(int cpuCore)
-    {
-      this.cpuCoreInMicro = cpuCore;
-      return this;
-    }
-
-    public Builder withJavaOptsArray(List<String> javaOptsArray)
-    {
-      this.javaOptsArray = javaOptsArray;
-      return this;
-    }
-
-    public Builder withLabels(Map<String, String> labels)
-    {
-      this.labels = labels;
-      return this;
-    }
-
-    public Builder withAnnotations(Map<String, String> annotations)
-    {
-      this.annotations = annotations;
-      return this;
-    }
-
-
-    public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
-    {
-      this.capacity = capacity;
-      return this;
-    }
-
-    public Builder withTaskJoinTimeout(Period taskJoinTimeout)
-    {
-      this.taskJoinTimeout = taskJoinTimeout;
-      return this;
-    }
-
-    public Builder withLogSaveTimeout(Period logSaveTimeout)
-    {
-      this.logSaveTimeout = logSaveTimeout;
-      return this;
-    }
-
-    public KubernetesTaskRunnerConfig build()
-    {
-      return new KubernetesTaskRunnerConfig(
-          this.namespace,
-          this.overlordNamespace,
-          this.k8sTaskPodNamePrefix,
-          this.debugJob,
-          this.sidecarSupport,
-          this.primaryContainerName,
-          this.kubexitImage,
-          this.graceTerminationPeriodSeconds,
-          this.disableClientProxy,
-          this.maxTaskDuration,
-          this.taskCleanupDelay,
-          this.taskCleanupInterval,
-          this.k8sjobLaunchTimeout,
-          this.logSaveTimeout,
-          this.peonMonitors,
-          this.javaOptsArray,
-          this.cpuCoreInMicro,
-          this.labels,
-          this.annotations,
-          this.capacity,
-          this.taskJoinTimeout
-      );
-    }
-  }
 }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
index eddd5e4a1ee..fde4dcc7b0b 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
@@ -21,21 +21,26 @@ package org.apache.druid.k8s.overlord.execution;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskRunnerDynamicConfig
 {
+  @Nullable
   private final PodTemplateSelectStrategy podTemplateSelectStrategy;
 
+  @Nullable
+  private final Integer capacity;
+
   @JsonCreator
   public DefaultKubernetesTaskRunnerDynamicConfig(
-      @JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy
+      @JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy,
+      @JsonProperty("capacity") Integer capacity
   )
   {
-    Preconditions.checkNotNull(podTemplateSelectStrategy);
     this.podTemplateSelectStrategy = podTemplateSelectStrategy;
+    this.capacity = capacity;
   }
 
   @Override
@@ -45,6 +50,31 @@ public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskR
     return podTemplateSelectStrategy;
   }
 
+  @Override
+  @JsonProperty
+  public Integer getCapacity()
+  {
+    return capacity;
+  }
+
+  @Override
+  public KubernetesTaskRunnerDynamicConfig merge(KubernetesTaskRunnerDynamicConfig other)
+  {
+    if (other == null) {
+      return this;
+    }
+    Integer mergeCapacity = getCapacity();
+    if (other.getCapacity() != null) {
+      mergeCapacity = other.getCapacity();
+    }
+
+    PodTemplateSelectStrategy mergePodTemplateSelectStrategy = getPodTemplateSelectStrategy();
+    if (other.getPodTemplateSelectStrategy() != null) {
+      mergePodTemplateSelectStrategy = other.getPodTemplateSelectStrategy();
+    }
+    return new DefaultKubernetesTaskRunnerDynamicConfig(mergePodTemplateSelectStrategy, mergeCapacity);
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -55,13 +85,14 @@ public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskR
       return false;
     }
     DefaultKubernetesTaskRunnerDynamicConfig that = (DefaultKubernetesTaskRunnerDynamicConfig) o;
-    return Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy);
+    return Objects.equals(capacity, that.capacity) &&
+           Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hashCode(podTemplateSelectStrategy);
+    return Objects.hash(podTemplateSelectStrategy, capacity);
   }
 
   @Override
@@ -69,6 +100,7 @@ public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskR
   {
     return "DefaultKubernetesTaskRunnerDynamicConfig{" +
            "podTemplateSelectStrategy=" + podTemplateSelectStrategy +
+           ", capacity=" + capacity +
            '}';
   }
 }
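The `merge` method added above prefers non-null fields from `other` and otherwise keeps the current values. A minimal sketch of that rule, assuming a hypothetical `DynamicConfigMergeSketch` class in which a plain `String` stands in for `PodTemplateSelectStrategy`:

```java
// Hypothetical stand-in for DefaultKubernetesTaskRunnerDynamicConfig.
// A String replaces PodTemplateSelectStrategy purely for illustration.
final class DynamicConfigMergeSketch
{
  private final String strategy;   // null means "not set"
  private final Integer capacity;  // null means "not set"

  DynamicConfigMergeSketch(String strategy, Integer capacity)
  {
    this.strategy = strategy;
    this.capacity = capacity;
  }

  String getStrategy()
  {
    return strategy;
  }

  Integer getCapacity()
  {
    return capacity;
  }

  // Field-by-field merge: a non-null value in 'other' overrides this one.
  DynamicConfigMergeSketch merge(DynamicConfigMergeSketch other)
  {
    if (other == null) {
      return this;
    }
    String mergedStrategy = other.strategy != null ? other.strategy : strategy;
    Integer mergedCapacity = other.capacity != null ? other.capacity : capacity;
    return new DynamicConfigMergeSketch(mergedStrategy, mergedCapacity);
  }
}
```

One consequence of this rule, visible in the sketch, is that a null field in `other` can never clear a value that is already set; it only leaves it unchanged.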
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
index ec03b045f50..432a41933ed 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
@@ -73,7 +73,7 @@ public class KubernetesTaskExecutionConfigResource
    * Updates the Kubernetes execution configuration.
    *
    * @param dynamicConfig the new execution configuration to set
-   * @param req             the HTTP servlet request providing context for audit information
+   * @param req           the HTTP servlet request providing context for audit information
    * @return a response indicating the success or failure of the update operation
    */
   @POST
@@ -84,13 +84,19 @@ public class KubernetesTaskExecutionConfigResource
       @Context final HttpServletRequest req
   )
   {
+    KubernetesTaskRunnerDynamicConfig currentConfig = getDynamicConfig();
+    KubernetesTaskRunnerDynamicConfig mergedConfig = dynamicConfig;
+
+    if (currentConfig != null) {
+      mergedConfig = currentConfig.merge(dynamicConfig);
+    }
     final ConfigManager.SetResult setResult = configManager.set(
         KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
-        dynamicConfig,
+        mergedConfig,
         AuthorizationUtils.buildAuditInfo(req)
     );
     if (setResult.isOk()) {
-      log.info("Updating K8s execution configs: %s", dynamicConfig);
+      log.info("Updating K8s execution configs: %s", mergedConfig);
 
       return Response.ok().build();
     } else {
@@ -147,11 +153,15 @@ public class KubernetesTaskExecutionConfigResource
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(ConfigResourceFilter.class)
   public Response getExecutionConfig()
+  {
+    return Response.ok(getDynamicConfig()).build();
+  }
+
+  private KubernetesTaskRunnerDynamicConfig getDynamicConfig()
   {
     if (dynamicConfigRef == null) {
       dynamicConfigRef = configManager.watch(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class);
     }
-
-    return Response.ok(dynamicConfigRef.get()).build();
+    return dynamicConfigRef.get();
   }
 }
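The POST handler above now reads the current dynamic config, merges the posted one into it, and stores the result, so a partial update keeps fields it does not mention. The sketch below illustrates that flow only. It assumes a hypothetical `InMemoryDynamicConfigStore` backed by an `AtomicReference` and represents the config as a plain map; it is not `ConfigManager`. It also swaps the handler's separate read and `set` calls for `updateAndGet`, so the merge is applied atomically.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for the stored dynamic config.
final class InMemoryDynamicConfigStore
{
  private final AtomicReference<Map<String, Object>> current = new AtomicReference<>(null);

  // Merge the posted fields into the stored config and return the result.
  Map<String, Object> post(Map<String, Object> update)
  {
    return current.updateAndGet(existing -> {
      Map<String, Object> merged = new HashMap<>();
      if (existing != null) {
        merged.putAll(existing);
      }
      for (Map.Entry<String, Object> entry : update.entrySet()) {
        // Null values are treated as "not provided" and leave the stored value alone.
        if (entry.getValue() != null) {
          merged.put(entry.getKey(), entry.getValue());
        }
      }
      return merged;
    });
  }

  Map<String, Object> get()
  {
    return current.get();
  }
}
```

Posting only `capacity` after an earlier post that set both fields would leave `podTemplateSelectStrategy` as it was.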
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
index 4f6d4b07c41..fd9c9b46531 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
@@ -22,6 +22,9 @@ package org.apache.druid.k8s.overlord.execution;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+
 /**
  * Represents the configuration for task execution within a Kubernetes environment.
  * This interface allows for dynamic configuration of task execution strategies based
@@ -38,7 +41,26 @@ public interface KubernetesTaskRunnerDynamicConfig
 
   /**
    * Retrieves the execution behavior strategy associated with this configuration.
+   *
    * @return the execution behavior strategy
    */
   PodTemplateSelectStrategy getPodTemplateSelectStrategy();
+
+  /**
+   * Retrieves the capacity associated with this configuration.
+   *
+   * @return the capacity
+   */
+  @Min(0)
+  @Max(Integer.MAX_VALUE)
+  Integer getCapacity();
+
+  /**
+   * Merges this configuration with another, preferring values from {@code other}
+   * and falling back to this configuration when not present.
+   *
+   * @param other the configuration to merge with
+   * @return the merged configuration
+   */
+  KubernetesTaskRunnerDynamicConfig merge(KubernetesTaskRunnerDynamicConfig other);
 }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
index 92832b2ff66..ff4a4c2cd72 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
@@ -26,8 +26,7 @@ import io.fabric8.kubernetes.client.utils.Serialization;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
-import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig;
 
 import java.io.File;
 import java.nio.file.Files;
@@ -43,17 +42,17 @@ public class DynamicConfigPodTemplateSelector implements PodTemplateSelector
                                               + ".k8s.podTemplate.";
 
   private final Properties properties;
-  private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
+  private final KubernetesTaskRunnerEffectiveConfig effectiveConfig;
   // Supplier allows Overlord to read the most recent pod template file without calling initializeTemplatesFromFileSystem() again.
   private HashMap<String, Supplier<PodTemplate>> podTemplates;
 
   public DynamicConfigPodTemplateSelector(
       Properties properties,
-      Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
+      KubernetesTaskRunnerEffectiveConfig effectiveConfig
   )
   {
     this.properties = properties;
-    this.dynamicConfigRef = dynamicConfigRef;
+    this.effectiveConfig = effectiveConfig;
     initializeTemplatesFromFileSystem();
   }
 
@@ -120,14 +119,6 @@ public class DynamicConfigPodTemplateSelector implements PodTemplateSelector
   @Override
   public Optional<PodTemplateWithName> getPodTemplateForTask(Task task)
   {
-    PodTemplateSelectStrategy podTemplateSelectStrategy;
-    KubernetesTaskRunnerDynamicConfig dynamicConfig = dynamicConfigRef.get();
-    if (dynamicConfig == null || dynamicConfig.getPodTemplateSelectStrategy() == null) {
-      podTemplateSelectStrategy = KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY;
-    } else {
-      podTemplateSelectStrategy = dynamicConfig.getPodTemplateSelectStrategy();
-    }
-
-    return Optional.of(podTemplateSelectStrategy.getPodTemplateForTask(task, podTemplates));
+    return Optional.of(effectiveConfig.getPodTemplateSelectStrategy().getPodTemplateForTask(task, podTemplates));
   }
 }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index 2744b82b126..8c68d9324e8 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -269,7 +269,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
     }
     return podTemplateAnnotationBuilder.build();
   }
-  
+
   private Map<String, String> getJobLabels(KubernetesTaskRunnerConfig config, Task task)
   {
     Preconditions.checkNotNull(config.getNamespace(), "When using Custom Pod Templates, druid.indexer.runner.namespace cannot be null.");
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
index 59c9508005f..dd8960864e0 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -27,6 +27,7 @@ import com.google.inject.Injector;
 import com.google.inject.ProvisionException;
 import com.google.inject.TypeLiteral;
 import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.common.config.ConfigManagerConfig;
 import org.apache.druid.guice.ConfigModule;
 import org.apache.druid.guice.DruidGuiceExtensions;
@@ -44,6 +45,7 @@ import org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClient
 import org.apache.druid.k8s.overlord.common.httpclient.jdk.DruidKubernetesJdkHttpClientFactory;
 import org.apache.druid.k8s.overlord.common.httpclient.okhttp.DruidKubernetesOkHttpHttpClientFactory;
 import org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVertxHttpClientFactory;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
 import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
 import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
 import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
@@ -51,14 +53,18 @@ import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.apache.druid.metadata.MetadataStorageConnector;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.server.DruidNode;
+import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.net.URL;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 @RunWith(EasyMockRunner.class)
 public class KubernetesOverlordModuleTest
@@ -81,8 +87,26 @@ public class KubernetesOverlordModuleTest
   private AuditManager auditManager;
   @Mock
   private MetadataStorageConnector metadataStorageConnector;
+  @Mock
+  private ConfigManager configManager;
   private Injector injector;
 
+  @Before
+  public void setUpConfigManagerMock()
+  {
+    EasyMock.reset(configManager);
+    EasyMock.expect(configManager.watchConfig(
+        EasyMock.anyString(),
+        EasyMock.anyObject()
+    )).andReturn(new AtomicReference<>(null)).anyTimes();
+    EasyMock.expect(configManager.addListener(
+        EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
+        EasyMock.anyString(),
+        EasyMock.anyObject(Consumer.class)
+    )).andReturn(true).anyTimes();
+    EasyMock.replay(configManager);
+  }
+
   @Test
   public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully()
   {
@@ -325,6 +349,7 @@ public class KubernetesOverlordModuleTest
                   }).toInstance(Suppliers.ofInstance(metadataStorageTablesConfig));
               binder.bind(AuditManager.class).toInstance(auditManager);
               binder.bind(MetadataStorageConnector.class).toInstance(metadataStorageConnector);
+              binder.bind(ConfigManager.class).toInstance(configManager);
             },
             new ConfigModule(),
             new IndexingServiceTaskLogsModule(props),
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java
new file mode 100644
index 00000000000..c44f8a34b4e
--- /dev/null
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Supplier;
+import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
+import org.apache.druid.k8s.overlord.execution.TaskTypePodTemplateSelectStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KubernetesTaskRunnerEffectiveConfigTest
+{
+  @Test
+  public void test_getCapacity_usesStaticWhenDynamicNull()
+  {
+    KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder()
+        .withCapacity(7)
+        .build();
+    Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> null;
+    KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+    Assert.assertEquals(7, effective.getCapacity().intValue());
+  }
+
+  @Test
+  public void test_getCapacity_usesDynamicWhenProvided()
+  {
+    KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder()
+        .withCapacity(2)
+        .build();
+    Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, 9);
+    KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+    Assert.assertEquals(9, effective.getCapacity().intValue());
+  }
+
+  @Test
+  public void test_getCapacity_usesStaticWhenDynamicNullCapacity()
+  {
+    KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder()
+        .withCapacity(7)
+        .build();
+    Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, null);
+    KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+    Assert.assertEquals(7, effective.getCapacity().intValue());
+  }
+
+  @Test
+  public void test_getPodTemplateSelectStrategy_usesDefaultWhenDynamicNull()
+  {
+    KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build();
+    Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> null;
+    KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+    PodTemplateSelectStrategy strategy = effective.getPodTemplateSelectStrategy();
+    Assert.assertTrue(strategy instanceof TaskTypePodTemplateSelectStrategy);
+    Assert.assertEquals(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY, strategy);
+  }
+
+  @Test
+  public void test_getPodTemplateSelectStrategy_usesDynamicWhenProvided()
+  {
+    KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build();
+    PodTemplateSelectStrategy custom = new TaskTypePodTemplateSelectStrategy();
+    Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(custom, null);
+    KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+    Assert.assertEquals(custom, effective.getPodTemplateSelectStrategy());
+  }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
index 6849f9a0ecc..a67ab70a0a8 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.ConfigBuilder;
+import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -32,6 +33,7 @@ import org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVert
 import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.EasyMock;
 import org.easymock.Mock;
 import org.junit.Assert;
 import org.junit.Before;
@@ -42,24 +44,28 @@ import java.io.IOException;
 public class KubernetesTaskRunnerFactoryTest
 {
   private ObjectMapper objectMapper;
-  private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
+  private KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig;
   private TaskLogs taskLogs;
 
   private DruidKubernetesClient druidKubernetesClient;
   @Mock private ServiceEmitter emitter;
   private TaskAdapter taskAdapter;
+  @Mock private ConfigManager configManager;
 
   @Before
   public void setup()
   {
     objectMapper = new TestUtils().getTestObjectMapper();
-    kubernetesTaskRunnerConfig = KubernetesTaskRunnerConfig.builder()
+    KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerStaticConfig = KubernetesTaskRunnerConfig.builder()
         .withCapacity(1)
         .build();
     taskLogs = new NoopTaskLogs();
     druidKubernetesClient =
         new DruidKubernetesClient(new DruidKubernetesVertxHttpClientFactory(new DruidKubernetesVertxHttpClientConfig()), new ConfigBuilder().build());
     taskAdapter = new TestTaskAdapter();
+    kubernetesTaskRunnerConfig = new KubernetesTaskRunnerEffectiveConfig(kubernetesTaskRunnerStaticConfig, () -> null);
+    configManager = EasyMock.createNiceMock(ConfigManager.class);
+    EasyMock.replay(configManager);
   }
 
   @Test
@@ -72,7 +78,8 @@ public class KubernetesTaskRunnerFactoryTest
         taskLogs,
         druidKubernetesClient,
         emitter,
-        taskAdapter
+        taskAdapter,
+        configManager
     );
 
     KubernetesTaskRunner expectedRunner = factory.build();
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java
similarity index 94%
rename from extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
rename to extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java
index 1f4a7281f64..91b5148e268 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java
@@ -29,15 +29,15 @@ import org.junit.Test;
 
 import java.io.IOException;
 
-public class KubernetesTaskRunnerConfigTest
+public class KubernetesTaskRunnerStaticConfigTest
 {
   @Test
   public void test_deserializable() throws IOException
   {
     ObjectMapper mapper = new DefaultObjectMapper();
-    KubernetesTaskRunnerConfig config = mapper.readValue(
+    KubernetesTaskRunnerStaticConfig config = mapper.readValue(
         this.getClass().getClassLoader().getResource("kubernetesTaskRunnerConfig.json"),
-        KubernetesTaskRunnerConfig.class
+        KubernetesTaskRunnerStaticConfig.class
     );
 
     Assert.assertEquals("namespace", config.getNamespace());
@@ -60,7 +60,7 @@ public class KubernetesTaskRunnerConfigTest
   @Test
   public void test_builder_preservesDefaults()
   {
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
         .withNamespace("namespace")
         .withDisableClientProxy(true)
         .build();
@@ -85,7 +85,7 @@ public class KubernetesTaskRunnerConfigTest
   @Test
   public void test_builder()
   {
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
         .withNamespace("namespace")
         .withDebugJob(true)
         .withSidecarSupport(true)
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 083bf2db0e6..1c72078a52e 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.k8s.overlord;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -27,6 +28,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import org.apache.commons.io.IOUtils;
+import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -39,6 +41,8 @@ import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
 import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -53,6 +57,8 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
@@ -60,6 +66,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -78,27 +85,37 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
   @Mock private ServiceEmitter emitter;
   @Mock private ListenableFuture<TaskStatus> statusFuture;
+  @Mock private ConfigManager configManager;
 
-  private KubernetesTaskRunnerConfig config;
+  private KubernetesTaskRunnerStaticConfig staticConfig;
+  private KubernetesTaskRunnerEffectiveConfig config;
   private KubernetesTaskRunner runner;
   private Task task;
 
   @Before
   public void setup()
   {
-    config = KubernetesTaskRunnerConfig.builder()
+    staticConfig = KubernetesTaskRunnerConfig.builder()
         .withCapacity(1)
         .build();
 
+    Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, 1);
+
+    config = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef);
+
     task = K8sTestUtils.createTask(ID, 0);
 
+    configManager = EasyMock.createNiceMock(ConfigManager.class);
+    EasyMock.replay(configManager);
+
     runner = new KubernetesTaskRunner(
         taskAdapter,
         config,
         peonClient,
         httpClient,
         new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
+        emitter,
+        configManager
     );
   }
 
@@ -113,7 +130,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
         peonClient,
         httpClient,
         new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
+        emitter,
+        configManager
     )
     {
       @Override
@@ -163,7 +181,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
         peonClient,
         httpClient,
         new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
+        emitter,
+        configManager
     )
     {
       @Override
@@ -219,7 +238,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
         peonClient,
         httpClient,
         new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
+        emitter,
+        configManager
     )
     {
       @Override
@@ -771,9 +791,75 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
         peonClient,
         httpClient,
         new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
+        emitter,
+        configManager
     );
     kubernetesTaskRunner.stop();
     Assert.assertThrows(RejectedExecutionException.class, () -> kubernetesTaskRunner.run(task));
   }
+
+  @Test
+  public void test_syncCapacityWithDynamicConfig_increase_updatesExecutorAndCapacity() throws Exception
+  {
+    Method method = KubernetesTaskRunner.class.getDeclaredMethod(
+        "syncCapacityWithDynamicConfig",
+        KubernetesTaskRunnerDynamicConfig.class
+    );
+    method.setAccessible(true);
+
+    // increase from 1 -> 3
+    method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 3));
+
+    Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe");
+    tpeField.setAccessible(true);
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner);
+
+    Assert.assertEquals(3, executor.getCorePoolSize());
+    Assert.assertEquals(3, executor.getMaximumPoolSize());
+    Assert.assertEquals(3, runner.getTotalCapacity());
+  }
+
+  @Test
+  public void test_syncCapacityWithDynamicConfig_decrease_updatesExecutorAndCapacity() throws Exception
+  {
+    Method method = KubernetesTaskRunner.class.getDeclaredMethod(
+        "syncCapacityWithDynamicConfig",
+        KubernetesTaskRunnerDynamicConfig.class
+    );
+    method.setAccessible(true);
+
+    // first increase to 4 to ensure we can decrease after
+    method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 4));
+    // then decrease 4 -> 2
+    method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 2));
+
+    Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe");
+    tpeField.setAccessible(true);
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner);
+
+    Assert.assertEquals(2, executor.getCorePoolSize());
+    Assert.assertEquals(2, executor.getMaximumPoolSize());
+    Assert.assertEquals(2, runner.getTotalCapacity());
+  }
+
+  @Test
+  public void test_syncCapacityWithDynamicConfig_sameCapacity_noChangeAndNoError() throws Exception
+  {
+    Method method = KubernetesTaskRunner.class.getDeclaredMethod(
+        "syncCapacityWithDynamicConfig",
+        KubernetesTaskRunnerDynamicConfig.class
+    );
+    method.setAccessible(true);
+
+    // initial capacity is 1 in setup; calling with 1 should be a no-op
+    method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 1));
+
+    Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe");
+    tpeField.setAccessible(true);
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner);
+
+    Assert.assertEquals(1, executor.getCorePoolSize());
+    Assert.assertEquals(1, executor.getMaximumPoolSize());
+    Assert.assertEquals(1, runner.getTotalCapacity());
+  }
 }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java
index de8919e329d..31c6d7fdd6d 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java
@@ -31,17 +31,26 @@ public class DefaultKubernetesTaskRunnerDynamicConfigTest
   public void getPodTemplateSelectStrategyTest()
   {
     PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy();
-    DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy);
+    DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy, 1);
 
     Assert.assertEquals(strategy, config.getPodTemplateSelectStrategy());
   }
 
+  @Test
+  public void getCapacityTest()
+  {
+    Integer capacity = 4;
+    DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(null, 4);
+
+    Assert.assertEquals(capacity, config.getCapacity());
+  }
+
   @Test
   public void testSerde() throws Exception
   {
     final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
     PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy();
-    DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy);
+    DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy, 1);
 
     DefaultKubernetesTaskRunnerDynamicConfig config2 = objectMapper.readValue(
         objectMapper.writeValueAsBytes(config),
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java
index b76b7eaf0cf..c06056f0113 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -52,6 +53,10 @@ public class KubernetesTaskExecutionConfigResourceTest
   @Test
   public void setExecutionConfigSuccessfulUpdate()
   {
+    EasyMock.expect(configManager.watch(
+        KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+        KubernetesTaskRunnerDynamicConfig.class
+    )).andReturn(new AtomicReference<>(null));
     KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
         configManager,
         auditManager
@@ -75,6 +80,10 @@ public class KubernetesTaskExecutionConfigResourceTest
   @Test
   public void setExecutionConfigFailedUpdate()
   {
+    EasyMock.expect(configManager.watch(
+        KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+        KubernetesTaskRunnerDynamicConfig.class
+    )).andReturn(new AtomicReference<>(null));
     KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
         configManager,
         auditManager
@@ -94,4 +103,116 @@ public class KubernetesTaskExecutionConfigResourceTest
     Response result = testedResource.setExecutionConfig(dynamicConfig, req);
     assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), result.getStatus());
   }
+
+  @Test
+  public void setExecutionConfig_MergeUsesCurrentCapacityWhenRequestCapacityNull()
+  {
+    KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
+        configManager,
+        auditManager
+    );
+
+    PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy();
+    KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 5);
+    EasyMock.expect(configManager.watch(
+        KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+        KubernetesTaskRunnerDynamicConfig.class
+    )).andReturn(new AtomicReference<>(currentConfig));
+
+    PodTemplateSelectStrategy requestStrategy = new TaskTypePodTemplateSelectStrategy();
+    KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, null);
+
+    KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, 5);
+
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
+    EasyMock.replay(req);
+
+    EasyMock.expect(configManager.set(
+        KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+        expectedMergedConfig,
+        AuthorizationUtils.buildAuditInfo(req)
+    )).andReturn(ConfigManager.SetResult.ok());
+
+    EasyMock.replay(configManager, auditManager);
+
+    Response result = testedResource.setExecutionConfig(requestConfig, req);
+    assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
+  }
+
+  @Test
+  public void setExecutionConfig_MergeUsesCurrentStrategyWhenRequestStrategyNull()
+  {
+    KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
+        configManager,
+        auditManager
+    );
+
+    PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy();
+    KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 2);
+    EasyMock.expect(configManager.watch(
+        KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+        KubernetesTaskRunnerDynamicConfig.class
+    )).andReturn(new AtomicReference<>(currentConfig));
+
+    KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, 7);
+
+    KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 7);
+
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
+    EasyMock.replay(req);
+
+    EasyMock.expect(configManager.set(
+        KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+        expectedMergedConfig,
+        AuthorizationUtils.buildAuditInfo(req)
+    )).andReturn(ConfigManager.SetResult.ok());
+
+    EasyMock.replay(configManager, auditManager);
+
+    Response result = testedResource.setExecutionConfig(requestConfig, req);
+    assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
+  }
+
+  @Test
+  public void setExecutionConfig_MergeUsesCurrentWhenBothRequestFieldsNull()
+  {
+    KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
+        configManager,
+        auditManager
+    );
+
+    PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy();
+    KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9);
+    EasyMock.expect(configManager.watch(
+        KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+        KubernetesTaskRunnerDynamicConfig.class
+    )).andReturn(new AtomicReference<>(currentConfig));
+
+    KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, null);
+
+    KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9);
+
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
+    EasyMock.replay(req);
+
+    EasyMock.expect(configManager.set(
+        KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+        expectedMergedConfig,
+        AuthorizationUtils.buildAuditInfo(req)
+    )).andReturn(ConfigManager.SetResult.ok());
+
+    EasyMock.replay(configManager, auditManager);
+
+    Response result = testedResource.setExecutionConfig(requestConfig, req);
+    assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
+  }
 }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java
index 77a819dde9c..b5fac1233c0 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java
@@ -36,7 +36,8 @@ public class KubernetesTaskRunnerDynamicConfigTest
                   + "  \"type\": \"default\",\n"
                   + "  \"podTemplateSelectStrategy\": {\n"
                   + "    \"type\": \"default\"\n"
-                  + "  }\n"
+                  + "  },\n"
+                  + "  \"capacity\": 3\n"
                   + "}";
 
     KubernetesTaskRunnerDynamicConfig deserialized = jsonMapper.readValue(
@@ -45,6 +46,7 @@ public class KubernetesTaskRunnerDynamicConfigTest
     );
     PodTemplateSelectStrategy selectStrategy = deserialized.getPodTemplateSelectStrategy();
     Assert.assertTrue(selectStrategy instanceof TaskTypePodTemplateSelectStrategy);
+    Assert.assertEquals(Integer.valueOf(3), deserialized.getCapacity());
 
     json = "{\n"
            + "  \"type\": \"default\",\n"
@@ -72,5 +74,14 @@ public class KubernetesTaskRunnerDynamicConfigTest
     selectStrategy = deserialized.getPodTemplateSelectStrategy();
     Assert.assertTrue(selectStrategy instanceof SelectorBasedPodTemplateSelectStrategy);
     Assert.assertEquals(2, ((SelectorBasedPodTemplateSelectStrategy) selectStrategy).getSelectors().size());
+    Assert.assertNull(deserialized.getCapacity());
+
+    json = "{\n"
+           + "  \"type\": \"default\",\n"
+           + "  \"capacity\": 12"
+           + "}";
+    deserialized = jsonMapper.readValue(json, KubernetesTaskRunnerDynamicConfig.class);
+    Assert.assertEquals(Integer.valueOf(12), deserialized.getCapacity());
+    Assert.assertNull(deserialized.getPodTemplateSelectStrategy());
   }
 }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 33f8349c619..016f3280a47 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
 import org.apache.druid.k8s.overlord.common.JobResponse;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
@@ -110,9 +111,9 @@ public class DruidPeonClientIntegrationTest
     PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
 
     Task task = K8sTestUtils.getTask();
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("default")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("default")
+                                                                              .build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         k8sClient,
         config,
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
index 106a98fa6a8..1052c97c537 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
@@ -29,6 +29,9 @@ import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
 import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
@@ -53,14 +56,16 @@ public class DynamicConfigPodTemplateSelectorTest
   private Path tempDir;
   private ObjectMapper mapper;
   private PodTemplate podTemplateSpec;
-  private Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
+  private KubernetesTaskRunnerEffectiveConfig effectiveConfig;
 
   @BeforeEach
   public void setup()
   {
     mapper = new TestUtils().getTestObjectMapper();
     podTemplateSpec = K8sTestUtils.fileToResource("basePodTemplate.yaml", PodTemplate.class);
-    dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY);
+    Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY, 1);
+    KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build();
+    effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef);
   }
 
   @Test
@@ -71,7 +76,7 @@ public class DynamicConfigPodTemplateSelectorTest
         IAE.class,
         () -> new DynamicConfigPodTemplateSelector(
             new Properties(),
-            dynamicConfigRef
+            effectiveConfig
         )
     );
     Assertions.assertEquals(
@@ -93,7 +98,7 @@ public class DynamicConfigPodTemplateSelectorTest
         IAE.class,
         () -> new DynamicConfigPodTemplateSelector(
             props,
-            dynamicConfigRef
+            effectiveConfig
         )
     );
 
@@ -111,7 +116,7 @@ public class DynamicConfigPodTemplateSelectorTest
 
     DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector(
         props,
-        dynamicConfigRef
+        effectiveConfig
     );
 
     Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -146,7 +151,7 @@ public class DynamicConfigPodTemplateSelectorTest
 
     DynamicConfigPodTemplateSelector selector = new DynamicConfigPodTemplateSelector(
         props,
-        dynamicConfigRef
+        effectiveConfig
     );
 
     Task kafkaTask = new NoopTask("id", "id", "datasource", 0, 0, null)
@@ -191,9 +196,10 @@ public class DynamicConfigPodTemplateSelectorTest
     props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", noopTemplatePath.toString());
 
     Assert.assertThrows(IAE.class, () -> new DynamicConfigPodTemplateSelector(
-        props,
-        dynamicConfigRef
-    ));
+            props,
+            effectiveConfig
+        )
+    );
   }
 
   @Test
@@ -208,7 +214,7 @@ public class DynamicConfigPodTemplateSelectorTest
 
     DynamicConfigPodTemplateSelector podTemplateSelector = new DynamicConfigPodTemplateSelector(
         props,
-        dynamicConfigRef
+        effectiveConfig
     );
 
     Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -242,16 +248,21 @@ public class DynamicConfigPodTemplateSelectorTest
     Properties props = new Properties();
     props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString());
     props.setProperty("druid.indexer.runner.k8s.podTemplate.lowThroughput", lowThroughputTemplatePath.toString());
-    dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(new SelectorBasedPodTemplateSelectStrategy(
-        Collections.singletonList(
-            new Selector("lowThroughput", null, null, Sets.newSet(dataSource)
+    Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = () -> new 
DefaultKubernetesTaskRunnerDynamicConfig(
+        new SelectorBasedPodTemplateSelectStrategy(
+            Collections.singletonList(
+                new Selector("lowThroughput", null, null, 
Sets.newSet(dataSource)
+                )
             )
-        )
-    ));
+        ), 1
+    );
+
+    KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build();
+    effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef);
 
     DynamicConfigPodTemplateSelector podTemplateSelector = new DynamicConfigPodTemplateSelector(
         props,
-        dynamicConfigRef
+        effectiveConfig
     );
 
     Task taskWithMatchedDatasource = new NoopTask("id", "id", dataSource, 0, 0, null);
@@ -276,7 +287,7 @@ public class DynamicConfigPodTemplateSelectorTest
 
     DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector(
         props,
-        dynamicConfigRef
+        effectiveConfig
     );
 
     Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -311,7 +322,7 @@ public class DynamicConfigPodTemplateSelectorTest
 
     DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector(
         props,
-        dynamicConfigRef
+        effectiveConfig
     );
 
     Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
index 16cccf04e28..552d7201fd0 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
@@ -52,6 +52,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
 import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
@@ -138,12 +139,12 @@ class K8sTaskAdapterTest
       }
     };
 
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("test")
-        .withOverlordNamespace("test_different")
-        .withAnnotations(ImmutableMap.of("annotation_key", "annotation_value"))
-        .withLabels(ImmutableMap.of("label_key", "label_value"))
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("test")
+                                                                              .withOverlordNamespace("test_different")
+                                                                              .withAnnotations(ImmutableMap.of("annotation_key", "annotation_value"))
+                                                                              .withLabels(ImmutableMap.of("label_key", "label_value"))
+                                                                              .build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
@@ -175,9 +176,9 @@ class K8sTaskAdapterTest
   {
     // given a task create a k8s job
     TestKubernetesClient testClient = new TestKubernetesClient(client);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("test")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("test")
+                                                                              .build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
@@ -229,9 +230,9 @@ class K8sTaskAdapterTest
       }
     };
 
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("test")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("test")
+                                                                              .build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
@@ -277,9 +278,9 @@ class K8sTaskAdapterTest
   public void toTask_useTaskPayloadManager() throws IOException
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("test")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("test")
+                                                                              .build();
     Task taskInTaskPayloadManager = K8sTestUtils.getTask();
     TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class);
     Mockito.when(mockTestLogs.streamTaskPayload("ID")).thenReturn(com.google.common.base.Optional.of(
@@ -309,7 +310,7 @@ class K8sTaskAdapterTest
   public void getTaskId()
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
@@ -331,7 +332,7 @@ class K8sTaskAdapterTest
   public void getTaskId_noAnnotations()
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
@@ -353,7 +354,7 @@ class K8sTaskAdapterTest
   public void getTaskId_missingTaskIdAnnotation()
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
@@ -458,9 +459,9 @@ class K8sTaskAdapterTest
         new File("/tmp/"),
         0
     );
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("test")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("test")
+                                                                              .build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
@@ -482,9 +483,9 @@ class K8sTaskAdapterTest
 
     // we have an override, but nothing in the overlord
     config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("test")
-        .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
-        .build();
+                                             .withNamespace("test")
+                                             .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
+                                             .build();
     adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
@@ -532,9 +533,9 @@ class K8sTaskAdapterTest
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
     Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("namespace")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("namespace")
+                                                                              .build();
 
     SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
@@ -582,9 +583,9 @@ class K8sTaskAdapterTest
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
     Pod pod = K8sTestUtils.fileToResource("probesPodSpec.yaml", Pod.class);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-                                                                  .withNamespace("test")
-                                                                  .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("test")
+                                                                              .build();
 
     SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
@@ -635,11 +636,11 @@ class K8sTaskAdapterTest
 
     List<String> javaOpts = new ArrayList<>();
     javaOpts.add("-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G");
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-                                                                  .withNamespace("namespace")
-                                                                  .withJavaOptsArray(javaOpts)
-                                                                  .withCpuCore(2000)
-                                                                  .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("namespace")
+                                                                              .withJavaOptsArray(javaOpts)
+                                                                              .withCpuCore(2000)
+                                                                              .build();
 
     SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
index 48330d62506..477758d9ac4 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
@@ -87,9 +88,9 @@ class MultiContainerTaskAdapterTest
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
     Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("namespace")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                        .withNamespace("namespace")
+                                                                        .build();
     MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
         testClient,
         config,
@@ -138,10 +139,10 @@ class MultiContainerTaskAdapterTest
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
     Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpecOrder.yaml", Pod.class);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("namespace")
-        .withPrimaryContainerName("primary")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("namespace")
+                                                                              .withPrimaryContainerName("primary")
+                                                                              .build();
     MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
         testClient,
         config,
@@ -192,11 +193,11 @@ class MultiContainerTaskAdapterTest
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
     Pod pod = K8sTestUtils.fileToResource("podSpec.yaml", Pod.class);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("namespace")
-        .withPrimaryContainerName("primary")
-        .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                              .withNamespace("namespace")
+                                                                              .withPrimaryContainerName("primary")
+                                                                              .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
+                                                                              .build();
 
     MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
         testClient,
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index 45f2c356cfe..01d99a896f6 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
 import org.apache.druid.k8s.overlord.common.Base64Compression;
 import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
@@ -58,7 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class PodTemplateTaskAdapterTest
 {
-  private KubernetesTaskRunnerConfig taskRunnerConfig;
+  private KubernetesTaskRunnerStaticConfig taskRunnerConfig;
   private PodTemplate podTemplateSpec;
   private TaskConfig taskConfig;
   private DruidNode node;
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
index d7d74cf1812..832a7292304 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.k8s.overlord.common.PeonCommandContext;
 import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
@@ -86,9 +87,9 @@ class SingleContainerTaskAdapterTest
   {
     TestKubernetesClient testClient = new TestKubernetesClient(client);
     Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
-    KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
-        .withNamespace("namespace")
-        .build();
+    KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+                                                                        .withNamespace("namespace")
+                                                                        .build();
     SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
         config,
diff --git a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java
index 87cbad37941..12265df31e2 100644
--- a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java
+++ b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 /**
  */
@@ -219,6 +220,26 @@ public class ConfigManager
     }
   }
 
+  public <T> boolean addListener(String configKey, String listenerKey, Consumer<T> listener)
+  {
+    ConfigHolder holder = watchedConfigs.get(configKey);
+    if (holder == null) {
+      log.warn("ConfigHolder not found for configKey[%s]", configKey);
+      return false;
+    }
+    return holder.addListener(listenerKey, listener);
+  }
+
+  public <T> boolean removeListener(String configKey, String listenerKey, Consumer<T> listener)
+  {
+    ConfigHolder holder = watchedConfigs.get(configKey);
+    if (holder == null) {
+      log.warn("ConfigHolder not found for configKey[%s]", configKey);
+      return false;
+    }
+    return holder.removeListener(listenerKey, listener);
+  }
+
   @Nonnull
   private MetadataCASUpdate createMetadataCASUpdate(
       String keyValue,
@@ -285,6 +306,7 @@ public class ConfigManager
     private final AtomicReference<byte[]> rawBytes;
     private final ConfigSerde<T> serde;
     private final AtomicReference<T> reference;
+    private final ConcurrentMap<String, Consumer<T>> listeners;
 
     ConfigHolder(
         byte[] rawBytes,
@@ -294,6 +316,7 @@ public class ConfigManager
       this.rawBytes = new AtomicReference<>(rawBytes);
       this.serde = serde;
       this.reference = new AtomicReference<>(serde.deserialize(rawBytes));
+      this.listeners = new ConcurrentHashMap<>();
     }
 
     public AtomicReference<T> getReference()
@@ -306,10 +329,38 @@ public class ConfigManager
       if (!Arrays.equals(newBytes, rawBytes.get())) {
         reference.set(serde.deserialize(newBytes));
         rawBytes.set(newBytes);
+        listeners.forEach((key, listener) -> {
+          try {
+            listener.accept(reference.get());
+          }
+          catch (Exception e) {
+            log.warn(e, "Exception when calling listener for key[%s]", key);
+          }
+        });
         return true;
       }
       return false;
     }
+
+    public boolean addListener(String key, Consumer<T> listener)
+    {
+      Consumer<T> val = listeners.putIfAbsent(key, listener);
+      if (val != null) {
+        log.warn("Listener key[%s] already exists", key);
+        return false;
+      }
+      return true;
+    }
+
+    public boolean removeListener(String key, Consumer<T> listener)
+    {
+      boolean isRemoved = listeners.remove(key, listener);
+      if (!isRemoved) {
+        log.warn("Listener key[%s] not found", key);
+        return false;
+      }
+      return true;
+    }
   }
 
   private class PollingCallable implements Callable<ScheduledExecutors.Signal>
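
Note on the ConfigManager hunk above: the listener bookkeeping added to
ConfigHolder can be sketched in isolation as follows. This is a minimal,
self-contained illustration and not the Druid class itself. The class name
KeyedListeners and the method name publish are hypothetical; only the use of
ConcurrentMap.putIfAbsent, ConcurrentMap.remove(key, value), and the
per-listener try/catch mirror what the diff shows.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for the listener handling added to ConfigHolder. */
class KeyedListeners<T>
{
  private final ConcurrentMap<String, Consumer<T>> listeners = new ConcurrentHashMap<>();

  /** Registers the listener only if the key is unused; returns false otherwise. */
  boolean addListener(String key, Consumer<T> listener)
  {
    // putIfAbsent is atomic, so two racing registrations cannot both succeed.
    return listeners.putIfAbsent(key, listener) == null;
  }

  /** Removes the entry only if the key currently maps to this exact listener. */
  boolean removeListener(String key, Consumer<T> listener)
  {
    return listeners.remove(key, listener);
  }

  /** Delivers a new value to every listener; one failing listener does not block the rest. */
  void publish(T value)
  {
    listeners.forEach((key, listener) -> {
      try {
        listener.accept(value);
      }
      catch (RuntimeException e) {
        // The commit logs a warning here; this sketch just prints to stderr.
        System.err.println("Exception when calling listener for key[" + key + "]: " + e);
      }
    });
  }
}
```

Because removal compares both key and listener, a caller has to keep a
reference to the Consumer it registered. A fresh lambda with the same body is
a different object and will not match.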

