suneet-s commented on code in PR #14918: URL: https://github.com/apache/druid/pull/14918#discussion_r1317700175
########## docs/development/extensions-contrib/k8s-jobs.md: ########## @@ -272,4 +272,18 @@ roleRef: kind: Role name: druid-k8s-task-scheduler apiGroup: rbac.authorization.k8s.io -``` \ No newline at end of file +``` + +## Migration/Kubernetes and Worker Task Runner +If you are running a cluster with tasks running on middle managers or indexers and want to do a zero downtime migration to mm-less ingestion, the mm-less ingestion system is capable of running in migration mode by reading tasks from middle managers/indexers and Kubernetes and writing tasks to either middle managers or to Kubernetes. + +To do this, set the following property. +`druid.indexer.runner.type: k8sAndWorker` (instead of `druid.indexer.runner.type: k8s`) Review Comment: ```suggestion `druid.indexer.runner.type: k8sAndWorker` ``` ########## extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord; + +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.TaskRunner; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.WorkerTaskRunner; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; +import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.tasklogs.TaskLogStreamer; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Mixed mode task runner that can run tasks on either Kubernetes or workers based on KubernetesAndWorkerTaskRunnerConfig. + * This task runner is always aware of task runner running on either system. 
+ */ +public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTaskRunner +{ + private final KubernetesTaskRunner kubernetesTaskRunner; + private final WorkerTaskRunner workerTaskRunner; + private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig; + + public KubernetesAndWorkerTaskRunner( + KubernetesTaskRunner kubernetesTaskRunner, + WorkerTaskRunner workerTaskRunner, + KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig + ) + { + this.kubernetesTaskRunner = kubernetesTaskRunner; + this.workerTaskRunner = workerTaskRunner; + this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig; + } + + @Override + public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.restore(), workerTaskRunner.restore())); + } + + @Override + @LifecycleStart + public void start() + { + } + + @Override + public void registerListener(TaskRunnerListener listener, Executor executor) + { + kubernetesTaskRunner.registerListener(listener, executor); + workerTaskRunner.registerListener(listener, executor); + } + + @Override + public void unregisterListener(String listenerId) + { + kubernetesTaskRunner.unregisterListener(listenerId); + workerTaskRunner.unregisterListener(listenerId); + } + + @Override + public ListenableFuture<TaskStatus> run(Task task) + { + if (kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) { + return workerTaskRunner.run(task); + } else { + return kubernetesTaskRunner.run(task); + } + } + + @Override + public void shutdown(String taskid, String reason) + { + // Technically this is a no-op for tasks a runner does't know about. + workerTaskRunner.shutdown(taskid, reason); + kubernetesTaskRunner.shutdown(taskid, reason); + } + + @Override + @LifecycleStop + public void stop() + { + } + + @Override + public Collection<? 
extends TaskRunnerWorkItem> getRunningTasks() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getRunningTasks(), workerTaskRunner.getRunningTasks())); + } + + @Override + public Collection<? extends TaskRunnerWorkItem> getPendingTasks() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getPendingTasks(), workerTaskRunner.getPendingTasks())); + + } + + @Override + public Collection<? extends TaskRunnerWorkItem> getKnownTasks() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getKnownTasks(), workerTaskRunner.getKnownTasks())); + + } + + @Override + public Optional<ScalingStats> getScalingStats() + { + return workerTaskRunner.getScalingStats(); + } + + @Override + public Map<String, Long> getTotalTaskSlotCount() + { + Map<String, Long> taskSlotCounts = new HashMap<>(); + taskSlotCounts.putAll(kubernetesTaskRunner.getTotalTaskSlotCount()); + taskSlotCounts.putAll(workerTaskRunner.getTotalTaskSlotCount()); + return taskSlotCounts; + } + + @Override + public Map<String, Long> getIdleTaskSlotCount() + { + Map<String, Long> taskSlotCounts = new HashMap<>(); + taskSlotCounts.putAll(kubernetesTaskRunner.getIdleTaskSlotCount()); + taskSlotCounts.putAll(workerTaskRunner.getIdleTaskSlotCount()); + return taskSlotCounts; + } + + @Override + public Map<String, Long> getUsedTaskSlotCount() + { + Map<String, Long> taskSlotCounts = new HashMap<>(); + taskSlotCounts.putAll(kubernetesTaskRunner.getUsedTaskSlotCount()); + taskSlotCounts.putAll(workerTaskRunner.getUsedTaskSlotCount()); + return taskSlotCounts; + } + + @Override + public Map<String, Long> getLazyTaskSlotCount() + { + Map<String, Long> taskSlotCounts = new HashMap<>(); + taskSlotCounts.putAll(kubernetesTaskRunner.getLazyTaskSlotCount()); + taskSlotCounts.putAll(workerTaskRunner.getLazyTaskSlotCount()); + return taskSlotCounts; + } + + @Override + public Map<String, Long> getBlacklistedTaskSlotCount() + { + Map<String, Long> taskSlotCounts = new HashMap<>(); + 
taskSlotCounts.putAll(kubernetesTaskRunner.getBlacklistedTaskSlotCount()); + taskSlotCounts.putAll(workerTaskRunner.getBlacklistedTaskSlotCount()); + return taskSlotCounts; + } + + @Override + public Collection<ImmutableWorkerInfo> getWorkers() + { + return workerTaskRunner.getWorkers(); + } + + @Override + public Collection<Worker> getLazyWorkers() + { + return workerTaskRunner.getLazyWorkers(); + } + + @Override + public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers) + { + return workerTaskRunner.markWorkersLazy(isLazyWorker, maxWorkers); + } + + @Override + public WorkerTaskRunnerConfig getConfig() + { + return workerTaskRunner.getConfig(); + } + + @Override + public Collection<Task> getPendingTaskPayloads() + { + return workerTaskRunner.getPendingTaskPayloads(); + } + + @Override + public Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException + { + Optional<InputStream> kubernetesTaskLog = kubernetesTaskRunner.streamTaskLog(taskid, offset); + if (kubernetesTaskLog.isPresent()) { + return kubernetesTaskLog; + } else if (workerTaskRunner instanceof TaskLogStreamer) { + return ((TaskLogStreamer) workerTaskRunner).streamTaskLog(taskid, offset); + } + return Optional.absent(); + } + + @Nullable + @Override + public RunnerTaskState getRunnerTaskState(String taskId) + { + RunnerTaskState runnerTaskState = kubernetesTaskRunner.getRunnerTaskState(taskId); + if (runnerTaskState == null) { + return workerTaskRunner.getRunnerTaskState(taskId); + } + + return runnerTaskState; + } + + @Override + public List<TaskRunner> getSubTaskRunners() + { + return ImmutableList.of(kubernetesTaskRunner, workerTaskRunner); + } + + @Override + public int getTotalCapacity() + { + return kubernetesTaskRunner.getTotalCapacity() + workerTaskRunner.getTotalCapacity(); + } + + @Override + public int getUsedCapacity() + { + return kubernetesTaskRunner.getUsedCapacity() + workerTaskRunner.getUsedCapacity(); + } Review 
Comment: The interface definitions say used capacity and total capacity can return -1 to indicate that it is not implemented or that tasks cannot be found. It seems like there should be some special handling here if either task runner returns -1. However, judging from the implementations of the 3 task runners involved, none of them ever returns -1. ########## docs/development/extensions-contrib/k8s-jobs.md: ########## @@ -272,4 +272,18 @@ roleRef: kind: Role name: druid-k8s-task-scheduler apiGroup: rbac.authorization.k8s.io -``` \ No newline at end of file +``` + +## Migration/Kubernetes and Worker Task Runner +If you are running a cluster with tasks running on middle managers or indexers and want to do a zero downtime migration to mm-less ingestion, the mm-less ingestion system is capable of running in migration mode by reading tasks from middle managers/indexers and Kubernetes and writing tasks to either middle managers or to Kubernetes. + +To do this, set the following property. +`druid.indexer.runner.type: k8sAndWorker` (instead of `druid.indexer.runner.type: k8s`) + Review Comment: It would be helpful here to describe the steps we envision an operator taking. For example: 1. Set `druid.indexer.runner.type: k8sAndWorker` and update the cluster 2. Let the cluster run for a while until all the tasks have switched over to running as k8s pods 3. Verify that the system is healthy and stable 4. Set `druid.indexer.runner.type: k8s` and update the cluster ########## extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.google.inject.Inject; +import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; +import org.apache.druid.indexing.overlord.TaskRunnerFactory; +import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; + + +public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<KubernetesAndWorkerTaskRunner> +{ + public static final String TYPE_NAME = "k8sAndWorker"; + + private final KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory; + private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory; + private final RemoteTaskRunnerFactory remoteTaskRunnerFactory; + private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig; + + private KubernetesAndWorkerTaskRunner runner; + + @Inject + public KubernetesAndWorkerTaskRunnerFactory( + KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory, + HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory, + RemoteTaskRunnerFactory remoteTaskRunnerFactory, + KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig + ) + { + this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory; + this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory; + this.remoteTaskRunnerFactory = remoteTaskRunnerFactory; + this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig; + } + + 
@Override + public KubernetesAndWorkerTaskRunner build() + { + runner = new KubernetesAndWorkerTaskRunner( + kubernetesTaskRunnerFactory.build(), + kubernetesAndWorkerTaskRunnerConfig.getWorkerTaskRunnerType().equals("remote") ? + remoteTaskRunnerFactory.build() : httpRemoteTaskRunnerFactory.build(), Review Comment: I think this could be pulled out into a new method and the string types `remote` and `httpRemote` should be explicitly handled. Alternatively, we can consider changing the config to a boolean that is something like `usesHttp` then this if else logic is easier to follow. What do you think? ########## docs/development/extensions-contrib/k8s-jobs.md: ########## @@ -272,4 +272,18 @@ roleRef: kind: Role name: druid-k8s-task-scheduler apiGroup: rbac.authorization.k8s.io -``` \ No newline at end of file +``` + +## Migration/Kubernetes and Worker Task Runner +If you are running a cluster with tasks running on middle managers or indexers and want to do a zero downtime migration to mm-less ingestion, the mm-less ingestion system is capable of running in migration mode by reading tasks from middle managers/indexers and Kubernetes and writing tasks to either middle managers or to Kubernetes. + +To do this, set the following property. +`druid.indexer.runner.type: k8sAndWorker` (instead of `druid.indexer.runner.type: k8s`) + +### Additional Configurations + +|Property| Possible Values | Description |Default|required| +|--------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|--------| Review Comment: ```suggestion |Property|Possible Values|Description|Default|Required| |---------|---------------|------------|-------|---------| ``` Can you use this formatting please. It is used elsewhere in the docs. 
If you are using IntelliJ, you will need to go to Settings > Editor > Markdown and uncheck `Reformat table when typing` ########## extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang3.ObjectUtils; + +import javax.validation.constraints.NotNull; + +public class KubernetesAndWorkerTaskRunnerConfig +{ + + /** + * Select which worker task runner to use in addition to the Kubernetes runner, options are httpRemote or remote. + * */ + @JsonProperty + @NotNull + private String workerTaskRunnerType = "httpRemote"; + + /** + * Whether or not to send tasks to the worker task runner instead of the Kubernetes runner.
+ * */ + @JsonProperty + @NotNull + private Boolean sendAllTasksToWorkerTaskRunner; + + @JsonCreator + public KubernetesAndWorkerTaskRunnerConfig( + @JsonProperty("workerTaskRunnerType") String workerTaskRunnerType, + @JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean sendAllTasksToWorkerTaskRunner + ) + { + this.workerTaskRunnerType = ObjectUtils.defaultIfNull( Review Comment: Add validation and a nice error message that only `remote` and `httpRemote` are supported. ########## extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord; + +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.TaskRunner; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.WorkerTaskRunner; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; +import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.tasklogs.TaskLogStreamer; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Mixed mode task runner that can run tasks on either Kubernetes or workers based on KubernetesAndWorkerTaskRunnerConfig. + * This task runner is always aware of task runner running on either system. 
+ */ +public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTaskRunner +{ + private final KubernetesTaskRunner kubernetesTaskRunner; + private final WorkerTaskRunner workerTaskRunner; + private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig; + + public KubernetesAndWorkerTaskRunner( + KubernetesTaskRunner kubernetesTaskRunner, + WorkerTaskRunner workerTaskRunner, + KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig + ) + { + this.kubernetesTaskRunner = kubernetesTaskRunner; + this.workerTaskRunner = workerTaskRunner; + this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig; + } + + @Override + public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.restore(), workerTaskRunner.restore())); + } + + @Override + @LifecycleStart + public void start() + { + } + + @Override + public void registerListener(TaskRunnerListener listener, Executor executor) + { + kubernetesTaskRunner.registerListener(listener, executor); + workerTaskRunner.registerListener(listener, executor); + } + + @Override + public void unregisterListener(String listenerId) + { + kubernetesTaskRunner.unregisterListener(listenerId); + workerTaskRunner.unregisterListener(listenerId); + } + + @Override + public ListenableFuture<TaskStatus> run(Task task) + { + if (kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) { + return workerTaskRunner.run(task); + } else { + return kubernetesTaskRunner.run(task); + } + } + + @Override + public void shutdown(String taskid, String reason) + { + // Technically this is a no-op for tasks a runner does't know about. + workerTaskRunner.shutdown(taskid, reason); + kubernetesTaskRunner.shutdown(taskid, reason); + } + + @Override + @LifecycleStop + public void stop() + { + } + + @Override + public Collection<? 
extends TaskRunnerWorkItem> getRunningTasks() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getRunningTasks(), workerTaskRunner.getRunningTasks())); + } + + @Override + public Collection<? extends TaskRunnerWorkItem> getPendingTasks() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getPendingTasks(), workerTaskRunner.getPendingTasks())); + + } + + @Override + public Collection<? extends TaskRunnerWorkItem> getKnownTasks() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getKnownTasks(), workerTaskRunner.getKnownTasks())); + + } + + @Override + public Optional<ScalingStats> getScalingStats() + { + return workerTaskRunner.getScalingStats(); + } + + @Override + public Map<String, Long> getTotalTaskSlotCount() + { + Map<String, Long> taskSlotCounts = new HashMap<>(); + taskSlotCounts.putAll(kubernetesTaskRunner.getTotalTaskSlotCount()); + taskSlotCounts.putAll(workerTaskRunner.getTotalTaskSlotCount()); + return taskSlotCounts; + } + + @Override + public Map<String, Long> getIdleTaskSlotCount() + { + Map<String, Long> taskSlotCounts = new HashMap<>(); + taskSlotCounts.putAll(kubernetesTaskRunner.getIdleTaskSlotCount()); + taskSlotCounts.putAll(workerTaskRunner.getIdleTaskSlotCount()); Review Comment: nit: This will better handle the case where the k8sTaskRunner supports tier names that could collide with tier names from the workerTaskRunner. In the current implementation, the tier name for the K8sTaskRunner has a very low likelihood of colliding with the tier names from the worker task runner, which is why I'm calling this a nit.
The same comment applies to the other map merges. ```suggestion Map<String, Long> taskSlotCounts = new HashMap<>(workerTaskRunner.getIdleTaskSlotCount()); kubernetesTaskRunner.getIdleTaskSlotCount().forEach((tier, count) -> taskSlotCounts.merge(tier, count, Long::sum)); ``` ########## extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang3.ObjectUtils; + +import javax.validation.constraints.NotNull; + +public class KubernetesAndWorkerTaskRunnerConfig +{ + + /** + * Select which worker task runner to use in addition to the Kubernetes runner, options are httpRemote or remote. + * */ + @JsonProperty + @NotNull + private String workerTaskRunnerType = "httpRemote"; Review Comment: Is there a way to get the default from CliOverlord#runnerConfigModule instead of hard-coding it here?
``` PolyBind.createChoice( binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(HttpRemoteTaskRunnerFactory.class) ); ``` ########## extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class KubernetesAndWorkerTaskRunnerConfigTest +{ + @Test + public void test_deserializable() throws IOException + { + ObjectMapper mapper = new DefaultObjectMapper(); + KubernetesAndWorkerTaskRunnerConfig config = mapper.readValue( + this.getClass().getClassLoader().getResource("kubernetesAndWorkerTaskRunnerConfig.json"), + KubernetesAndWorkerTaskRunnerConfig.class + ); + + Assert.assertEquals("remote", config.getWorkerTaskRunnerType()); + Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner()); + + } + + @Test + public void test_withDefaults() + { + KubernetesAndWorkerTaskRunnerConfig config = new KubernetesAndWorkerTaskRunnerConfig(null, null); + + Assert.assertEquals("httpRemote", config.getWorkerTaskRunnerType()); + Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner()); + } +} Review Comment: A test for invalid workerTaskRunnerType would be good to add. ########## extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.TaskRunner; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.WorkerTaskRunner; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; +import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.tasklogs.TaskLogStreamer; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Mixed mode task runner that can run tasks on either Kubernetes or workers based on KubernetesAndWorkerTaskRunnerConfig. 
+ * This task runner is always aware of task runner running on either system. + */ +public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTaskRunner +{ + private final KubernetesTaskRunner kubernetesTaskRunner; + private final WorkerTaskRunner workerTaskRunner; + private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig; + + public KubernetesAndWorkerTaskRunner( + KubernetesTaskRunner kubernetesTaskRunner, + WorkerTaskRunner workerTaskRunner, + KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig + ) + { + this.kubernetesTaskRunner = kubernetesTaskRunner; + this.workerTaskRunner = workerTaskRunner; + this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig; + } + + @Override + public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() + { + return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.restore(), workerTaskRunner.restore())); + } + + @Override + @LifecycleStart + public void start() + { Review Comment: nit: A comment here (and in the stop method) about how lifecycle start and stop events are handled for the actual task runners and why it is not needed for this composite class would be helpful. ########## indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java: ########## @@ -136,6 +136,9 @@ public void becomeLeader() } leaderLifecycle.addManagedInstance(taskRunner); + for (TaskRunner subTaskRunner : taskRunner.getSubTaskRunners()) { + leaderLifecycle.addManagedInstance(subTaskRunner); + } Review Comment: Is this change needed if `KubernetesAndWorkerTaskRunner#start` and `KubernetesAndWorkerTaskRunner#stop` call start and stop on the task runners they are delegating to? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
