This is an automated email from the ASF dual-hosted git repository.
frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4a76915e662 Implement dynamic capacity for kubernetes task runner (#18591)
4a76915e662 is described below
commit 4a76915e6620127423210e4ccecaee43d9ee3bb6
Author: Gabriel Chang <[email protected]>
AuthorDate: Mon Dec 8 10:27:40 2025 +0800
Implement dynamic capacity for kubernetes task runner (#18591)
* Implement dynamic capacity for kubernetes task runner
* Update docs
* Refactor to use getIfNull
* Update docs wording
* Fix based on comments
* Update wording for docs
* Update static config java doc
* Undo removal of constructor
* Initial config observer implementation
* Use StringUtils.format
* Add missing import
* Update listener operations to be atomic
* Fix config manager initialisation in tests
* Add tests for effective config
* Test syncCapacityWithDynamicConfig
* Fix checkstyle
* Update KubernetesOverlordModuleTest to setup ConfigManager
* Update docs
* Use AtomicInteger for currentCapacity
---
docs/development/extensions-core/k8s-jobs.md | 21 +-
.../k8s/overlord/KubernetesOverlordModule.java | 18 +-
.../druid/k8s/overlord/KubernetesTaskRunner.java | 49 ++-
.../k8s/overlord/KubernetesTaskRunnerConfig.java | 331 ++-------------------
.../KubernetesTaskRunnerEffectiveConfig.java | 188 ++++++++++++
.../k8s/overlord/KubernetesTaskRunnerFactory.java | 13 +-
....java => KubernetesTaskRunnerStaticConfig.java} | 220 ++------------
.../DefaultKubernetesTaskRunnerDynamicConfig.java | 42 ++-
.../KubernetesTaskExecutionConfigResource.java | 20 +-
.../KubernetesTaskRunnerDynamicConfig.java | 22 ++
.../DynamicConfigPodTemplateSelector.java | 19 +-
.../taskadapter/PodTemplateTaskAdapter.java | 2 +-
.../k8s/overlord/KubernetesOverlordModuleTest.java | 25 ++
.../KubernetesTaskRunnerEffectiveConfigTest.java | 90 ++++++
.../overlord/KubernetesTaskRunnerFactoryTest.java | 13 +-
...a => KubernetesTaskRunnerStaticConfigTest.java} | 10 +-
.../k8s/overlord/KubernetesTaskRunnerTest.java | 100 ++++++-
...faultKubernetesTaskRunnerDynamicConfigTest.java | 13 +-
.../KubernetesTaskExecutionConfigResourceTest.java | 121 ++++++++
.../KubernetesTaskRunnerDynamicConfigTest.java | 13 +-
.../DruidPeonClientIntegrationTest.java | 7 +-
.../DynamicConfigPodTemplateSelectorTest.java | 47 +--
.../overlord/taskadapter/K8sTaskAdapterTest.java | 71 ++---
.../taskadapter/MultiContainerTaskAdapterTest.java | 25 +-
.../taskadapter/PodTemplateTaskAdapterTest.java | 3 +-
.../SingleContainerTaskAdapterTest.java | 7 +-
.../apache/druid/common/config/ConfigManager.java | 51 ++++
27 files changed, 905 insertions(+), 636 deletions(-)
diff --git a/docs/development/extensions-core/k8s-jobs.md
b/docs/development/extensions-core/k8s-jobs.md
index 289934646af..6dc7bdd70ea 100644
--- a/docs/development/extensions-core/k8s-jobs.md
+++ b/docs/development/extensions-core/k8s-jobs.md
@@ -48,9 +48,9 @@ Other configurations required are:
Druid operators can dynamically tune certain features within this extension.
You don't need to restart the Overlord
service for these changes to take effect.
-Druid can dynamically tune [pod template selection](#pod-template-selection),
which allows you to configure the pod
-template based on the task to be run. To enable dynamic pod template
selection, first configure the
-[custom template pod adapter](#custom-template-pod-adapter).
+Druid can dynamically tune [pod template selection](#pod-template-selection) and [capacity](#properties), where capacity refers to `druid.indexer.runner.capacity`.
+
+Pod template selection allows you to configure the pod template based on the
task to be run. To enable dynamic pod template selection, first configure the
[custom template pod adapter](#custom-template-pod-adapter).
Use the following APIs to view and update the dynamic configuration for the
Kubernetes task runner.
@@ -126,7 +126,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
"type": ["index_kafka"]
}
]
- }
+ },
+ "capacity": 12
}
```
</details>
@@ -135,6 +136,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
Updates the dynamic configuration for the Kubernetes Task Runner
+Note: Both `podTemplateSelectStrategy` and `capacity` are optional fields. A
POST request may include either, both, or neither.
+
##### URL
`POST` `/druid/indexer/v1/k8s/taskrunner/executionconfig`
@@ -193,7 +196,8 @@ curl
"http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconf
"type": ["index_kafka"]
}
]
- }
+ },
+ "capacity": 6
}'
```
@@ -225,7 +229,8 @@ Content-Type: application/json
"type": ["index_kafka"]
}
]
- }
+ },
+ "capacity": 6
}
```
@@ -309,7 +314,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
"comment": "",
"ip": "127.0.0.1"
},
- "payload": "{\"type\":
\"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"}",
+ "payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"},\"capacity\":6}",
"auditTime": "2024-06-13T20:59:51.622Z"
}
]
@@ -790,7 +795,7 @@ Should you require the needed permissions for interacting
across Kubernetes name
| `druid.indexer.runner.annotations` | `JsonObject` | Additional annotations
you want to add to peon pod. | `{}` | No |
| `druid.indexer.runner.peonMonitors` | `JsonArray` | Overrides
`druid.monitoring.monitors`. Use this property if you don't want to inherit
monitors from the Overlord. | `[]` | No |
| `druid.indexer.runner.graceTerminationPeriodSeconds` | `Long` | Number of
seconds you want to wait after a sigterm for container lifecycle hooks to
complete. Keep at a smaller value if you want tasks to hold locks for shorter
periods. | `PT30S` (K8s default) | No |
-| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that
can be sent to Kubernetes. | `2147483647` | No |
+| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that
can be sent to Kubernetes. Value will be overridden if a dynamic config value
has been set. | `2147483647` | No |
| `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core
for the task. | `1000` | No |
| `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the
ingestion task makes a best effort to persist the pod logs from `k8s` to
persistent task log storage. The timeout ensures that `k8s` connection issues
do not cause the pod to hang indefinitely thereby blocking Overlord operations.
If the timeout occurs before the logs are saved, those logs will not be
available in Druid. | `PT300S` | NO |
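Note on the docs change above: the updated text says a POST to `/druid/indexer/v1/k8s/taskrunner/executionconfig` may carry `capacity` without `podTemplateSelectStrategy`. A minimal sketch of building such a request with the JDK HTTP client follows. The class name `ExecutionConfigRequests`, its method names, and the base URL `http://localhost:8888` used in the usage note are illustrative assumptions and not part of this commit. Sending `"type": "default"` alongside `capacity` is also an assumption, based on the example payloads in the docs.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Druid code base.
final class ExecutionConfigRequests
{
  // Endpoint path as documented in k8s-jobs.md.
  static final String EXECUTION_CONFIG_PATH = "/druid/indexer/v1/k8s/taskrunner/executionconfig";

  // Builds a payload that sets only the capacity (assumes the "default" config type).
  static String capacityOnlyPayload(int capacity)
  {
    return "{\"type\":\"default\",\"capacity\":" + capacity + "}";
  }

  // Builds, but does not send, the POST request against the given Router base URL.
  static HttpRequest buildUpdate(String routerBaseUrl, int capacity)
  {
    return HttpRequest.newBuilder(URI.create(routerBaseUrl + EXECUTION_CONFIG_PATH))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(capacityOnlyPayload(capacity), StandardCharsets.UTF_8))
        .build();
  }
}
```

To send it, pass the result of `ExecutionConfigRequests.buildUpdate("http://localhost:8888", 6)` to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, replacing the base URL with your own ROUTER_IP:ROUTER_PORT.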
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index 021abef0828..936e86c888c 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -95,7 +95,7 @@ public class KubernetesOverlordModule implements DruidModule
public void configure(Binder binder)
{
// druid.indexer.runner.type=k8s
- JsonConfigProvider.bind(binder,
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX,
KubernetesTaskRunnerConfig.class);
+ JsonConfigProvider.bind(binder,
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX,
KubernetesTaskRunnerStaticConfig.class);
JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX,
KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue",
TaskQueueConfig.class);
JacksonConfigProvider.bind(binder,
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
KubernetesTaskRunnerDynamicConfig.class, null);
@@ -150,10 +150,20 @@ public class KubernetesOverlordModule implements
DruidModule
JsonConfigProvider.bind(binder, JDK_HTTPCLIENT_PROPERITES_PREFIX,
DruidKubernetesJdkHttpClientConfig.class);
}
+ @Provides
+ @LazySingleton
+ public KubernetesTaskRunnerEffectiveConfig provideEffectiveConfig(
+ KubernetesTaskRunnerStaticConfig staticConfig,
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier
+ )
+ {
+ return new KubernetesTaskRunnerEffectiveConfig(staticConfig,
dynamicConfigSupplier);
+ }
+
@Provides
@LazySingleton
public DruidKubernetesClient makeKubernetesClient(
- KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
+ KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig,
DruidKubernetesHttpClientFactory httpClientFactory,
Lifecycle lifecycle
)
@@ -217,7 +227,7 @@ public class KubernetesOverlordModule implements DruidModule
TaskAdapter provideTaskAdapter(
DruidKubernetesClient client,
Properties properties,
- KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
+ KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
@Self DruidNode druidNode,
@@ -260,7 +270,7 @@ public class KubernetesOverlordModule implements DruidModule
druidNode,
smileMapper,
taskLogs,
- new DynamicConfigPodTemplateSelector(properties, dynamicConfigRef)
+ new DynamicConfigPodTemplateSelector(properties,
kubernetesTaskRunnerConfig)
);
} else {
return new SingleContainerTaskAdapter(
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 51913d13a71..48d709d1f84 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
@@ -44,6 +45,7 @@ import
org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -56,6 +58,7 @@ import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandle
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
+import
org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -76,8 +79,11 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@@ -100,6 +106,7 @@ import java.util.stream.Collectors;
public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
{
private static final EmittingLogger log = new
EmittingLogger(KubernetesTaskRunner.class);
+ private static final String OBSERVER_KEY = "k8s-task-runner-capacity-%s";
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>>
listeners = new CopyOnWriteArrayList<>();
// to cleanup old jobs that might not have been deleted.
@@ -111,19 +118,23 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
private final KubernetesPeonClient client;
private final KubernetesTaskRunnerConfig config;
private final ListeningExecutorService exec;
+ private final ThreadPoolExecutor tpe;
private final HttpClient httpClient;
private final PeonLifecycleFactory peonLifecycleFactory;
private final ServiceEmitter emitter;
// currently worker categories aren't supported, so it's hardcoded.
protected static final String WORKER_CATEGORY = "_k8s_worker_category";
+ private final AtomicInteger currentCapacity;
+
public KubernetesTaskRunner(
TaskAdapter adapter,
KubernetesTaskRunnerConfig config,
KubernetesPeonClient client,
HttpClient httpClient,
PeonLifecycleFactory peonLifecycleFactory,
- ServiceEmitter emitter
+ ServiceEmitter emitter,
+ ConfigManager configManager
)
{
this.adapter = adapter;
@@ -132,10 +143,12 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
this.httpClient = httpClient;
this.peonLifecycleFactory = peonLifecycleFactory;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
- this.exec = MoreExecutors.listeningDecorator(
- Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
- );
this.emitter = emitter;
+
+ this.currentCapacity = new AtomicInteger(config.getCapacity());
+ this.tpe = new ThreadPoolExecutor(currentCapacity.get(), currentCapacity.get(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Execs.makeThreadFactory("k8s-task-runner-%d", null));
+ this.exec = MoreExecutors.listeningDecorator(this.tpe);
+ configManager.addListener(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()), this::syncCapacityWithDynamicConfig);
}
@Override
@@ -179,6 +192,24 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
}
}
+ private void syncCapacityWithDynamicConfig(KubernetesTaskRunnerDynamicConfig config)
+ {
+ int newCapacity = config.getCapacity();
+ if (newCapacity == currentCapacity.get()) {
+ return;
+ }
+ log.info("Adjusting k8s task runner capacity from [%d] to [%d]", currentCapacity.get(), newCapacity);
+ // maximum pool size must always be greater than or equal to the core pool size
+ if (newCapacity < currentCapacity.get()) {
+ tpe.setCorePoolSize(newCapacity);
+ tpe.setMaximumPoolSize(newCapacity);
+ } else {
+ tpe.setMaximumPoolSize(newCapacity);
+ tpe.setCorePoolSize(newCapacity);
+ }
+ currentCapacity.set(newCapacity);
+ }
+
private TaskStatus runTask(Task task)
{
return doTask(task, true);
@@ -294,7 +325,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
synchronized (tasks) {
tasks.remove(taskid);
}
-
+
}
@Override
@@ -420,7 +451,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
- return ImmutableMap.of(WORKER_CATEGORY, (long) config.getCapacity());
+ return ImmutableMap.of(WORKER_CATEGORY, (long) currentCapacity.get());
}
@Override
@@ -438,13 +469,13 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
@Override
public Map<String, Long> getIdleTaskSlotCount()
{
- return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0,
config.getCapacity() - tasks.size()));
+ return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0,
currentCapacity.get() - tasks.size()));
}
@Override
public Map<String, Long> getUsedTaskSlotCount()
{
- return ImmutableMap.of(WORKER_CATEGORY, (long)
Math.min(config.getCapacity(), tasks.size()));
+ return ImmutableMap.of(WORKER_CATEGORY, (long)
Math.min(currentCapacity.get(), tasks.size()));
}
@Override
@@ -535,7 +566,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
@Override
public int getTotalCapacity()
{
- return config.getCapacity();
+ return currentCapacity.get();
}
@Override
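Note on the KubernetesTaskRunner change above: `syncCapacityWithDynamicConfig` resizes the pool in a specific order because `ThreadPoolExecutor` rejects any state where the core size exceeds the maximum size. The standalone sketch below shows that ordering in isolation. `ResizablePool` and its methods are hypothetical names for illustration only. The null and range guard is an assumption about how an unset or invalid dynamic capacity might be handled. It is not a description of what the commit does.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the Druid code base.
final class ResizablePool
{
  // A fixed-size pool: core and maximum sizes are equal, and the queue is unbounded.
  static ThreadPoolExecutor fixedPool(int size)
  {
    return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  }

  // Returns true if the pool size was changed.
  static boolean resize(ThreadPoolExecutor pool, Integer newSize)
  {
    // Assumption: a missing or non-positive value means "leave the pool alone".
    if (newSize == null || newSize < 1 || newSize == pool.getCorePoolSize()) {
      return false;
    }
    if (newSize < pool.getCorePoolSize()) {
      // Shrinking: lower the core size first so it never exceeds the maximum.
      pool.setCorePoolSize(newSize);
      pool.setMaximumPoolSize(newSize);
    } else {
      // Growing: raise the maximum first so the core size never exceeds it.
      pool.setMaximumPoolSize(newSize);
      pool.setCorePoolSize(newSize);
    }
    return true;
  }
}
```

Shrinking does not interrupt running work. Per the `ThreadPoolExecutor` documentation, excess threads are terminated when they next become idle, so a lowered capacity takes full effect only as in-flight tasks finish.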
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index 663126ba2e6..e7b9eb2b804 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -19,339 +19,60 @@
package org.apache.druid.k8s.overlord;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.ObjectUtils;
import org.joda.time.Period;
-import javax.annotation.Nonnull;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
+
import java.util.List;
import java.util.Map;
-public class KubernetesTaskRunnerConfig
+public interface KubernetesTaskRunnerConfig
{
- @JsonProperty
- @NotNull
- private String namespace;
-
- @JsonProperty
- private String k8sTaskPodNamePrefix = "";
-
- // This property is the namespace that the Overlord is running in.
- // For cases where we want task pods to run on different namespace from the
overlord, we need to specify the namespace of the overlord here.
- // Else, we can simply leave this field alone.
- @JsonProperty
- private String overlordNamespace = "";
-
- @JsonProperty
- private boolean debugJobs = false;
-
- /**
- * Deprecated, please specify adapter type runtime property instead
- * <p>
- * I.E `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer`
- */
- @Deprecated
- @JsonProperty
- private boolean sidecarSupport = false;
-
- @JsonProperty
- // if this is not set, then the first container in your pod spec is assumed
to be the overlord container.
- // usually this is fine, but when you are dynamically adding sidecars like
istio, the service mesh could
- // in fact place the istio-proxy container as the first container. Thus,
you would specify this value to
- // the name of your primary container. e.g. druid-overlord
- private String primaryContainerName = null;
-
- @JsonProperty
- // for multi-container jobs, we need this image to shut down sidecars after
the main container
- // has completed
- private String kubexitImage = "karlkfi/kubexit:v0.3.2";
-
- // how much time to wait for preStop hooks to execute
- // lower number speeds up pod termination time to release locks
- // faster, defaults to your k8s setup, usually 30 seconds.
- private Long graceTerminationPeriodSeconds = null;
-
- @JsonProperty
- // disable using http / https proxy environment variables
- private boolean disableClientProxy;
-
- @JsonProperty
- @NotNull
- private Period maxTaskDuration = new Period("PT4H");
-
- @JsonProperty
- @NotNull
- // how long to wait for the jobs to be cleaned up.
- private Period taskCleanupDelay = new Period("P2D");
-
- @JsonProperty
- @NotNull
- // interval for k8s job cleanup to run
- private Period taskCleanupInterval = new Period("PT10m");
-
- @JsonProperty
- @NotNull
- // how long to wait to join peon k8s jobs on startup
- private Period taskJoinTimeout = new Period("PT1M");
-
-
- @JsonProperty
- @NotNull
- // how long to wait for the peon k8s job to launch
- private Period k8sjobLaunchTimeout = new Period("PT1H");
-
- @JsonProperty
- @NotNull
- // how long to wait for log saving operations to complete
- private Period logSaveTimeout = new Period("PT300S");
-
- @JsonProperty
- // ForkingTaskRunner inherits the monitors from the MM, in k8s mode
- // the peon inherits the monitors from the overlord, so if someone specifies
- // a TaskCountStatsMonitor in the overlord for example, the peon process
- // fails because it can not inject this monitor in the peon process.
- private List<String> peonMonitors = ImmutableList.of();
-
- @JsonProperty
- @NotNull
- private List<String> javaOptsArray = ImmutableList.of();
-
- @JsonProperty
- @NotNull
- private int cpuCoreInMicro = 0;
-
- @JsonProperty
- @NotNull
- private Map<String, String> labels = ImmutableMap.of();
-
- @JsonProperty
- @NotNull
- private Map<String, String> annotations = ImmutableMap.of();
-
- @JsonProperty
- @Min(1)
- @Max(Integer.MAX_VALUE)
- @NotNull
- private Integer capacity = Integer.MAX_VALUE;
-
- public KubernetesTaskRunnerConfig()
- {
- }
+ String getNamespace();
- private KubernetesTaskRunnerConfig(
- @Nonnull String namespace,
- String overlordNamespace,
- String k8sTaskPodNamePrefix,
- boolean debugJobs,
- boolean sidecarSupport,
- String primaryContainerName,
- String kubexitImage,
- Long graceTerminationPeriodSeconds,
- boolean disableClientProxy,
- Period maxTaskDuration,
- Period taskCleanupDelay,
- Period taskCleanupInterval,
- Period k8sjobLaunchTimeout,
- Period logSaveTimeout,
- List<String> peonMonitors,
- List<String> javaOptsArray,
- int cpuCoreInMicro,
- Map<String, String> labels,
- Map<String, String> annotations,
- Integer capacity,
- Period taskJoinTimeout
- )
- {
- this.namespace = namespace;
- this.overlordNamespace = ObjectUtils.getIfNull(
- overlordNamespace,
- this.overlordNamespace
- );
- this.k8sTaskPodNamePrefix = k8sTaskPodNamePrefix;
- this.debugJobs = ObjectUtils.getIfNull(
- debugJobs,
- this.debugJobs
- );
- this.sidecarSupport = ObjectUtils.getIfNull(
- sidecarSupport,
- this.sidecarSupport
- );
- this.primaryContainerName = ObjectUtils.getIfNull(
- primaryContainerName,
- this.primaryContainerName
- );
- this.kubexitImage = ObjectUtils.getIfNull(
- kubexitImage,
- this.kubexitImage
- );
- this.graceTerminationPeriodSeconds = ObjectUtils.getIfNull(
- graceTerminationPeriodSeconds,
- this.graceTerminationPeriodSeconds
- );
- this.disableClientProxy = disableClientProxy;
- this.maxTaskDuration = ObjectUtils.getIfNull(
- maxTaskDuration,
- this.maxTaskDuration
- );
- this.taskCleanupDelay = ObjectUtils.getIfNull(
- taskCleanupDelay,
- this.taskCleanupDelay
- );
- this.taskCleanupInterval = ObjectUtils.getIfNull(
- taskCleanupInterval,
- this.taskCleanupInterval
- );
- this.k8sjobLaunchTimeout = ObjectUtils.getIfNull(
- k8sjobLaunchTimeout,
- this.k8sjobLaunchTimeout
- );
- this.taskJoinTimeout = ObjectUtils.getIfNull(
- taskJoinTimeout,
- this.taskJoinTimeout
- );
- this.logSaveTimeout = ObjectUtils.getIfNull(
- logSaveTimeout,
- this.logSaveTimeout
- );
- this.peonMonitors = ObjectUtils.getIfNull(
- peonMonitors,
- this.peonMonitors
- );
- this.javaOptsArray = ObjectUtils.getIfNull(
- javaOptsArray,
- this.javaOptsArray
- );
- this.cpuCoreInMicro = ObjectUtils.getIfNull(
- cpuCoreInMicro,
- this.cpuCoreInMicro
- );
- this.labels = ObjectUtils.getIfNull(
- labels,
- this.labels
- );
- this.annotations = ObjectUtils.getIfNull(
- annotations,
- this.annotations
- );
- this.capacity = ObjectUtils.getIfNull(
- capacity,
- this.capacity
- );
- }
+ String getOverlordNamespace();
- public String getNamespace()
- {
- return namespace;
- }
-
- public String getOverlordNamespace()
- {
- return overlordNamespace;
- }
-
- public String getK8sTaskPodNamePrefix()
- {
- return k8sTaskPodNamePrefix;
- }
+ String getK8sTaskPodNamePrefix();
- public boolean isDebugJobs()
- {
- return debugJobs;
- }
+ boolean isDebugJobs();
@Deprecated
- public boolean isSidecarSupport()
- {
- return sidecarSupport;
- }
+ boolean isSidecarSupport();
- public String getPrimaryContainerName()
- {
- return primaryContainerName;
- }
+ String getPrimaryContainerName();
- public String getKubexitImage()
- {
- return kubexitImage;
- }
+ String getKubexitImage();
- public Long getGraceTerminationPeriodSeconds()
- {
- return graceTerminationPeriodSeconds;
- }
+ Long getGraceTerminationPeriodSeconds();
- public boolean isDisableClientProxy()
- {
- return disableClientProxy;
- }
+ boolean isDisableClientProxy();
- public Period getTaskTimeout()
- {
- return maxTaskDuration;
- }
+ Period getTaskTimeout();
- public Period getTaskJoinTimeout()
- {
- return taskJoinTimeout;
- }
+ Period getTaskJoinTimeout();
+ Period getTaskCleanupDelay();
- public Period getTaskCleanupDelay()
- {
- return taskCleanupDelay;
- }
+ Period getTaskCleanupInterval();
- public Period getTaskCleanupInterval()
- {
- return taskCleanupInterval;
- }
+ Period getTaskLaunchTimeout();
- public Period getTaskLaunchTimeout()
- {
- return k8sjobLaunchTimeout;
- }
+ Period getLogSaveTimeout();
- public Period getLogSaveTimeout()
- {
- return logSaveTimeout;
- }
+ List<String> getPeonMonitors();
- public List<String> getPeonMonitors()
- {
- return peonMonitors;
- }
-
- public List<String> getJavaOptsArray()
- {
- return javaOptsArray;
- }
+ List<String> getJavaOptsArray();
- public int getCpuCoreInMicro()
- {
- return cpuCoreInMicro;
- }
+ int getCpuCoreInMicro();
- public Map<String, String> getLabels()
- {
- return labels;
- }
+ Map<String, String> getLabels();
- public Map<String, String> getAnnotations()
- {
- return annotations;
- }
+ Map<String, String> getAnnotations();
- public Integer getCapacity()
- {
- return capacity;
- }
+ Integer getCapacity();
- public static Builder builder()
+ static Builder builder()
{
return new Builder();
}
@@ -511,9 +232,9 @@ public class KubernetesTaskRunnerConfig
return this;
}
- public KubernetesTaskRunnerConfig build()
+ public KubernetesTaskRunnerStaticConfig build()
{
- return new KubernetesTaskRunnerConfig(
+ return new KubernetesTaskRunnerStaticConfig(
this.namespace,
this.overlordNamespace,
this.k8sTaskPodNamePrefix,
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
new file mode 100644
index 00000000000..c1593c3577d
--- /dev/null
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Supplier;
+import
org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
+import org.joda.time.Period;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides a flexible mechanism to configure Kubernetes task pods,
+ * by merging the static base settings from {@link KubernetesTaskRunnerConfig}
+ * with dynamic overrides from {@link KubernetesTaskRunnerDynamicConfig}.
+ * <p>
+ * Kubernetes will always use this effective config to run new tasks.
+ */
+public class KubernetesTaskRunnerEffectiveConfig implements
KubernetesTaskRunnerConfig
+{
+ private final KubernetesTaskRunnerStaticConfig staticConfig;
+ private final Supplier<KubernetesTaskRunnerDynamicConfig>
dynamicConfigSupplier;
+
+ public KubernetesTaskRunnerEffectiveConfig(
+ KubernetesTaskRunnerStaticConfig staticConfig,
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier
+ )
+ {
+ this.staticConfig = staticConfig;
+ this.dynamicConfigSupplier = dynamicConfigSupplier;
+ }
+
+ @Override
+ public String getNamespace()
+ {
+ return staticConfig.getNamespace();
+ }
+
+ @Override
+ public String getOverlordNamespace()
+ {
+ return staticConfig.getOverlordNamespace();
+ }
+
+ @Override
+ public String getK8sTaskPodNamePrefix()
+ {
+ return staticConfig.getK8sTaskPodNamePrefix();
+ }
+
+ @Override
+ public boolean isDebugJobs()
+ {
+ return staticConfig.isDebugJobs();
+ }
+
+ @Override
+ public boolean isSidecarSupport()
+ {
+ return staticConfig.isSidecarSupport();
+ }
+
+ @Override
+ public String getPrimaryContainerName()
+ {
+ return staticConfig.getPrimaryContainerName();
+ }
+
+ @Override
+ public String getKubexitImage()
+ {
+ return staticConfig.getKubexitImage();
+ }
+
+ @Override
+ public Long getGraceTerminationPeriodSeconds()
+ {
+ return staticConfig.getGraceTerminationPeriodSeconds();
+ }
+
+ @Override
+ public boolean isDisableClientProxy()
+ {
+ return staticConfig.isDisableClientProxy();
+ }
+
+ @Override
+ public Period getTaskTimeout()
+ {
+ return staticConfig.getTaskTimeout();
+ }
+
+ @Override
+ public Period getTaskJoinTimeout()
+ {
+ return staticConfig.getTaskJoinTimeout();
+ }
+
+ @Override
+ public Period getTaskCleanupDelay()
+ {
+ return staticConfig.getTaskCleanupDelay();
+ }
+
+ @Override
+ public Period getTaskCleanupInterval()
+ {
+ return staticConfig.getTaskCleanupInterval();
+ }
+
+ @Override
+ public Period getTaskLaunchTimeout()
+ {
+ return staticConfig.getTaskLaunchTimeout();
+ }
+
+ @Override
+ public Period getLogSaveTimeout()
+ {
+ return staticConfig.getLogSaveTimeout();
+ }
+
+ @Override
+ public List<String> getPeonMonitors()
+ {
+ return staticConfig.getPeonMonitors();
+ }
+
+ @Override
+ public List<String> getJavaOptsArray()
+ {
+ return staticConfig.getJavaOptsArray();
+ }
+
+ @Override
+ public int getCpuCoreInMicro()
+ {
+ return staticConfig.getCpuCoreInMicro();
+ }
+
+ @Override
+ public Map<String, String> getLabels()
+ {
+ return staticConfig.getLabels();
+ }
+
+ @Override
+ public Map<String, String> getAnnotations()
+ {
+ return staticConfig.getAnnotations();
+ }
+
+ @Override
+ public Integer getCapacity()
+ {
+ if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getCapacity() == null) {
+ return staticConfig.getCapacity();
+ }
+ return dynamicConfigSupplier.get().getCapacity();
+ }
+
+ public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
+ {
+ if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getPodTemplateSelectStrategy() == null) {
+ return KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY;
+ }
+ return dynamicConfigSupplier.get().getPodTemplateSelectStrategy();
+ }
+}
+
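Note on KubernetesTaskRunnerEffectiveConfig above: `getCapacity()` returns the dynamic value when one is set and otherwise falls back to the static property. The sketch below shows that fallback rule on its own. `EffectiveCapacity` and its nested `DynamicConfig` are hypothetical stand-ins, and `java.util.function.Supplier` replaces the Guava `Supplier` used in the commit so that the example is self-contained. Unlike the committed code, the sketch reads the supplier once, so every check sees the same snapshot. That is a design choice of this illustration, not a claim about the commit.

```java
import java.util.function.Supplier;

// Hypothetical illustration, not part of the Druid code base.
final class EffectiveCapacity
{
  // Stand-in for a dynamic config whose capacity may be unset (null).
  static final class DynamicConfig
  {
    private final Integer capacity;

    DynamicConfig(Integer capacity)
    {
      this.capacity = capacity;
    }

    Integer getCapacity()
    {
      return capacity;
    }
  }

  // Dynamic capacity wins when present; otherwise the static value applies.
  static int resolve(int staticCapacity, Supplier<DynamicConfig> dynamicSupplier)
  {
    // Read the supplier once so the null checks and the returned value agree.
    DynamicConfig snapshot = dynamicSupplier == null ? null : dynamicSupplier.get();
    Integer override = snapshot == null ? null : snapshot.getCapacity();
    return override == null ? staticCapacity : override;
  }
}
```

With a static capacity of 10, `resolve` returns 10 when the supplier is null, returns null, or returns a config with no capacity, and returns 6 when the dynamic config sets capacity to 6.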
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index dd8111ed49e..516d229c891 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -21,6 +21,7 @@ package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
+import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
@@ -39,23 +40,25 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
public static final String TYPE_NAME = "k8s";
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
- private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
+ private final KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig;
private final TaskLogs taskLogs;
private final DruidKubernetesClient druidKubernetesClient;
private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;
private final TaskAdapter taskAdapter;
+ private final ConfigManager configManager;
private final Set<String> adapterTypeAllowingTasksInDifferentNamespaces = Set.of(PodTemplateTaskAdapter.TYPE);
@Inject
public KubernetesTaskRunnerFactory(
@Smile ObjectMapper smileMapper,
@EscalatedGlobal final HttpClient httpClient,
- KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
+ KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig,
TaskLogs taskLogs,
DruidKubernetesClient druidKubernetesClient,
ServiceEmitter emitter,
- TaskAdapter taskAdapter
+ TaskAdapter taskAdapter,
+ ConfigManager configManager
)
{
this.smileMapper = smileMapper;
@@ -65,6 +68,7 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
this.druidKubernetesClient = druidKubernetesClient;
this.emitter = emitter;
this.taskAdapter = taskAdapter;
+ this.configManager = configManager;
}
@Override
@@ -99,7 +103,8 @@ public class KubernetesTaskRunnerFactory implements
TaskRunnerFactory<Kubernetes
smileMapper,
kubernetesTaskRunnerConfig.getLogSaveTimeout().toStandardDuration().getMillis()
),
- emitter
+ emitter,
+ configManager
);
return runner;
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
similarity index 65%
copy from
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
copy to
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
index 663126ba2e6..b68e70075db 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
@@ -32,7 +32,12 @@ import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
-public class KubernetesTaskRunnerConfig
+/**
+ * This configuration is populated from runtime properties with the prefix
+ * {@code druid.indexer.runner}. It is the base configuration that
+ * {@link KubernetesTaskRunnerEffectiveConfig} will use if no dynamic config is provided.
+ */
+public class KubernetesTaskRunnerStaticConfig implements KubernetesTaskRunnerConfig
{
@JsonProperty
@NotNull
@@ -139,11 +144,11 @@ public class KubernetesTaskRunnerConfig
@NotNull
private Integer capacity = Integer.MAX_VALUE;
- public KubernetesTaskRunnerConfig()
+ public KubernetesTaskRunnerStaticConfig()
{
}
- private KubernetesTaskRunnerConfig(
+ public KubernetesTaskRunnerStaticConfig(
@Nonnull String namespace,
String overlordNamespace,
String k8sTaskPodNamePrefix,
@@ -244,298 +249,131 @@ public class KubernetesTaskRunnerConfig
);
}
+ @Override
public String getNamespace()
{
return namespace;
}
+ @Override
public String getOverlordNamespace()
{
return overlordNamespace;
}
+ @Override
public String getK8sTaskPodNamePrefix()
{
return k8sTaskPodNamePrefix;
}
+ @Override
public boolean isDebugJobs()
{
return debugJobs;
}
+ @Override
@Deprecated
public boolean isSidecarSupport()
{
return sidecarSupport;
}
+ @Override
public String getPrimaryContainerName()
{
return primaryContainerName;
}
+ @Override
public String getKubexitImage()
{
return kubexitImage;
}
+ @Override
public Long getGraceTerminationPeriodSeconds()
{
return graceTerminationPeriodSeconds;
}
+ @Override
public boolean isDisableClientProxy()
{
return disableClientProxy;
}
+ @Override
public Period getTaskTimeout()
{
return maxTaskDuration;
}
+ @Override
public Period getTaskJoinTimeout()
{
return taskJoinTimeout;
}
+ @Override
public Period getTaskCleanupDelay()
{
return taskCleanupDelay;
}
+ @Override
public Period getTaskCleanupInterval()
{
return taskCleanupInterval;
}
+ @Override
public Period getTaskLaunchTimeout()
{
return k8sjobLaunchTimeout;
}
+ @Override
public Period getLogSaveTimeout()
{
return logSaveTimeout;
}
+ @Override
public List<String> getPeonMonitors()
{
return peonMonitors;
}
+ @Override
public List<String> getJavaOptsArray()
{
return javaOptsArray;
}
+ @Override
public int getCpuCoreInMicro()
{
return cpuCoreInMicro;
}
+ @Override
public Map<String, String> getLabels()
{
return labels;
}
+ @Override
public Map<String, String> getAnnotations()
{
return annotations;
}
+ @Override
public Integer getCapacity()
{
return capacity;
}
-
- public static Builder builder()
- {
- return new Builder();
- }
-
- public static class Builder
- {
- private String namespace;
- private String overlordNamespace;
- private String k8sTaskPodNamePrefix;
- private boolean debugJob;
- private boolean sidecarSupport;
- private String primaryContainerName;
- private String kubexitImage;
- private Long graceTerminationPeriodSeconds;
- private boolean disableClientProxy;
- private Period maxTaskDuration;
- private Period taskCleanupDelay;
- private Period taskCleanupInterval;
- private Period k8sjobLaunchTimeout;
- private List<String> peonMonitors;
- private List<String> javaOptsArray;
- private int cpuCoreInMicro;
- private Map<String, String> labels;
- private Map<String, String> annotations;
- private Integer capacity;
- private Period taskJoinTimeout;
- private Period logSaveTimeout;
-
- public Builder()
- {
- }
-
- public Builder withNamespace(String namespace)
- {
- this.namespace = namespace;
- return this;
- }
-
- public Builder withOverlordNamespace(String overlordNamespace)
- {
- this.overlordNamespace = overlordNamespace;
- return this;
- }
-
- public Builder withK8sTaskPodNamePrefix(String k8sTaskPodNamePrefix)
- {
- this.k8sTaskPodNamePrefix = k8sTaskPodNamePrefix;
- return this;
- }
-
- public Builder withDebugJob(boolean debugJob)
- {
- this.debugJob = debugJob;
- return this;
- }
-
- public Builder withSidecarSupport(boolean sidecarSupport)
- {
- this.sidecarSupport = sidecarSupport;
- return this;
- }
-
- public Builder withPrimaryContainerName(String primaryContainerName)
- {
- this.primaryContainerName = primaryContainerName;
- return this;
- }
-
- public Builder withKubexitImage(String kubexitImage)
- {
- this.kubexitImage = kubexitImage;
- return this;
- }
-
- public Builder withGraceTerminationPeriodSeconds(Long graceTerminationPeriodSeconds)
- {
- this.graceTerminationPeriodSeconds = graceTerminationPeriodSeconds;
- return this;
- }
-
- public Builder withDisableClientProxy(boolean disableClientProxy)
- {
- this.disableClientProxy = disableClientProxy;
- return this;
- }
-
- public Builder withTaskTimeout(Period taskTimeout)
- {
- this.maxTaskDuration = taskTimeout;
- return this;
- }
-
- public Builder withTaskCleanupDelay(Period taskCleanupDelay)
- {
- this.taskCleanupDelay = taskCleanupDelay;
- return this;
- }
-
- public Builder withTaskCleanupInterval(Period taskCleanupInterval)
- {
- this.taskCleanupInterval = taskCleanupInterval;
- return this;
- }
-
- public Builder withK8sJobLaunchTimeout(Period k8sjobLaunchTimeout)
- {
- this.k8sjobLaunchTimeout = k8sjobLaunchTimeout;
- return this;
- }
-
- public Builder withPeonMonitors(List<String> peonMonitors)
- {
- this.peonMonitors = peonMonitors;
- return this;
- }
-
- public Builder withCpuCore(int cpuCore)
- {
- this.cpuCoreInMicro = cpuCore;
- return this;
- }
-
- public Builder withJavaOptsArray(List<String> javaOptsArray)
- {
- this.javaOptsArray = javaOptsArray;
- return this;
- }
-
- public Builder withLabels(Map<String, String> labels)
- {
- this.labels = labels;
- return this;
- }
-
- public Builder withAnnotations(Map<String, String> annotations)
- {
- this.annotations = annotations;
- return this;
- }
-
-
- public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
- {
- this.capacity = capacity;
- return this;
- }
-
- public Builder withTaskJoinTimeout(Period taskJoinTimeout)
- {
- this.taskJoinTimeout = taskJoinTimeout;
- return this;
- }
-
- public Builder withLogSaveTimeout(Period logSaveTimeout)
- {
- this.logSaveTimeout = logSaveTimeout;
- return this;
- }
-
- public KubernetesTaskRunnerConfig build()
- {
- return new KubernetesTaskRunnerConfig(
- this.namespace,
- this.overlordNamespace,
- this.k8sTaskPodNamePrefix,
- this.debugJob,
- this.sidecarSupport,
- this.primaryContainerName,
- this.kubexitImage,
- this.graceTerminationPeriodSeconds,
- this.disableClientProxy,
- this.maxTaskDuration,
- this.taskCleanupDelay,
- this.taskCleanupInterval,
- this.k8sjobLaunchTimeout,
- this.logSaveTimeout,
- this.peonMonitors,
- this.javaOptsArray,
- this.cpuCoreInMicro,
- this.labels,
- this.annotations,
- this.capacity,
- this.taskJoinTimeout
- );
- }
- }
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
index eddd5e4a1ee..fde4dcc7b0b 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
@@ -21,21 +21,26 @@ package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
import java.util.Objects;
public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskRunnerDynamicConfig
{
+ @Nullable
private final PodTemplateSelectStrategy podTemplateSelectStrategy;
+ @Nullable
+ private final Integer capacity;
+
@JsonCreator
public DefaultKubernetesTaskRunnerDynamicConfig(
- @JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy
+ @JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy,
+ @JsonProperty("capacity") Integer capacity
)
{
- Preconditions.checkNotNull(podTemplateSelectStrategy);
this.podTemplateSelectStrategy = podTemplateSelectStrategy;
+ this.capacity = capacity;
}
@Override
@@ -45,6 +50,31 @@ public class DefaultKubernetesTaskRunnerDynamicConfig
implements KubernetesTaskR
return podTemplateSelectStrategy;
}
+ @Override
+ @JsonProperty
+ public Integer getCapacity()
+ {
+ return capacity;
+ }
+
+ @Override
+ public KubernetesTaskRunnerDynamicConfig merge(KubernetesTaskRunnerDynamicConfig other)
+ {
+ if (other == null) {
+ return this;
+ }
+ Integer mergeCapacity = getCapacity();
+ if (other.getCapacity() != null) {
+ mergeCapacity = other.getCapacity();
+ }
+
+ PodTemplateSelectStrategy mergePodTemplateSelectStrategy = getPodTemplateSelectStrategy();
+ if (other.getPodTemplateSelectStrategy() != null) {
+ mergePodTemplateSelectStrategy = other.getPodTemplateSelectStrategy();
+ }
+ return new DefaultKubernetesTaskRunnerDynamicConfig(mergePodTemplateSelectStrategy, mergeCapacity);
+ }
+
@Override
public boolean equals(Object o)
{
@@ -55,13 +85,14 @@ public class DefaultKubernetesTaskRunnerDynamicConfig
implements KubernetesTaskR
return false;
}
DefaultKubernetesTaskRunnerDynamicConfig that = (DefaultKubernetesTaskRunnerDynamicConfig) o;
- return Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy);
+ return Objects.equals(capacity, that.capacity) &&
+ Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy);
}
@Override
public int hashCode()
{
- return Objects.hashCode(podTemplateSelectStrategy);
+ return Objects.hash(podTemplateSelectStrategy, capacity);
}
@Override
@@ -69,6 +100,7 @@ public class DefaultKubernetesTaskRunnerDynamicConfig
implements KubernetesTaskR
{
return "DefaultKubernetesTaskRunnerDynamicConfig{" +
"podTemplateSelectStrategy=" + podTemplateSelectStrategy +
+ ", capacity=" + capacity +
'}';
}
}
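Editor's note: the merge() added above keeps the current value of a field whenever the incoming config leaves that field null. A minimal sketch of that rule follows, assuming a hypothetical `DynamicConfigSketch` class in which a plain String stands in for PodTemplateSelectStrategy; it is an illustration, not the committed implementation.

```java
/** Hypothetical stand-in for the dynamic config; a String replaces PodTemplateSelectStrategy. */
final class DynamicConfigSketch
{
  final String strategy;   // null means "not set"
  final Integer capacity;  // null means "not set"

  DynamicConfigSketch(String strategy, Integer capacity)
  {
    this.strategy = strategy;
    this.capacity = capacity;
  }

  /** Prefers non-null fields from other and falls back to this instance's fields. */
  DynamicConfigSketch merge(DynamicConfigSketch other)
  {
    if (other == null) {
      return this;
    }
    return new DynamicConfigSketch(
        other.strategy != null ? other.strategy : strategy,
        other.capacity != null ? other.capacity : capacity
    );
  }
}
```

Under this rule, a POST that carries only a capacity leaves a previously stored strategy in place, which appears to be the intent of the resource change later in this diff. One consequence is that a field cannot be reset to null through merge alone.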
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
index ec03b045f50..432a41933ed 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
@@ -73,7 +73,7 @@ public class KubernetesTaskExecutionConfigResource
* Updates the Kubernetes execution configuration.
*
* @param dynamicConfig the new execution configuration to set
- * @param req the HTTP servlet request providing context for audit information
+ * @param req the HTTP servlet request providing context for audit information
* @return a response indicating the success or failure of the update operation
*/
@POST
@@ -84,13 +84,19 @@ public class KubernetesTaskExecutionConfigResource
@Context final HttpServletRequest req
)
{
+ KubernetesTaskRunnerDynamicConfig currentConfig = getDynamicConfig();
+ KubernetesTaskRunnerDynamicConfig mergedConfig = dynamicConfig;
+
+ if (currentConfig != null) {
+ mergedConfig = currentConfig.merge(dynamicConfig);
+ }
final ConfigManager.SetResult setResult = configManager.set(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
- dynamicConfig,
+ mergedConfig,
AuthorizationUtils.buildAuditInfo(req)
);
if (setResult.isOk()) {
- log.info("Updating K8s execution configs: %s", dynamicConfig);
+ log.info("Updating K8s execution configs: %s", mergedConfig);
return Response.ok().build();
} else {
@@ -147,11 +153,15 @@ public class KubernetesTaskExecutionConfigResource
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(ConfigResourceFilter.class)
public Response getExecutionConfig()
+ {
+ return Response.ok(getDynamicConfig()).build();
+ }
+
+ private KubernetesTaskRunnerDynamicConfig getDynamicConfig()
{
if (dynamicConfigRef == null) {
dynamicConfigRef = configManager.watch(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class);
}
-
- return Response.ok(dynamicConfigRef.get()).build();
+ return dynamicConfigRef.get();
}
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
index 4f6d4b07c41..fd9c9b46531 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
@@ -22,6 +22,9 @@ package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+
/**
* Represents the configuration for task execution within a Kubernetes environment.
* This interface allows for dynamic configuration of task execution strategies based
@@ -38,7 +41,26 @@ public interface KubernetesTaskRunnerDynamicConfig
/**
* Retrieves the execution behavior strategy associated with this configuration.
+ *
* @return the execution behavior strategy
*/
PodTemplateSelectStrategy getPodTemplateSelectStrategy();
+
+ /**
+ * Retrieves the capacity associated with this configuration.
+ *
+ * @return the capacity
+ */
+ @Min(0)
+ @Max(Integer.MAX_VALUE)
+ Integer getCapacity();
+
+ /**
+ * Merges this configuration with another, preferring values from {@code other}
+ * and falling back to this configuration when not present.
+ *
+ * @param other the configuration to merge with
+ * @return the merged configuration
+ */
+ KubernetesTaskRunnerDynamicConfig merge(KubernetesTaskRunnerDynamicConfig other);
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
index 92832b2ff66..ff4a4c2cd72 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java
@@ -26,8 +26,7 @@ import io.fabric8.kubernetes.client.utils.Serialization;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
-import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig;
import java.io.File;
import java.nio.file.Files;
@@ -43,17 +42,17 @@ public class DynamicConfigPodTemplateSelector implements
PodTemplateSelector
+ ".k8s.podTemplate.";
private final Properties properties;
- private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
+ private final KubernetesTaskRunnerEffectiveConfig effectiveConfig;
// Supplier allows Overlord to read the most recent pod template file without calling initializeTemplatesFromFileSystem() again.
private HashMap<String, Supplier<PodTemplate>> podTemplates;
public DynamicConfigPodTemplateSelector(
Properties properties,
- Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
+ KubernetesTaskRunnerEffectiveConfig effectiveConfig
)
{
this.properties = properties;
- this.dynamicConfigRef = dynamicConfigRef;
+ this.effectiveConfig = effectiveConfig;
initializeTemplatesFromFileSystem();
}
@@ -120,14 +119,6 @@ public class DynamicConfigPodTemplateSelector implements
PodTemplateSelector
@Override
public Optional<PodTemplateWithName> getPodTemplateForTask(Task task)
{
- PodTemplateSelectStrategy podTemplateSelectStrategy;
- KubernetesTaskRunnerDynamicConfig dynamicConfig = dynamicConfigRef.get();
- if (dynamicConfig == null || dynamicConfig.getPodTemplateSelectStrategy() == null) {
- podTemplateSelectStrategy = KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY;
- } else {
- podTemplateSelectStrategy = dynamicConfig.getPodTemplateSelectStrategy();
- }
-
- return Optional.of(podTemplateSelectStrategy.getPodTemplateForTask(task, podTemplates));
+ return Optional.of(effectiveConfig.getPodTemplateSelectStrategy().getPodTemplateForTask(task, podTemplates));
}
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index 2744b82b126..8c68d9324e8 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -269,7 +269,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
}
return podTemplateAnnotationBuilder.build();
}
-
+
private Map<String, String> getJobLabels(KubernetesTaskRunnerConfig config, Task task)
{
Preconditions.checkNotNull(config.getNamespace(), "When using Custom Pod Templates, druid.indexer.runner.namespace cannot be null.");
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
index 59c9508005f..dd8960864e0 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -27,6 +27,7 @@ import com.google.inject.Injector;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.ConfigManagerConfig;
import org.apache.druid.guice.ConfigModule;
import org.apache.druid.guice.DruidGuiceExtensions;
@@ -44,6 +45,7 @@ import org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClient
import org.apache.druid.k8s.overlord.common.httpclient.jdk.DruidKubernetesJdkHttpClientFactory;
import org.apache.druid.k8s.overlord.common.httpclient.okhttp.DruidKubernetesOkHttpHttpClientFactory;
import org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVertxHttpClientFactory;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
@@ -51,14 +53,18 @@ import
org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.DruidNode;
+import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.net.URL;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
@RunWith(EasyMockRunner.class)
public class KubernetesOverlordModuleTest
@@ -81,8 +87,26 @@ public class KubernetesOverlordModuleTest
private AuditManager auditManager;
@Mock
private MetadataStorageConnector metadataStorageConnector;
+ @Mock
+ private ConfigManager configManager;
private Injector injector;
+ @Before
+ public void setUpConfigManagerMock()
+ {
+ EasyMock.reset(configManager);
+ EasyMock.expect(configManager.watchConfig(
+ EasyMock.anyString(),
+ EasyMock.anyObject()
+ )).andReturn(new AtomicReference<>(null)).anyTimes();
+ EasyMock.expect(configManager.addListener(
+ EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY),
+ EasyMock.anyString(),
+ EasyMock.anyObject(Consumer.class)
+ )).andReturn(true).anyTimes();
+ EasyMock.replay(configManager);
+ }
+
@Test
public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully()
{
@@ -325,6 +349,7 @@ public class KubernetesOverlordModuleTest
}).toInstance(Suppliers.ofInstance(metadataStorageTablesConfig));
binder.bind(AuditManager.class).toInstance(auditManager);
binder.bind(MetadataStorageConnector.class).toInstance(metadataStorageConnector);
+ binder.bind(ConfigManager.class).toInstance(configManager);
},
new ConfigModule(),
new IndexingServiceTaskLogsModule(props),
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java
new file mode 100644
index 00000000000..c44f8a34b4e
--- /dev/null
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Supplier;
+import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
+import org.apache.druid.k8s.overlord.execution.TaskTypePodTemplateSelectStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KubernetesTaskRunnerEffectiveConfigTest
+{
+ @Test
+ public void test_getCapacity_usesStaticWhenDynamicNull()
+ {
+ KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder()
+ .withCapacity(7)
+ .build();
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> null;
+ KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+ Assert.assertEquals(7, effective.getCapacity().intValue());
+ }
+
+ @Test
+ public void test_getCapacity_usesDynamicWhenProvided()
+ {
+ KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder()
+ .withCapacity(2)
+ .build();
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, 9);
+ KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+ Assert.assertEquals(9, effective.getCapacity().intValue());
+ }
+
+ @Test
+ public void test_getCapacity_usesStaticWhenDynamicNullCapacity()
+ {
+ KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder()
+ .withCapacity(7)
+ .build();
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, null);
+ KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+ Assert.assertEquals(7, effective.getCapacity().intValue());
+ }
+
+ @Test
+ public void test_getPodTemplateSelectStrategy_usesDefaultWhenDynamicNull()
+ {
+ KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build();
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> null;
+ KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+ PodTemplateSelectStrategy strategy = effective.getPodTemplateSelectStrategy();
+ Assert.assertTrue(strategy instanceof TaskTypePodTemplateSelectStrategy);
+ Assert.assertEquals(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY, strategy);
+ }
+
+ @Test
+ public void test_getPodTemplateSelectStrategy_usesDynamicWhenProvided()
+ {
+ KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build();
+ PodTemplateSelectStrategy custom = new TaskTypePodTemplateSelectStrategy();
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(custom, null);
+ KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier);
+
+ Assert.assertEquals(custom, effective.getPodTemplateSelectStrategy());
+ }
+}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
index 6849f9a0ecc..a67ab70a0a8 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.ConfigBuilder;
+import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -32,6 +33,7 @@ import
org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVert
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.EasyMock;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
@@ -42,24 +44,28 @@ import java.io.IOException;
public class KubernetesTaskRunnerFactoryTest
{
private ObjectMapper objectMapper;
- private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
+ private KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig;
private TaskLogs taskLogs;
private DruidKubernetesClient druidKubernetesClient;
@Mock private ServiceEmitter emitter;
private TaskAdapter taskAdapter;
+ @Mock private ConfigManager configManager;
@Before
public void setup()
{
objectMapper = new TestUtils().getTestObjectMapper();
- kubernetesTaskRunnerConfig = KubernetesTaskRunnerConfig.builder()
+ KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerStaticConfig = KubernetesTaskRunnerConfig.builder()
.withCapacity(1)
.build();
taskLogs = new NoopTaskLogs();
druidKubernetesClient =
new DruidKubernetesClient(new DruidKubernetesVertxHttpClientFactory(new DruidKubernetesVertxHttpClientConfig()), new ConfigBuilder().build());
taskAdapter = new TestTaskAdapter();
+ kubernetesTaskRunnerConfig = new KubernetesTaskRunnerEffectiveConfig(kubernetesTaskRunnerStaticConfig, () -> null);
+ configManager = EasyMock.createNiceMock(ConfigManager.class);
+ EasyMock.replay(configManager);
}
@Test
@@ -72,7 +78,8 @@ public class KubernetesTaskRunnerFactoryTest
taskLogs,
druidKubernetesClient,
emitter,
- taskAdapter
+ taskAdapter,
+ configManager
);
KubernetesTaskRunner expectedRunner = factory.build();
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java
similarity index 94%
rename from
extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
rename to
extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java
index 1f4a7281f64..91b5148e268 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java
@@ -29,15 +29,15 @@ import org.junit.Test;
import java.io.IOException;
-public class KubernetesTaskRunnerConfigTest
+public class KubernetesTaskRunnerStaticConfigTest
{
@Test
public void test_deserializable() throws IOException
{
ObjectMapper mapper = new DefaultObjectMapper();
- KubernetesTaskRunnerConfig config = mapper.readValue(
+ KubernetesTaskRunnerStaticConfig config = mapper.readValue(
this.getClass().getClassLoader().getResource("kubernetesTaskRunnerConfig.json"),
- KubernetesTaskRunnerConfig.class
+ KubernetesTaskRunnerStaticConfig.class
);
Assert.assertEquals("namespace", config.getNamespace());
@@ -60,7 +60,7 @@ public class KubernetesTaskRunnerConfigTest
@Test
public void test_builder_preservesDefaults()
{
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
.withNamespace("namespace")
.withDisableClientProxy(true)
.build();
@@ -85,7 +85,7 @@ public class KubernetesTaskRunnerConfigTest
@Test
public void test_builder()
{
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
.withNamespace("namespace")
.withDebugJob(true)
.withSidecarSupport(true)
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 083bf2db0e6..1c72078a52e 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord;
import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -27,6 +28,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.io.IOUtils;
+import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -39,6 +41,8 @@ import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@@ -53,6 +57,8 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
@@ -60,6 +66,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -78,27 +85,37 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
@Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
@Mock private ServiceEmitter emitter;
@Mock private ListenableFuture<TaskStatus> statusFuture;
+ @Mock private ConfigManager configManager;
- private KubernetesTaskRunnerConfig config;
+ private KubernetesTaskRunnerStaticConfig staticConfig;
+ private KubernetesTaskRunnerEffectiveConfig config;
private KubernetesTaskRunner runner;
private Task task;
@Before
public void setup()
{
- config = KubernetesTaskRunnerConfig.builder()
+ staticConfig = KubernetesTaskRunnerConfig.builder()
.withCapacity(1)
.build();
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, 1);
+
+ config = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef);
+
task = K8sTestUtils.createTask(ID, 0);
+ configManager = EasyMock.createNiceMock(ConfigManager.class);
+ EasyMock.replay(configManager);
+
runner = new KubernetesTaskRunner(
taskAdapter,
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
+ emitter,
+ configManager
);
}
@@ -113,7 +130,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
+ emitter,
+ configManager
)
{
@Override
@@ -163,7 +181,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
+ emitter,
+ configManager
)
{
@Override
@@ -219,7 +238,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
+ emitter,
+ configManager
)
{
@Override
@@ -771,9 +791,75 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
+ emitter,
+ configManager
);
kubernetesTaskRunner.stop();
Assert.assertThrows(RejectedExecutionException.class, () -> kubernetesTaskRunner.run(task));
}
+
+ @Test
+ public void test_syncCapacityWithDynamicConfig_increase_updatesExecutorAndCapacity() throws Exception
+ {
+ Method method = KubernetesTaskRunner.class.getDeclaredMethod(
+ "syncCapacityWithDynamicConfig",
+ KubernetesTaskRunnerDynamicConfig.class
+ );
+ method.setAccessible(true);
+
+ // increase from 1 -> 3
+ method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 3));
+
+ Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe");
+ tpeField.setAccessible(true);
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner);
+
+ Assert.assertEquals(3, executor.getCorePoolSize());
+ Assert.assertEquals(3, executor.getMaximumPoolSize());
+ Assert.assertEquals(3, runner.getTotalCapacity());
+ }
+
+ @Test
+ public void test_syncCapacityWithDynamicConfig_decrease_updatesExecutorAndCapacity() throws Exception
+ {
+ Method method = KubernetesTaskRunner.class.getDeclaredMethod(
+ "syncCapacityWithDynamicConfig",
+ KubernetesTaskRunnerDynamicConfig.class
+ );
+ method.setAccessible(true);
+
+ // first increase to 4 to ensure we can decrease after
+ method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 4));
+ // then decrease 4 -> 2
+ method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 2));
+
+ Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe");
+ tpeField.setAccessible(true);
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner);
+
+ Assert.assertEquals(2, executor.getCorePoolSize());
+ Assert.assertEquals(2, executor.getMaximumPoolSize());
+ Assert.assertEquals(2, runner.getTotalCapacity());
+ }
+
+ @Test
+ public void test_syncCapacityWithDynamicConfig_sameCapacity_noChangeAndNoError() throws Exception
+ {
+ Method method = KubernetesTaskRunner.class.getDeclaredMethod(
+ "syncCapacityWithDynamicConfig",
+ KubernetesTaskRunnerDynamicConfig.class
+ );
+ method.setAccessible(true);
+
+ // initial capacity is 1 in setup; calling with 1 should be a no-op
+ method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 1));
+
+ Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe");
+ tpeField.setAccessible(true);
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner);
+
+ Assert.assertEquals(1, executor.getCorePoolSize());
+ Assert.assertEquals(1, executor.getMaximumPoolSize());
+ Assert.assertEquals(1, runner.getTotalCapacity());
+ }
}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java
index de8919e329d..31c6d7fdd6d 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java
@@ -31,17 +31,26 @@ public class DefaultKubernetesTaskRunnerDynamicConfigTest
public void getPodTemplateSelectStrategyTest()
{
PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy();
- DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy);
+ DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy, 1);
Assert.assertEquals(strategy, config.getPodTemplateSelectStrategy());
}
+ @Test
+ public void getCapacityTest()
+ {
+ Integer capacity = 4;
+ DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(null, 4);
+
+ Assert.assertEquals(capacity, config.getCapacity());
+ }
+
@Test
public void testSerde() throws Exception
{
final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy();
- DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy);
+ DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy, 1);
DefaultKubernetesTaskRunnerDynamicConfig config2 = objectMapper.readValue(
objectMapper.writeValueAsBytes(config),
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java
index b76b7eaf0cf..c06056f0113 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -52,6 +53,10 @@ public class KubernetesTaskExecutionConfigResourceTest
@Test
public void setExecutionConfigSuccessfulUpdate()
{
+ EasyMock.expect(configManager.watch(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ KubernetesTaskRunnerDynamicConfig.class
+ )).andReturn(new AtomicReference<>(null));
KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
configManager,
auditManager
@@ -75,6 +80,10 @@ public class KubernetesTaskExecutionConfigResourceTest
@Test
public void setExecutionConfigFailedUpdate()
{
+ EasyMock.expect(configManager.watch(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ KubernetesTaskRunnerDynamicConfig.class
+ )).andReturn(new AtomicReference<>(null));
KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
configManager,
auditManager
@@ -94,4 +103,116 @@ public class KubernetesTaskExecutionConfigResourceTest
Response result = testedResource.setExecutionConfig(dynamicConfig, req);
assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), result.getStatus());
}
+
+ @Test
+ public void setExecutionConfig_MergeUsesCurrentCapacityWhenRequestCapacityNull()
+ {
+ KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
+ configManager,
+ auditManager
+ );
+
+ PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy();
+ KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 5);
+ EasyMock.expect(configManager.watch(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ KubernetesTaskRunnerDynamicConfig.class
+ )).andReturn(new AtomicReference<>(currentConfig));
+
+ PodTemplateSelectStrategy requestStrategy = new TaskTypePodTemplateSelectStrategy();
+ KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, null);
+
+ KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, 5);
+
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
+ EasyMock.replay(req);
+
+ EasyMock.expect(configManager.set(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ expectedMergedConfig,
+ AuthorizationUtils.buildAuditInfo(req)
+ )).andReturn(ConfigManager.SetResult.ok());
+
+ EasyMock.replay(configManager, auditManager);
+
+ Response result = testedResource.setExecutionConfig(requestConfig, req);
+ assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
+ }
+
+ @Test
+ public void setExecutionConfig_MergeUsesCurrentStrategyWhenRequestStrategyNull()
+ {
+ KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
+ configManager,
+ auditManager
+ );
+
+ PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy();
+ KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 2);
+ EasyMock.expect(configManager.watch(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ KubernetesTaskRunnerDynamicConfig.class
+ )).andReturn(new AtomicReference<>(currentConfig));
+
+ KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, 7);
+
+ KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 7);
+
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
+ EasyMock.replay(req);
+
+ EasyMock.expect(configManager.set(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ expectedMergedConfig,
+ AuthorizationUtils.buildAuditInfo(req)
+ )).andReturn(ConfigManager.SetResult.ok());
+
+ EasyMock.replay(configManager, auditManager);
+
+ Response result = testedResource.setExecutionConfig(requestConfig, req);
+ assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
+ }
+
+ @Test
+ public void setExecutionConfig_MergeUsesCurrentWhenBothRequestFieldsNull()
+ {
+ KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
+ configManager,
+ auditManager
+ );
+
+ PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy();
+ KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9);
+ EasyMock.expect(configManager.watch(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ KubernetesTaskRunnerDynamicConfig.class
+ )).andReturn(new AtomicReference<>(currentConfig));
+
+ KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, null);
+
+ KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9);
+
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
+ EasyMock.replay(req);
+
+ EasyMock.expect(configManager.set(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ expectedMergedConfig,
+ AuthorizationUtils.buildAuditInfo(req)
+ )).andReturn(ConfigManager.SetResult.ok());
+
+ EasyMock.replay(configManager, auditManager);
+
+ Response result = testedResource.setExecutionConfig(requestConfig, req);
+ assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
+ }
}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java
index 77a819dde9c..b5fac1233c0 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java
@@ -36,7 +36,8 @@ public class KubernetesTaskRunnerDynamicConfigTest
+ " \"type\": \"default\",\n"
+ " \"podTemplateSelectStrategy\": {\n"
+ " \"type\": \"default\"\n"
- + " }\n"
+ + " },\n"
+ + " \"capacity\": 3\n"
+ "}";
KubernetesTaskRunnerDynamicConfig deserialized = jsonMapper.readValue(
@@ -45,6 +46,7 @@ public class KubernetesTaskRunnerDynamicConfigTest
);
PodTemplateSelectStrategy selectStrategy = deserialized.getPodTemplateSelectStrategy();
Assert.assertTrue(selectStrategy instanceof TaskTypePodTemplateSelectStrategy);
+ Assert.assertEquals(Integer.valueOf(3), deserialized.getCapacity());
json = "{\n"
+ " \"type\": \"default\",\n"
@@ -72,5 +74,14 @@ public class KubernetesTaskRunnerDynamicConfigTest
selectStrategy = deserialized.getPodTemplateSelectStrategy();
Assert.assertTrue(selectStrategy instanceof SelectorBasedPodTemplateSelectStrategy);
Assert.assertEquals(2, ((SelectorBasedPodTemplateSelectStrategy) selectStrategy).getSelectors().size());
+ Assert.assertNull(deserialized.getCapacity());
+
+ json = "{\n"
+ + " \"type\": \"default\",\n"
+ + " \"capacity\": 12"
+ + "}";
+ deserialized = jsonMapper.readValue(json, KubernetesTaskRunnerDynamicConfig.class);
+ Assert.assertEquals(Integer.valueOf(12), deserialized.getCapacity());
+ Assert.assertNull(deserialized.getPodTemplateSelectStrategy());
}
}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 33f8349c619..016f3280a47 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
@@ -110,9 +111,9 @@ public class DruidPeonClientIntegrationTest
PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
Task task = K8sTestUtils.getTask();
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("default")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("default")
+ .build();
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
k8sClient,
config,
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
index 106a98fa6a8..1052c97c537 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java
@@ -29,6 +29,9 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
@@ -53,14 +56,16 @@ public class DynamicConfigPodTemplateSelectorTest
private Path tempDir;
private ObjectMapper mapper;
private PodTemplate podTemplateSpec;
- private Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
+ private KubernetesTaskRunnerEffectiveConfig effectiveConfig;
@BeforeEach
public void setup()
{
mapper = new TestUtils().getTestObjectMapper();
podTemplateSpec = K8sTestUtils.fileToResource("basePodTemplate.yaml", PodTemplate.class);
- dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY);
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY, 1);
+ KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build();
+ effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef);
}
@Test
@@ -71,7 +76,7 @@ public class DynamicConfigPodTemplateSelectorTest
IAE.class,
() -> new DynamicConfigPodTemplateSelector(
new Properties(),
- dynamicConfigRef
+ effectiveConfig
)
);
Assertions.assertEquals(
@@ -93,7 +98,7 @@ public class DynamicConfigPodTemplateSelectorTest
IAE.class,
() -> new DynamicConfigPodTemplateSelector(
props,
- dynamicConfigRef
+ effectiveConfig
)
);
@@ -111,7 +116,7 @@ public class DynamicConfigPodTemplateSelectorTest
DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector(
props,
- dynamicConfigRef
+ effectiveConfig
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -146,7 +151,7 @@ public class DynamicConfigPodTemplateSelectorTest
DynamicConfigPodTemplateSelector selector = new DynamicConfigPodTemplateSelector(
props,
- dynamicConfigRef
+ effectiveConfig
);
Task kafkaTask = new NoopTask("id", "id", "datasource", 0, 0, null)
@@ -191,9 +196,10 @@ public class DynamicConfigPodTemplateSelectorTest
props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", noopTemplatePath.toString());
Assert.assertThrows(IAE.class, () -> new DynamicConfigPodTemplateSelector(
- props,
- dynamicConfigRef
- ));
+ props,
+ effectiveConfig
+ )
+ );
}
@Test
@@ -208,7 +214,7 @@ public class DynamicConfigPodTemplateSelectorTest
DynamicConfigPodTemplateSelector podTemplateSelector = new DynamicConfigPodTemplateSelector(
props,
- dynamicConfigRef
+ effectiveConfig
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -242,16 +248,21 @@ public class DynamicConfigPodTemplateSelectorTest
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.lowThroughput", lowThroughputTemplatePath.toString());
- dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(new SelectorBasedPodTemplateSelectStrategy(
- Collections.singletonList(
- new Selector("lowThroughput", null, null, Sets.newSet(dataSource)
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(
+ new SelectorBasedPodTemplateSelectStrategy(
+ Collections.singletonList(
+ new Selector("lowThroughput", null, null, Sets.newSet(dataSource)
+ )
)
- )
- ));
+ ), 1
+ );
+
+ KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build();
+ effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef);
DynamicConfigPodTemplateSelector podTemplateSelector = new DynamicConfigPodTemplateSelector(
props,
- dynamicConfigRef
+ effectiveConfig
);
Task taskWithMatchedDatasource = new NoopTask("id", "id", dataSource, 0, 0, null);
@@ -276,7 +287,7 @@ public class DynamicConfigPodTemplateSelectorTest
DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector(
props,
- dynamicConfigRef
+ effectiveConfig
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -311,7 +322,7 @@ public class DynamicConfigPodTemplateSelectorTest
DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector(
props,
- dynamicConfigRef
+ effectiveConfig
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
index 16cccf04e28..552d7201fd0 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
@@ -52,6 +52,7 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
@@ -138,12 +139,12 @@ class K8sTaskAdapterTest
}
};
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("test")
- .withOverlordNamespace("test_different")
- .withAnnotations(ImmutableMap.of("annotation_key", "annotation_value"))
- .withLabels(ImmutableMap.of("label_key", "label_value"))
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .withOverlordNamespace("test_different")
+ .withAnnotations(ImmutableMap.of("annotation_key", "annotation_value"))
+ .withLabels(ImmutableMap.of("label_key", "label_value"))
+ .build();
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
@@ -175,9 +176,9 @@ class K8sTaskAdapterTest
{
// given a task create a k8s job
TestKubernetesClient testClient = new TestKubernetesClient(client);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("test")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
@@ -229,9 +230,9 @@ class K8sTaskAdapterTest
}
};
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("test")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
@@ -277,9 +278,9 @@ class K8sTaskAdapterTest
public void toTask_useTaskPayloadManager() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("test")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
Task taskInTaskPayloadManager = K8sTestUtils.getTask();
TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class);
Mockito.when(mockTestLogs.streamTaskPayload("ID")).thenReturn(com.google.common.base.Optional.of(
@@ -309,7 +310,7 @@ class K8sTaskAdapterTest
public void getTaskId()
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
@@ -331,7 +332,7 @@ class K8sTaskAdapterTest
public void getTaskId_noAnnotations()
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
@@ -353,7 +354,7 @@ class K8sTaskAdapterTest
public void getTaskId_missingTaskIdAnnotation()
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
@@ -458,9 +459,9 @@ class K8sTaskAdapterTest
new File("/tmp/"),
0
);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("test")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
@@ -482,9 +483,9 @@ class K8sTaskAdapterTest
// we have an override, but nothing in the overlord
config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("test")
- .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
- .build();
+ .withNamespace("test")
+ .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
+ .build();
adapter = new SingleContainerTaskAdapter(
testClient,
config,
@@ -532,9 +533,9 @@ class K8sTaskAdapterTest
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("namespace")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("namespace")
+ .build();
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
@@ -582,9 +583,9 @@ class K8sTaskAdapterTest
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = K8sTestUtils.fileToResource("probesPodSpec.yaml", Pod.class);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("test")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
@@ -635,11 +636,11 @@ class K8sTaskAdapterTest
List<String> javaOpts = new ArrayList<>();
javaOpts.add("-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G");
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("namespace")
- .withJavaOptsArray(javaOpts)
- .withCpuCore(2000)
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("namespace")
+ .withJavaOptsArray(javaOpts)
+ .withCpuCore(2000)
+ .build();
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
index 48330d62506..477758d9ac4 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
@@ -87,9 +88,9 @@ class MultiContainerTaskAdapterTest
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("namespace")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("namespace")
+ .build();
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
testClient,
config,
@@ -138,10 +139,10 @@ class MultiContainerTaskAdapterTest
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpecOrder.yaml", Pod.class);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("namespace")
- .withPrimaryContainerName("primary")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("namespace")
+ .withPrimaryContainerName("primary")
+ .build();
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
testClient,
config,
@@ -192,11 +193,11 @@ class MultiContainerTaskAdapterTest
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = K8sTestUtils.fileToResource("podSpec.yaml", Pod.class);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("namespace")
- .withPrimaryContainerName("primary")
- .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("namespace")
+ .withPrimaryContainerName("primary")
+ .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
+ .build();
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
testClient,
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index 45f2c356cfe..01d99a896f6 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
@@ -58,7 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class PodTemplateTaskAdapterTest
{
- private KubernetesTaskRunnerConfig taskRunnerConfig;
+ private KubernetesTaskRunnerStaticConfig taskRunnerConfig;
private PodTemplate podTemplateSpec;
private TaskConfig taskConfig;
private DruidNode node;
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
index d7d74cf1812..832a7292304 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
@@ -86,9 +87,9 @@ class SingleContainerTaskAdapterTest
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
- KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
- .withNamespace("namespace")
- .build();
+ KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("namespace")
+ .build();
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
diff --git a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java
index 87cbad37941..12265df31e2 100644
--- a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java
+++ b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
/**
*/
@@ -219,6 +220,26 @@ public class ConfigManager
}
}
+ public <T> boolean addListener(String configKey, String listenerKey, Consumer<T> listener)
+ {
+ ConfigHolder holder = watchedConfigs.get(configKey);
+ if (holder == null) {
+ log.warn("ConfigHolder not found for configKey[%s]", configKey);
+ return false;
+ }
+ return holder.addListener(listenerKey, listener);
+ }
+
+ public <T> boolean removeListener(String configKey, String listenerKey, Consumer<T> listener)
+ {
+ ConfigHolder holder = watchedConfigs.get(configKey);
+ if (holder == null) {
+ log.warn("ConfigHolder not found for configKey[%s]", configKey);
+ return false;
+ }
+ return holder.removeListener(listenerKey, listener);
+ }
+
@Nonnull
private MetadataCASUpdate createMetadataCASUpdate(
String keyValue,
@@ -285,6 +306,7 @@ public class ConfigManager
private final AtomicReference<byte[]> rawBytes;
private final ConfigSerde<T> serde;
private final AtomicReference<T> reference;
+ private final ConcurrentMap<String, Consumer<T>> listeners;
ConfigHolder(
byte[] rawBytes,
@@ -294,6 +316,7 @@ public class ConfigManager
this.rawBytes = new AtomicReference<>(rawBytes);
this.serde = serde;
this.reference = new AtomicReference<>(serde.deserialize(rawBytes));
+ this.listeners = new ConcurrentHashMap<>();
}
public AtomicReference<T> getReference()
@@ -306,10 +329,38 @@ public class ConfigManager
if (!Arrays.equals(newBytes, rawBytes.get())) {
reference.set(serde.deserialize(newBytes));
rawBytes.set(newBytes);
+ listeners.forEach((key, listener) -> {
+ try {
+ listener.accept(reference.get());
+ }
+ catch (Exception e) {
+ log.warn(e, "Exception when calling listener for key[%s]", key);
+ }
+ });
return true;
}
return false;
}
+
+ public boolean addListener(String key, Consumer<T> listener)
+ {
+ Consumer<T> val = listeners.putIfAbsent(key, listener);
+ if (val != null) {
+ log.warn("Listener key[%s] already exists", key);
+ return false;
+ }
+ return true;
+ }
+
+ public boolean removeListener(String key, Consumer<T> listener)
+ {
+ boolean isRemoved = listeners.remove(key, listener);
+ if (!isRemoved) {
+ log.warn("Listener key[%s] not found", key);
+ return false;
+ }
+ return true;
+ }
}
private class PollingCallable implements Callable<ScheduledExecutors.Signal>
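
Editor's note: the listener bookkeeping added to ConfigHolder above can be read as a small keyed registry. The following is a minimal standalone sketch of that pattern, not the Druid code itself. The class name ListenerRegistrySketch and the method publish are hypothetical, and the sketch prints to stderr where the commit uses its logger.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical illustration of a keyed listener registry; names are not from the commit.
final class ListenerRegistrySketch<T>
{
  private final ConcurrentMap<String, Consumer<T>> listeners = new ConcurrentHashMap<>();

  // Registers the listener only if the key is unused; putIfAbsent makes the
  // check and the insert a single atomic step.
  public boolean addListener(String key, Consumer<T> listener)
  {
    return listeners.putIfAbsent(key, listener) == null;
  }

  // Removes the entry only if the key is currently mapped to this exact listener.
  public boolean removeListener(String key, Consumer<T> listener)
  {
    return listeners.remove(key, listener);
  }

  // Notifies every listener; a failure in one listener does not stop the others.
  public void publish(T newValue)
  {
    listeners.forEach((key, listener) -> {
      try {
        listener.accept(newValue);
      }
      catch (Exception e) {
        System.err.println("Listener " + key + " failed: " + e);
      }
    });
  }
}
```

Because remove(key, value) compares with equals, and lambdas normally fall back to identity equality, a caller of a registry like this needs to keep the same Consumer instance it registered in order to remove it later.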