suneet-s commented on code in PR #14918:
URL: https://github.com/apache/druid/pull/14918#discussion_r1317700175


##########
docs/development/extensions-contrib/k8s-jobs.md:
##########
@@ -272,4 +272,18 @@ roleRef:
   kind: Role
   name: druid-k8s-task-scheduler
   apiGroup: rbac.authorization.k8s.io
-```
\ No newline at end of file
+```
+
+## Migration/Kubernetes and Worker Task Runner
+If you are running a cluster with tasks running on middle managers or indexers 
and want to do a zero downtime migration to mm-less ingestion, the mm-less 
ingestion system is capable of running in migration mode by reading tasks from 
middle managers/indexers and Kubernetes and writing tasks to either middle 
managers or to Kubernetes.
+
+To do this, set the following property.
+`druid.indexer.runner.type: k8sAndWorker` (instead of 
`druid.indexer.runner.type: k8s`)

Review Comment:
   ```suggestion
   `druid.indexer.runner.type: k8sAndWorker`
   ```



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Mixed mode task runner that can run tasks on either Kubernetes or workers 
based on KubernetesAndWorkerTaskRunnerConfig.
+ * This task runner is always aware of task runner running on either system.
+ */
+public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, 
WorkerTaskRunner
+{
+  private final KubernetesTaskRunner kubernetesTaskRunner;
+  private final WorkerTaskRunner workerTaskRunner;
+  private final KubernetesAndWorkerTaskRunnerConfig 
kubernetesAndWorkerTaskRunnerConfig;
+
+  public KubernetesAndWorkerTaskRunner(
+      KubernetesTaskRunner kubernetesTaskRunner,
+      WorkerTaskRunner workerTaskRunner,
+      KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
+  )
+  {
+    this.kubernetesTaskRunner = kubernetesTaskRunner;
+    this.workerTaskRunner = workerTaskRunner;
+    this.kubernetesAndWorkerTaskRunnerConfig = 
kubernetesAndWorkerTaskRunnerConfig;
+  }
+
+  @Override
+  public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
+  {
+    return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.restore(), 
workerTaskRunner.restore()));
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {
+  }
+
+  @Override
+  public void registerListener(TaskRunnerListener listener, Executor executor)
+  {
+    kubernetesTaskRunner.registerListener(listener, executor);
+    workerTaskRunner.registerListener(listener, executor);
+  }
+
+  @Override
+  public void unregisterListener(String listenerId)
+  {
+    kubernetesTaskRunner.unregisterListener(listenerId);
+    workerTaskRunner.unregisterListener(listenerId);
+  }
+
+  @Override
+  public ListenableFuture<TaskStatus> run(Task task)
+  {
+    if 
(kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) {
+      return workerTaskRunner.run(task);
+    } else {
+      return kubernetesTaskRunner.run(task);
+    }
+  }
+
+  @Override
+  public void shutdown(String taskid, String reason)
+  {
+    // Technically this is a no-op for tasks a runner does't know about.
+    workerTaskRunner.shutdown(taskid, reason);
+    kubernetesTaskRunner.shutdown(taskid, reason);
+  }
+
+  @Override
+  @LifecycleStop
+  public void stop()
+  {
+  }
+
+  @Override
+  public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
+  {
+    return 
Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getRunningTasks(), 
workerTaskRunner.getRunningTasks()));
+  }
+
+  @Override
+  public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
+  {
+    return 
Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getPendingTasks(), 
workerTaskRunner.getPendingTasks()));
+
+  }
+
+  @Override
+  public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
+  {
+    return 
Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getKnownTasks(), 
workerTaskRunner.getKnownTasks()));
+
+  }
+
+  @Override
+  public Optional<ScalingStats> getScalingStats()
+  {
+    return workerTaskRunner.getScalingStats();
+  }
+
+  @Override
+  public Map<String, Long> getTotalTaskSlotCount()
+  {
+    Map<String, Long> taskSlotCounts = new HashMap<>();
+    taskSlotCounts.putAll(kubernetesTaskRunner.getTotalTaskSlotCount());
+    taskSlotCounts.putAll(workerTaskRunner.getTotalTaskSlotCount());
+    return taskSlotCounts;
+  }
+
+  @Override
+  public Map<String, Long> getIdleTaskSlotCount()
+  {
+    Map<String, Long> taskSlotCounts = new HashMap<>();
+    taskSlotCounts.putAll(kubernetesTaskRunner.getIdleTaskSlotCount());
+    taskSlotCounts.putAll(workerTaskRunner.getIdleTaskSlotCount());
+    return taskSlotCounts;
+  }
+
+  @Override
+  public Map<String, Long> getUsedTaskSlotCount()
+  {
+    Map<String, Long> taskSlotCounts = new HashMap<>();
+    taskSlotCounts.putAll(kubernetesTaskRunner.getUsedTaskSlotCount());
+    taskSlotCounts.putAll(workerTaskRunner.getUsedTaskSlotCount());
+    return taskSlotCounts;
+  }
+
+  @Override
+  public Map<String, Long> getLazyTaskSlotCount()
+  {
+    Map<String, Long> taskSlotCounts = new HashMap<>();
+    taskSlotCounts.putAll(kubernetesTaskRunner.getLazyTaskSlotCount());
+    taskSlotCounts.putAll(workerTaskRunner.getLazyTaskSlotCount());
+    return taskSlotCounts;
+  }
+
+  @Override
+  public Map<String, Long> getBlacklistedTaskSlotCount()
+  {
+    Map<String, Long> taskSlotCounts = new HashMap<>();
+    taskSlotCounts.putAll(kubernetesTaskRunner.getBlacklistedTaskSlotCount());
+    taskSlotCounts.putAll(workerTaskRunner.getBlacklistedTaskSlotCount());
+    return taskSlotCounts;
+  }
+
+  @Override
+  public Collection<ImmutableWorkerInfo> getWorkers()
+  {
+    return workerTaskRunner.getWorkers();
+  }
+
+  @Override
+  public Collection<Worker> getLazyWorkers()
+  {
+    return workerTaskRunner.getLazyWorkers();
+  }
+
+  @Override
+  public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> 
isLazyWorker, int maxWorkers)
+  {
+    return workerTaskRunner.markWorkersLazy(isLazyWorker, maxWorkers);
+  }
+
+  @Override
+  public WorkerTaskRunnerConfig getConfig()
+  {
+    return workerTaskRunner.getConfig();
+  }
+
+  @Override
+  public Collection<Task> getPendingTaskPayloads()
+  {
+    return workerTaskRunner.getPendingTaskPayloads();
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskLog(String taskid, long offset) 
throws IOException
+  {
+    Optional<InputStream> kubernetesTaskLog = 
kubernetesTaskRunner.streamTaskLog(taskid, offset);
+    if (kubernetesTaskLog.isPresent()) {
+      return kubernetesTaskLog;
+    } else if (workerTaskRunner instanceof TaskLogStreamer) {
+      return ((TaskLogStreamer) workerTaskRunner).streamTaskLog(taskid, 
offset);
+    }
+    return Optional.absent();
+  }
+
+  @Nullable
+  @Override
+  public RunnerTaskState getRunnerTaskState(String taskId)
+  {
+    RunnerTaskState runnerTaskState = 
kubernetesTaskRunner.getRunnerTaskState(taskId);
+    if (runnerTaskState == null) {
+      return workerTaskRunner.getRunnerTaskState(taskId);
+    }
+
+    return runnerTaskState;
+  }
+
+  @Override
+  public List<TaskRunner> getSubTaskRunners()
+  {
+    return ImmutableList.of(kubernetesTaskRunner, workerTaskRunner);
+  }
+
+  @Override
+  public int getTotalCapacity()
+  {
+    return kubernetesTaskRunner.getTotalCapacity() + 
workerTaskRunner.getTotalCapacity();
+  }
+
+  @Override
+  public int getUsedCapacity()
+  {
+    return kubernetesTaskRunner.getUsedCapacity() + 
workerTaskRunner.getUsedCapacity();
+  }

Review Comment:
   The interface definitions say that used capacity and total capacity can return -1 
to indicate that the value is not implemented or that tasks cannot be found. It seems like there 
should be some special handling here if either task runner returns -1; otherwise a -1 from one 
runner would silently reduce the sum reported for the other runner.
   
   However, looking at the implementations of the 3 task runners involved, it 
doesn't look like any of them ever return -1.



##########
docs/development/extensions-contrib/k8s-jobs.md:
##########
@@ -272,4 +272,18 @@ roleRef:
   kind: Role
   name: druid-k8s-task-scheduler
   apiGroup: rbac.authorization.k8s.io
-```
\ No newline at end of file
+```
+
+## Migration/Kubernetes and Worker Task Runner
+If you are running a cluster with tasks running on middle managers or indexers 
and want to do a zero downtime migration to mm-less ingestion, the mm-less 
ingestion system is capable of running in migration mode by reading tasks from 
middle managers/indexers and Kubernetes and writing tasks to either middle 
managers or to Kubernetes.
+
+To do this, set the following property.
+`druid.indexer.runner.type: k8sAndWorker` (instead of 
`druid.indexer.runner.type: k8s`)
+

Review Comment:
   It would be helpful here to describe the steps we envision an operator 
taking. For example:
   
   1. Set `druid.indexer.runner.type: k8sAndWorker` and update the cluster
   2. Let the cluster run for a while until all the tasks have switched over to 
running as k8s pods
   3. Verify that the system is healthy and stable
   4. Set `druid.indexer.runner.type: k8s` and update the cluster
   



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
+import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
+
+
+public class KubernetesAndWorkerTaskRunnerFactory implements 
TaskRunnerFactory<KubernetesAndWorkerTaskRunner>
+{
+  public static final String TYPE_NAME = "k8sAndWorker";
+
+  private final KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
+  private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
+  private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
+  private final KubernetesAndWorkerTaskRunnerConfig 
kubernetesAndWorkerTaskRunnerConfig;
+
+  private KubernetesAndWorkerTaskRunner runner;
+
+  @Inject
+  public KubernetesAndWorkerTaskRunnerFactory(
+      KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
+      HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
+      RemoteTaskRunnerFactory remoteTaskRunnerFactory,
+      KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
+  )
+  {
+    this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
+    this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
+    this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
+    this.kubernetesAndWorkerTaskRunnerConfig = 
kubernetesAndWorkerTaskRunnerConfig;
+  }
+
+  @Override
+  public KubernetesAndWorkerTaskRunner build()
+  {
+    runner = new KubernetesAndWorkerTaskRunner(
+        kubernetesTaskRunnerFactory.build(),
+        
kubernetesAndWorkerTaskRunnerConfig.getWorkerTaskRunnerType().equals("remote") ?
+            remoteTaskRunnerFactory.build() : 
httpRemoteTaskRunnerFactory.build(),

Review Comment:
   I think this could be pulled out into a new method, and the string types 
`remote` and `httpRemote` should be explicitly handled (currently any value other 
than `remote` silently falls back to `httpRemote`). Alternatively, we can 
consider changing the config to a boolean, something like `usesHttp`; 
then this if/else logic is easier to follow.
   
   What do you think?



##########
docs/development/extensions-contrib/k8s-jobs.md:
##########
@@ -272,4 +272,18 @@ roleRef:
   kind: Role
   name: druid-k8s-task-scheduler
   apiGroup: rbac.authorization.k8s.io
-```
\ No newline at end of file
+```
+
+## Migration/Kubernetes and Worker Task Runner
+If you are running a cluster with tasks running on middle managers or indexers 
and want to do a zero downtime migration to mm-less ingestion, the mm-less 
ingestion system is capable of running in migration mode by reading tasks from 
middle managers/indexers and Kubernetes and writing tasks to either middle 
managers or to Kubernetes.
+
+To do this, set the following property.
+`druid.indexer.runner.type: k8sAndWorker` (instead of 
`druid.indexer.runner.type: k8s`)
+
+### Additional Configurations
+
+|Property| Possible Values | Description                                       
                                                                                
                                                                                
                               |Default|required|
+|--------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|--------|

Review Comment:
   ```suggestion
   |Property|Possible Values|Description|Default|Required|
   |---------|---------------|------------|-------|---------|
   ```
   
   Can you use this formatting, please? It is used elsewhere in the docs. If you 
are using IntelliJ, you will need to go to Settings > Editor > Markdown and 
uncheck `Reformat table when typing`.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang3.ObjectUtils;
+
+import javax.validation.constraints.NotNull;
+
+public class KubernetesAndWorkerTaskRunnerConfig
+{
+
+  /**
+  * Select which worker task runner to use in addition to the Kubernetes 
runner, options are httpRemote or remote.
+   * */
+  @JsonProperty
+  @NotNull
+  private String workerTaskRunnerType = "httpRemote";
+
+  /**
+   * Whether or not to send tasks to the worker task runner instead of the 
Kubernetes runner.
+   * */
+  @JsonProperty
+  @NotNull
+  private Boolean sendAllTasksToWorkerTaskRunner;
+
+  @JsonCreator
+  public KubernetesAndWorkerTaskRunnerConfig(
+      @JsonProperty("workerTaskRunnerType") String workerTaskRunnerType,
+      @JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean 
sendAllTasksToWorkerTaskRunner
+  )
+  {
+    this.workerTaskRunnerType = ObjectUtils.defaultIfNull(

Review Comment:
   Add validation with a clear error message stating that only `remote` and `httpRemote` 
are supported.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Mixed mode task runner that can run tasks on either Kubernetes or workers 
based on KubernetesAndWorkerTaskRunnerConfig.
+ * This task runner is always aware of task runner running on either system.
+ */
+public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, 
WorkerTaskRunner
+{
+  private final KubernetesTaskRunner kubernetesTaskRunner;
+  private final WorkerTaskRunner workerTaskRunner;
+  private final KubernetesAndWorkerTaskRunnerConfig 
kubernetesAndWorkerTaskRunnerConfig;
+
+  public KubernetesAndWorkerTaskRunner(
+      KubernetesTaskRunner kubernetesTaskRunner,
+      WorkerTaskRunner workerTaskRunner,
+      KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
+  )
+  {
+    this.kubernetesTaskRunner = kubernetesTaskRunner;
+    this.workerTaskRunner = workerTaskRunner;
+    this.kubernetesAndWorkerTaskRunnerConfig = 
kubernetesAndWorkerTaskRunnerConfig;
+  }
+
+  @Override
+  public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
+  {
+    return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.restore(), 
workerTaskRunner.restore()));
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {
+  }
+
+  @Override
+  public void registerListener(TaskRunnerListener listener, Executor executor)
+  {
+    kubernetesTaskRunner.registerListener(listener, executor);
+    workerTaskRunner.registerListener(listener, executor);
+  }
+
+  @Override
+  public void unregisterListener(String listenerId)
+  {
+    kubernetesTaskRunner.unregisterListener(listenerId);
+    workerTaskRunner.unregisterListener(listenerId);
+  }
+
+  @Override
+  public ListenableFuture<TaskStatus> run(Task task)
+  {
+    if 
(kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) {
+      return workerTaskRunner.run(task);
+    } else {
+      return kubernetesTaskRunner.run(task);
+    }
+  }
+
+  @Override
+  public void shutdown(String taskid, String reason)
+  {
+    // Technically this is a no-op for tasks a runner does't know about.
+    workerTaskRunner.shutdown(taskid, reason);
+    kubernetesTaskRunner.shutdown(taskid, reason);
+  }
+
+  @Override
+  @LifecycleStop
+  public void stop()
+  {
+  }
+
+  @Override
+  public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
+  {
+    return 
Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getRunningTasks(), 
workerTaskRunner.getRunningTasks()));
+  }
+
+  @Override
+  public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
+  {
+    return 
Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getPendingTasks(), 
workerTaskRunner.getPendingTasks()));
+
+  }
+
+  @Override
+  public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
+  {
+    return 
Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getKnownTasks(), 
workerTaskRunner.getKnownTasks()));
+
+  }
+
+  @Override
+  public Optional<ScalingStats> getScalingStats()
+  {
+    return workerTaskRunner.getScalingStats();
+  }
+
+  @Override
+  public Map<String, Long> getTotalTaskSlotCount()
+  {
+    Map<String, Long> taskSlotCounts = new HashMap<>();
+    taskSlotCounts.putAll(kubernetesTaskRunner.getTotalTaskSlotCount());
+    taskSlotCounts.putAll(workerTaskRunner.getTotalTaskSlotCount());
+    return taskSlotCounts;
+  }
+
+  @Override
+  public Map<String, Long> getIdleTaskSlotCount()
+  {
+    Map<String, Long> taskSlotCounts = new HashMap<>();
+    taskSlotCounts.putAll(kubernetesTaskRunner.getIdleTaskSlotCount());
+    taskSlotCounts.putAll(workerTaskRunner.getIdleTaskSlotCount());

Review Comment:
   nit: This will better handle the case where the k8sTaskRunner supports tier names that 
could collide with tier names from the workerTaskRunner: `putAll` overwrites the first 
runner's count for a colliding tier, whereas `merge` with `Long::sum` adds the counts together. 
In the current implementation, the tier name for the K8sTaskRunner has a very low likelihood 
of colliding with the tier names from the worker task runner, which is why I'm 
calling this a nit. Similar comments apply to the other map merging.
   
   ```suggestion
       Map<String, Long> taskSlotCounts = new 
HashMap<>(workerTaskRunner.getIdleTaskSlotCount());
       kubernetesTaskRunner.getIdleTaskSlotCount().forEach((tier, count) -> 
taskSlotCounts.merge(tier, count, Long::sum));
   ```



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang3.ObjectUtils;
+
+import javax.validation.constraints.NotNull;
+
+public class KubernetesAndWorkerTaskRunnerConfig
+{
+
+  /**
+  * Select which worker task runner to use in addition to the Kubernetes 
runner, options are httpRemote or remote.
+   * */
+  @JsonProperty
+  @NotNull
+  private String workerTaskRunnerType = "httpRemote";

Review Comment:
   Is there a way to get the default from CliOverlord#runnerConfigModule 
instead of it being hard-coded here?
   
   ```
   PolyBind.createChoice(
       binder,
       "druid.indexer.runner.type",
       Key.get(TaskRunnerFactory.class),
       Key.get(HttpRemoteTaskRunnerFactory.class)
   );
   ```



##########
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class KubernetesAndWorkerTaskRunnerConfigTest
+{
+  @Test
+  public void test_deserializable() throws IOException
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    KubernetesAndWorkerTaskRunnerConfig config = mapper.readValue(
+        
this.getClass().getClassLoader().getResource("kubernetesAndWorkerTaskRunnerConfig.json"),
+        KubernetesAndWorkerTaskRunnerConfig.class
+    );
+
+    Assert.assertEquals("remote", config.getWorkerTaskRunnerType());
+    Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
+
+  }
+
+  @Test
+  public void test_withDefaults()
+  {
+    KubernetesAndWorkerTaskRunnerConfig config = new 
KubernetesAndWorkerTaskRunnerConfig(null, null);
+
+    Assert.assertEquals("httpRemote", config.getWorkerTaskRunnerType());
+    Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
+  }
+}

Review Comment:
   It would be good to add a test for an invalid `workerTaskRunnerType`.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Mixed mode task runner that can run tasks on either Kubernetes or workers 
based on KubernetesAndWorkerTaskRunnerConfig.
+ * This task runner is always aware of task runner running on either system.
+ */
+public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, 
WorkerTaskRunner
+{
+  private final KubernetesTaskRunner kubernetesTaskRunner;
+  private final WorkerTaskRunner workerTaskRunner;
+  private final KubernetesAndWorkerTaskRunnerConfig 
kubernetesAndWorkerTaskRunnerConfig;
+
+  public KubernetesAndWorkerTaskRunner(
+      KubernetesTaskRunner kubernetesTaskRunner,
+      WorkerTaskRunner workerTaskRunner,
+      KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
+  )
+  {
+    this.kubernetesTaskRunner = kubernetesTaskRunner;
+    this.workerTaskRunner = workerTaskRunner;
+    this.kubernetesAndWorkerTaskRunnerConfig = 
kubernetesAndWorkerTaskRunnerConfig;
+  }
+
+  @Override
+  public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
+  {
+    return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.restore(), 
workerTaskRunner.restore()));
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {

Review Comment:
   nit: A comment here (and in the stop method) explaining how lifecycle start and 
stop events are handled for the underlying task runners, and why handling them is not needed 
in this composite class, would be helpful.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java:
##########
@@ -136,6 +136,9 @@ public void becomeLeader()
           }
 
           leaderLifecycle.addManagedInstance(taskRunner);
+          for (TaskRunner subTaskRunner : taskRunner.getSubTaskRunners()) {
+            leaderLifecycle.addManagedInstance(subTaskRunner);
+          }

Review Comment:
   Is this change needed if `KubernetesAndWorkerTaskRunner#start` and 
`KubernetesAndWorkerTaskRunner#stop` call start and stop on the task runners 
they are delegating to?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to