This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/28.0.0 by this push:
new f1057043c89 Ability to send task types to k8s or worker task runner
(#15196) (#15255)
f1057043c89 is described below
commit f1057043c89548b5134109c50cd268bf63912166
Author: Suneet Saldanha <[email protected]>
AuthorDate: Wed Oct 25 13:30:12 2023 -0700
Ability to send task types to k8s or worker task runner (#15196) (#15255)
---
docs/development/extensions-contrib/k8s-jobs.md | 7 +-
.../overlord/KubernetesAndWorkerTaskRunner.java | 10 +-
.../KubernetesAndWorkerTaskRunnerConfig.java | 51 ++++----
.../KubernetesAndWorkerTaskRunnerFactory.java | 19 ++-
.../k8s/overlord/KubernetesOverlordModule.java | 55 ++++++++-
.../runnerstrategy/KubernetesRunnerStrategy.java | 43 +++++++
.../overlord/runnerstrategy/RunnerStrategy.java | 75 ++++++++++++
.../runnerstrategy/TaskTypeRunnerStrategy.java | 128 +++++++++++++++++++++
.../runnerstrategy/WorkerRunnerStrategy.java | 43 +++++++
.../KubernetesAndWorkerTaskRunnerConfigTest.java | 10 +-
.../KubernetesAndWorkerTaskRunnerFactoryTest.java | 11 +-
.../KubernetesAndWorkerTaskRunnerTest.java | 36 +++++-
.../KubernetesRunnerStrategyTest.java | 43 +++++++
.../runnerstrategy/TaskTypeRunnerStrategyTest.java | 64 +++++++++++
.../runnerstrategy/WorkerRunnerStrategyTest.java | 43 +++++++
.../kubernetesAndWorkerTaskRunnerConfig.json | 4 +-
16 files changed, 587 insertions(+), 55 deletions(-)
diff --git a/docs/development/extensions-contrib/k8s-jobs.md
b/docs/development/extensions-contrib/k8s-jobs.md
index 2132b55ea1e..27290a9bee5 100644
--- a/docs/development/extensions-contrib/k8s-jobs.md
+++ b/docs/development/extensions-contrib/k8s-jobs.md
@@ -284,6 +284,9 @@ To do this, set the following property.
|Property| Possible Values |Description|Default|required|
|--------|-----------------|-----------|-------|--------|
-|`druid.indexer.runner.k8sAndWorker.workerTaskRunnerType`|`String`|Determines
whether the `httpRemote` or the `remote` task runner should be used in addition
to the Kubernetes task runner.|`httpRemote`|No|
-|`druid.indexer.runner.k8sAndWorker.sendAllTasksToWorkerTaskRunner`|`boolean`|
Whether to send all the tasks to the worker task runner. If this is set to
false all tasks will be sent to Kubernetes|`false`|No|
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.type`| `String` (e.g.,
`k8s`, `worker`, `taskType`)| Defines the strategy for task runner selection.
|`k8s`|No|
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType`| `String`
(e.g., `httpRemote`, `remote`)| Specifies the variant of the worker task runner
to be utilized.|`httpRemote`|No|
+| **For `taskType` runner strategy:**|||||
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.default`| `String`
(e.g., `k8s`, `worker`) | Specifies the default runner to use if no overrides
apply. This setting ensures there is always a fallback runner
available.|None|No|
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.overrides`|
`JsonObject`(e.g., `{"index_kafka": "worker"}`)| Defines task-specific
overrides for runner types. Each entry sets a task type to a specific runner,
allowing fine control. |`{}`|No|
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
index 243f6626c66..2c45a0ec7b8 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
@@ -38,6 +38,7 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.tasklogs.TaskLogStreamer;
import javax.annotation.Nullable;
@@ -57,17 +58,17 @@ public class KubernetesAndWorkerTaskRunner implements
TaskLogStreamer, WorkerTas
{
private final KubernetesTaskRunner kubernetesTaskRunner;
private final WorkerTaskRunner workerTaskRunner;
- private final KubernetesAndWorkerTaskRunnerConfig
kubernetesAndWorkerTaskRunnerConfig;
+ private final RunnerStrategy runnerStrategy;
public KubernetesAndWorkerTaskRunner(
KubernetesTaskRunner kubernetesTaskRunner,
WorkerTaskRunner workerTaskRunner,
- KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
+ RunnerStrategy runnerStrategy
)
{
this.kubernetesTaskRunner = kubernetesTaskRunner;
this.workerTaskRunner = workerTaskRunner;
- this.kubernetesAndWorkerTaskRunnerConfig =
kubernetesAndWorkerTaskRunnerConfig;
+ this.runnerStrategy = runnerStrategy;
}
@Override
@@ -101,7 +102,8 @@ public class KubernetesAndWorkerTaskRunner implements
TaskLogStreamer, WorkerTas
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
- if
(kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) {
+ RunnerStrategy.RunnerType runnerType =
runnerStrategy.getRunnerTypeForTask(task);
+ if (RunnerStrategy.RunnerType.WORKER_RUNNER_TYPE.equals(runnerType)) {
return workerTaskRunner.run(task);
} else {
return kubernetesTaskRunner.run(task);
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
index 8e6fb8f7c61..0ffeb0103af 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
@@ -26,56 +26,49 @@ import org.apache.commons.lang3.ObjectUtils;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
public class KubernetesAndWorkerTaskRunnerConfig
{
- private static final String DEFAULT_WORKER_TASK_RUNNER_TYPE =
HttpRemoteTaskRunnerFactory.TYPE_NAME;
- /**
- * Select which worker task runner to use in addition to the Kubernetes
runner, options are httpRemote or remote.
- */
- @JsonProperty
- @NotNull
- private final String workerTaskRunnerType;
+ private final String RUNNER_STRATEGY_TYPE = "runnerStrategy.type";
+ private final String RUNNER_STRATEGY_WORKER_TYPE =
"runnerStrategy.workerType";
/**
- * Whether or not to send tasks to the worker task runner instead of the
Kubernetes runner.
+ * Select which runner type a task would run on, options are k8s or worker.
*/
- @JsonProperty
- @NotNull
- private final boolean sendAllTasksToWorkerTaskRunner;
+ @JsonProperty(RUNNER_STRATEGY_TYPE)
+ private String runnerStrategy;
+
+ @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE)
+ private String workerType;
@JsonCreator
public KubernetesAndWorkerTaskRunnerConfig(
- @JsonProperty("workerTaskRunnerType") String workerTaskRunnerType,
- @JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean
sendAllTasksToWorkerTaskRunner
+ @JsonProperty(RUNNER_STRATEGY_TYPE) @Nullable String runnerStrategy,
+ @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE) @Nullable String workerType
)
{
- this.workerTaskRunnerType = ObjectUtils.defaultIfNull(
- workerTaskRunnerType,
- DEFAULT_WORKER_TASK_RUNNER_TYPE
- );
+ this.runnerStrategy = ObjectUtils.defaultIfNull(runnerStrategy,
KubernetesTaskRunnerFactory.TYPE_NAME);
+ this.workerType = ObjectUtils.defaultIfNull(workerType,
HttpRemoteTaskRunnerFactory.TYPE_NAME);
Preconditions.checkArgument(
-
this.workerTaskRunnerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
- this.workerTaskRunnerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
- "workerTaskRunnerType must be set to one of (%s, %s)",
+ this.workerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
+ this.workerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
+ "workerType must be set to one of (%s, %s)",
HttpRemoteTaskRunnerFactory.TYPE_NAME,
RemoteTaskRunnerFactory.TYPE_NAME
);
- this.sendAllTasksToWorkerTaskRunner = ObjectUtils.defaultIfNull(
- sendAllTasksToWorkerTaskRunner,
- false
- );
}
- public String getWorkerTaskRunnerType()
+ @JsonProperty(RUNNER_STRATEGY_TYPE)
+ public String getRunnerStrategy()
{
- return workerTaskRunnerType;
+ return runnerStrategy;
}
- public boolean isSendAllTasksToWorkerTaskRunner()
+ @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE)
+ public String getWorkerType()
{
- return sendAllTasksToWorkerTaskRunner;
+ return workerType;
}
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
index 49ca454f50a..de6db915c8a 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
@@ -22,7 +22,9 @@ package org.apache.druid.k8s.overlord;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
+import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
public class KubernetesAndWorkerTaskRunnerFactory implements
TaskRunnerFactory<KubernetesAndWorkerTaskRunner>
@@ -33,6 +35,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements
TaskRunnerFactory<K
private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
private final KubernetesAndWorkerTaskRunnerConfig
kubernetesAndWorkerTaskRunnerConfig;
+ private final RunnerStrategy runnerStrategy;
private KubernetesAndWorkerTaskRunner runner;
@@ -41,13 +44,15 @@ public class KubernetesAndWorkerTaskRunnerFactory
implements TaskRunnerFactory<K
KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
RemoteTaskRunnerFactory remoteTaskRunnerFactory,
- KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
+ KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig,
+ RunnerStrategy runnerStrategy
)
{
this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
this.kubernetesAndWorkerTaskRunnerConfig =
kubernetesAndWorkerTaskRunnerConfig;
+ this.runnerStrategy = runnerStrategy;
}
@Override
@@ -55,13 +60,19 @@ public class KubernetesAndWorkerTaskRunnerFactory
implements TaskRunnerFactory<K
{
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunnerFactory.build(),
-
HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(kubernetesAndWorkerTaskRunnerConfig.getWorkerTaskRunnerType())
?
- httpRemoteTaskRunnerFactory.build() :
remoteTaskRunnerFactory.build(),
- kubernetesAndWorkerTaskRunnerConfig
+ getWorkerTaskRunner(),
+ runnerStrategy
);
return runner;
}
+ private WorkerTaskRunner getWorkerTaskRunner()
+ {
+ String workerType = kubernetesAndWorkerTaskRunnerConfig.getWorkerType();
+ return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) ?
+ httpRemoteTaskRunnerFactory.build() :
remoteTaskRunnerFactory.build();
+ }
+
@Override
public KubernetesAndWorkerTaskRunner get()
{
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index 654a1cd108c..afd9d9a7c4e 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -20,7 +20,9 @@
package org.apache.druid.k8s.overlord;
import com.google.inject.Binder;
+import com.google.inject.Inject;
import com.google.inject.Key;
+import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import io.fabric8.kubernetes.client.Config;
@@ -29,6 +31,7 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.LoadScope;
@@ -37,27 +40,35 @@ import
org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;
+import java.util.Properties;
+
@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
public class KubernetesOverlordModule implements DruidModule
{
private static final Logger log = new Logger(KubernetesOverlordModule.class);
+ private static final String K8SANDWORKER_PROPERTIES_PREFIX =
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX
+ +
".k8sAndWorker";
+ private static final String RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING =
K8SANDWORKER_PROPERTIES_PREFIX
+ +
".runnerStrategy.%s";
@Override
public void configure(Binder binder)
{
// druid.indexer.runner.type=k8s
JsonConfigProvider.bind(binder,
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX,
KubernetesTaskRunnerConfig.class);
- JsonConfigProvider.bind(binder,
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8sAndWorker",
KubernetesAndWorkerTaskRunnerConfig.class);
+ JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX,
KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue",
TaskQueueConfig.class);
PolyBind.createChoice(
binder,
@@ -78,6 +89,9 @@ public class KubernetesOverlordModule implements DruidModule
.in(LazySingleton.class);
binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
+ binder.bind(RunnerStrategy.class)
+ .toProvider(RunnerStrategyProvider.class)
+ .in(LazySingleton.class);
configureTaskLogs(binder);
}
@@ -116,6 +130,45 @@ public class KubernetesOverlordModule implements
DruidModule
return client;
}
+ private static class RunnerStrategyProvider implements
Provider<RunnerStrategy>
+ {
+ private KubernetesAndWorkerTaskRunnerConfig runnerConfig;
+ private Properties props;
+ private JsonConfigurator configurator;
+
+ @Inject
+ public void inject(
+ KubernetesAndWorkerTaskRunnerConfig runnerConfig,
+ Properties props,
+ JsonConfigurator configurator
+ )
+ {
+ this.runnerConfig = runnerConfig;
+ this.props = props;
+ this.configurator = configurator;
+ }
+
+ @Override
+ public RunnerStrategy get()
+ {
+ String runnerStrategy = runnerConfig.getRunnerStrategy();
+
+ final String runnerStrategyPropertyBase = StringUtils.format(
+ RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING,
+ runnerStrategy
+ );
+ final JsonConfigProvider<RunnerStrategy> provider =
JsonConfigProvider.of(
+ runnerStrategyPropertyBase,
+ RunnerStrategy.class
+ );
+
+ props.put(runnerStrategyPropertyBase + ".type", runnerStrategy);
+ provider.inject(props, configurator);
+
+ return provider.get();
+ }
+ }
+
private void configureTaskLogs(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type",
Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java
new file mode 100644
index 00000000000..8b0a6374ad4
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.indexing.common.task.Task;
+
+/**
+ * Implementation of {@link RunnerStrategy} that always selects the Kubernetes
runner type for tasks.
+ *
+ * <p>This strategy is specific for tasks that are intended to be executed in
a Kubernetes environment.
+ * Regardless of task specifics, this strategy always returns {@link
RunnerType#KUBERNETES_RUNNER_TYPE}.
+ */
+public class KubernetesRunnerStrategy implements RunnerStrategy
+{
+ @JsonCreator
+ public KubernetesRunnerStrategy()
+ {
+ }
+
+ @Override
+ public RunnerType getRunnerTypeForTask(Task task)
+ {
+ return RunnerType.KUBERNETES_RUNNER_TYPE;
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java
new file mode 100644
index 00000000000..5aa2bc4723a
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory;
+
+/**
+ * Strategy interface for selecting the appropriate runner type based on the
task spec or specific context conditions.
+ *
+ * <p>This interface is part of a strategy pattern and is implemented by
different classes that handle
+ * the logic of selecting a runner type based on various criteria. Each task
submitted to the system
+ * will pass through the strategy implementation to determine the correct
runner for execution.
+ *
+ * <p>The strategy uses {@link RunnerType} as a standardized way of referring
to and managing different types of runners.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl =
KubernetesRunnerStrategy.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "k8s", value = KubernetesRunnerStrategy.class),
+ @JsonSubTypes.Type(name = "worker", value = WorkerRunnerStrategy.class),
+ @JsonSubTypes.Type(name = "taskType", value = TaskTypeRunnerStrategy.class)
+})
+public interface RunnerStrategy
+{
+ String WORKER_NAME = "worker";
+
+ /**
+ * Enumerates the available runner types, each associated with a specific
method of task execution.
+ * These runner types are used by the strategies to make decisions and by
the system to route tasks appropriately.
+ */
+ enum RunnerType
+ {
+ KUBERNETES_RUNNER_TYPE(KubernetesTaskRunnerFactory.TYPE_NAME),
+ WORKER_RUNNER_TYPE(WORKER_NAME);
+
+ private final String type;
+
+ RunnerType(String type)
+ {
+ this.type = type;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+ }
+
+ /**
+ * Analyzes the task and determines the appropriate runner type for
executing it.
+ *
+ * @param task The task that needs to be executed.
+ * @return The runner type deemed most suitable for executing the task.
+ */
+ RunnerType getRunnerTypeForTask(Task task);
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java
new file mode 100644
index 00000000000..6a16314be5b
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.common.task.Task;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Implementation of {@link RunnerStrategy} that allows dynamic selection of
runner type based on task type.
+ *
+ * <p>This strategy checks each task's type against a set of overrides to
determine the appropriate runner type.
+ * If no override is specified for a task's type, it uses a default runner.
+ *
+ * <p>Runner types are determined based on configurations provided at
construction, including default runner
+ * type and specific overrides per task type. This strategy is designed for
environments where tasks may require
+ * different execution environments (e.g., Kubernetes or worker nodes).
+ */
+public class TaskTypeRunnerStrategy implements RunnerStrategy
+{
+ @Nullable
+ private final Map<String, String> overrides;
+ private final RunnerStrategy kubernetesRunnerStrategy = new
KubernetesRunnerStrategy();
+ private WorkerRunnerStrategy workerRunnerStrategy;
+ private final RunnerStrategy defaultRunnerStrategy;
+ private final String defaultRunner;
+
+ @JsonCreator
+ public TaskTypeRunnerStrategy(
+ @JsonProperty("default") String defaultRunner,
+ @JsonProperty("overrides") @Nullable Map<String, String> overrides
+ )
+ {
+ Preconditions.checkNotNull(defaultRunner);
+ workerRunnerStrategy = new WorkerRunnerStrategy();
+ defaultRunnerStrategy =
RunnerType.WORKER_RUNNER_TYPE.getType().equals(defaultRunner) ?
+ workerRunnerStrategy : kubernetesRunnerStrategy;
+ validate(overrides);
+ this.defaultRunner = defaultRunner;
+ this.overrides = overrides;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<String, String> getOverrides()
+ {
+ return overrides;
+ }
+
+ @JsonProperty
+ public String getDefault()
+ {
+ return defaultRunner;
+ }
+
+ @Override
+ public RunnerType getRunnerTypeForTask(Task task)
+ {
+ String runnerType = null;
+ if (overrides != null) {
+ runnerType = overrides.get(task.getType());
+ }
+
+ RunnerStrategy runnerStrategy = getRunnerSelectStrategy(runnerType);
+ return runnerStrategy.getRunnerTypeForTask(task);
+ }
+
+ private RunnerStrategy getRunnerSelectStrategy(String runnerType)
+ {
+ if (runnerType == null) {
+ return defaultRunnerStrategy;
+ }
+
+ if (WORKER_NAME.equals(runnerType)) {
+ return workerRunnerStrategy;
+ } else {
+ return kubernetesRunnerStrategy;
+ }
+ }
+
+ private void validate(Map<String, String> overrides)
+ {
+ if (overrides == null) {
+ return;
+ }
+
+ boolean hasValidRunnerType =
+ overrides.values().stream().allMatch(v ->
RunnerType.WORKER_RUNNER_TYPE.getType().equals(v)
+ ||
RunnerType.KUBERNETES_RUNNER_TYPE.getType().equals(v));
+ Preconditions.checkArgument(
+ hasValidRunnerType,
+ "Invalid config in 'overrides'. Each runner type must be either '%s'
or '%s'.",
+ RunnerType.WORKER_RUNNER_TYPE.getType(),
+ RunnerType.KUBERNETES_RUNNER_TYPE.getType()
+ );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TaskTypeRunnerStrategy{" +
+ "default=" + defaultRunner +
+ ", overrides=" + overrides +
+ '}';
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java
new file mode 100644
index 00000000000..bd06f91aa8f
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.indexing.common.task.Task;
+
+/**
+ * Implementation of {@link RunnerStrategy} that always selects the Worker
runner type for tasks.
+ *
+ * <p>This strategy is specific for tasks that are intended to be executed in
a Worker environment.
+ * Regardless of task specifics, this strategy always returns {@link
RunnerType#WORKER_RUNNER_TYPE}.
+ */
+public class WorkerRunnerStrategy implements RunnerStrategy
+{
+ @JsonCreator
+ public WorkerRunnerStrategy()
+ {
+ }
+
+ @Override
+ public RunnerType getRunnerTypeForTask(Task task)
+ {
+ return RunnerType.WORKER_RUNNER_TYPE;
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
index 979aba69291..8ad631682f9 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
@@ -20,8 +20,6 @@
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
-import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
@@ -39,9 +37,8 @@ public class KubernetesAndWorkerTaskRunnerConfigTest
KubernetesAndWorkerTaskRunnerConfig.class
);
- Assert.assertEquals(RemoteTaskRunnerFactory.TYPE_NAME,
config.getWorkerTaskRunnerType());
- Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
-
+ Assert.assertEquals("worker", config.getRunnerStrategy());
+ Assert.assertEquals("remote", config.getWorkerType());
}
@Test
@@ -49,7 +46,6 @@ public class KubernetesAndWorkerTaskRunnerConfigTest
{
KubernetesAndWorkerTaskRunnerConfig config = new
KubernetesAndWorkerTaskRunnerConfig(null, null);
- Assert.assertEquals(HttpRemoteTaskRunnerFactory.TYPE_NAME,
config.getWorkerTaskRunnerType());
- Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
+ Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME,
config.getRunnerStrategy());
}
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
index c8e6d3afa03..88696017d05 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.k8s.overlord;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
+import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@@ -45,7 +47,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends
EasyMockSupport
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
- new KubernetesAndWorkerTaskRunnerConfig(null, null)
+ new KubernetesAndWorkerTaskRunnerConfig(null, null),
+ new WorkerRunnerStrategy()
);
EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
@@ -63,7 +66,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends
EasyMockSupport
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
- new KubernetesAndWorkerTaskRunnerConfig("remote", null)
+ new KubernetesAndWorkerTaskRunnerConfig(null, "remote"),
+ new WorkerRunnerStrategy()
);
EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
@@ -81,7 +85,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends
EasyMockSupport
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
- new KubernetesAndWorkerTaskRunnerConfig("noop", null)
+ new KubernetesAndWorkerTaskRunnerConfig(null, "noop"),
+ new KubernetesRunnerStrategy()
);
EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
index af5a6c39bb0..3ab515cc6e5 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -31,6 +31,9 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
+import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.TaskTypeRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@@ -67,7 +70,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends
EasyMockSupport
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
- new KubernetesAndWorkerTaskRunnerConfig(null, null)
+ new KubernetesRunnerStrategy()
);
}
@@ -77,7 +80,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends
EasyMockSupport
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new
KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
- new KubernetesAndWorkerTaskRunnerConfig(null, false)
+ new KubernetesRunnerStrategy()
);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(kubernetesTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
@@ -93,7 +96,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends
EasyMockSupport
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new
KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
- new KubernetesAndWorkerTaskRunnerConfig(null, true)
+ new WorkerRunnerStrategy()
);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(workerTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
@@ -103,6 +106,33 @@ public class KubernetesAndWorkerTaskRunnerTest extends
EasyMockSupport
verifyAll();
}
+ @Test
+ public void test_runOnKubernetesOrWorkerBasedOnStrategy() throws
ExecutionException, InterruptedException
+ {
+ TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s",
ImmutableMap.of("index_kafka", "worker"));
+ KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new
KubernetesAndWorkerTaskRunner(
+ kubernetesTaskRunner,
+ workerTaskRunner,
+ runnerStrategy
+ );
+ Task taskMock = EasyMock.createMock(Task.class);
+ TaskStatus taskStatus = TaskStatus.success(ID);
+ EasyMock.expect(taskMock.getId()).andReturn(ID).anyTimes();
+
+ EasyMock.expect(taskMock.getType()).andReturn("index_kafka").once();
+
EasyMock.expect(workerTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
+ EasyMock.replay(taskMock, workerTaskRunner);
+ Assert.assertEquals(taskStatus,
kubernetesAndWorkerTaskRunner.run(taskMock).get());
+ EasyMock.verify(taskMock, workerTaskRunner);
+ EasyMock.reset(taskMock, workerTaskRunner);
+
+ EasyMock.expect(taskMock.getType()).andReturn("compact").once();
+
EasyMock.expect(kubernetesTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
+ EasyMock.replay(taskMock, kubernetesTaskRunner);
+ Assert.assertEquals(taskStatus,
kubernetesAndWorkerTaskRunner.run(taskMock).get());
+ EasyMock.verify(taskMock, kubernetesTaskRunner);
+ }
+
@Test
public void test_getUsedCapacity()
{
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java
new file mode 100644
index 00000000000..880d5528ac7
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import org.apache.druid.indexing.common.task.Task;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class KubernetesRunnerStrategyTest extends EasyMockSupport
+{
+ @Mock
+ Task task;
+
+ @Test
+ public void test_kubernetesRunnerStrategy_returnsCorrectRunnerType()
+ {
+ KubernetesRunnerStrategy runnerStrategy = new KubernetesRunnerStrategy();
+
+ Assert.assertEquals(RunnerStrategy.RunnerType.KUBERNETES_RUNNER_TYPE,
runnerStrategy.getRunnerTypeForTask(task));
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java
new file mode 100644
index 00000000000..a32630ed614
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class TaskTypeRunnerStrategyTest extends EasyMockSupport
+{
+ @Mock
+ Task task;
+
+ @Test
+ public void test_taskTypeRunnerStrategy_returnsCorrectRunnerType()
+ {
+ TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s",
ImmutableMap.of("index_kafka", "worker"));
+ EasyMock.expect(task.getType()).andReturn("index_kafka");
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(task.getType()).andReturn("compact");
+ EasyMock.expectLastCall().once();
+ replayAll();
+ Assert.assertEquals(RunnerStrategy.WORKER_NAME,
runnerStrategy.getRunnerTypeForTask(task).getType());
+ Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME,
runnerStrategy.getRunnerTypeForTask(task).getType());
+ verifyAll();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void test_invalidOverridesConfig_shouldThrowException()
+ {
+ new TaskTypeRunnerStrategy(
+ "k8s",
+ ImmutableMap.of(
+ "index_kafka",
+ "non_exist_runner"
+ )
+ );
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java
new file mode 100644
index 00000000000..1a3ae34fc6a
--- /dev/null
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+
+import org.apache.druid.indexing.common.task.Task;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class WorkerRunnerStrategyTest extends EasyMockSupport
+{
+ @Mock
+ Task task;
+
+ @Test
+ public void test_workerRunnerStrategy_returnsCorrectRunnerType()
+ {
+ WorkerRunnerStrategy runnerStrategy = new WorkerRunnerStrategy();
+ Assert.assertEquals(RunnerStrategy.WORKER_NAME,
runnerStrategy.getRunnerTypeForTask(task).getType());
+ }
+}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
index 757b07ebda5..43e7414f11f 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
@@ -1,4 +1,4 @@
{
- "workerTaskRunnerType": "remote",
- "sendAllTasksToWorkerTaskRunner": false
+ "runnerStrategy.type": "worker",
+ "runnerStrategy.workerType": "remote"
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]