nishantmonu51 commented on a change in pull request #10910:
URL: https://github.com/apache/druid/pull/10910#discussion_r603927681



##########
File path: extensions-contrib/kubernetes-middlemanager-extensions/src/main/java/org/apache/druid/k8s/middlemanager/K8sForkingTaskRunner.java
##########
@@ -0,0 +1,970 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.middlemanager;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteSink;
+import com.google.common.io.ByteSource;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.FileWriteMode;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1Pod;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.tasklogs.LogUtils;
+import org.apache.druid.indexing.overlord.BaseRestorableTaskRunner;
+import org.apache.druid.indexing.overlord.PortFinder;
+import org.apache.druid.indexing.overlord.TaskRunnerUtils;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.middlemanager.common.K8sApiClient;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.server.metrics.MonitorsConfig;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs tasks in separate processes using the "internal peon" verb.
+ */
+public class K8sForkingTaskRunner
+    extends BaseRestorableTaskRunner<K8sForkingTaskRunner.K8sForkingTaskRunnerWorkItem>
+    implements TaskLogStreamer
+
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(K8sForkingTaskRunner.class);
+  private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
+  private static final String DRUID_INDEXER_NAMESPACE = "druid.indexer.namespace";
+  private static final String DRUID_INDEXER_IMAGE = "druid.indexer.image";
+  private static final String DRUID_INDEXER_SERVICE_ACCOUNT_NAME = "druid.indexer.serviceAccountName";
+  private static final String DRUID_INDEXER_DEFAULT_POD_CPU = "druid.indexer.default.pod.cpu";
+  private static final String DRUID_INDEXER_DEFAULT_POD_MEMORY = "druid.indexer.default.pod.memory";
+  private static final String DRUID_INDEXER_RUNNER_HOST_PATH = "druid.indexer.runner.hostPath";
+  private static final String DRUID_INDEXER_RUNNER_MOUNT_PATH = "druid.indexer.runner.mountPath";
+  private static final String DRUID_PEON_JAVA_OPTS = "druid.peon.javaOpts";
+  private static final String DRUID_PEON_POD_MEMORY = "druid.peon.pod.memory";
+  private static final String DRUID_PEON_POD_CPU = "druid.peon.pod.cpu";
+  private static final String LABEL_KEY = "druid.ingest.task.id";

Review comment:
       The task runner seems to hardcode most of the parameters/configs it uses.
       Is it possible to use something like Helm charts and templates? The goal would
       be for the DevOps person to maintain those templates, and on the Druid side the
       task runner would use them to create a Job.
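
       For illustration only, a minimal sketch of what a template-driven approach could
       look like, assuming a DevOps-maintained pod spec at a hypothetical path such as
       /etc/druid/peon-pod-template.yaml (neither the path nor the class below is part
       of this PR):

           // Sketch: load a DevOps-maintained pod template instead of building the
           // whole V1Pod in code; only per-task fields get patched by the runner.
           import io.kubernetes.client.openapi.models.V1Pod;
           import io.kubernetes.client.util.Yaml;
           import java.io.File;
           import java.io.IOException;

           public class PeonPodTemplateLoader
           {
             public V1Pod loadFor(String taskId) throws IOException
             {
               // Hypothetical template path; assumes the template already declares metadata,
               // image, resources and serviceAccount, so only per-task fields are set here.
               V1Pod pod = Yaml.loadAs(new File("/etc/druid/peon-pod-template.yaml"), V1Pod.class);
               pod.getMetadata().setName(taskId);
               pod.getMetadata().putLabelsItem("druid.ingest.task.id", taskId);
               return pod;
             }
           }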

##########
File path: extensions-contrib/kubernetes-middlemanager-extensions/src/main/java/org/apache/druid/k8s/middlemanager/K8sForkingTaskRunner.java
##########
@@ -0,0 +1,970 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.middlemanager;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteSink;
+import com.google.common.io.ByteSource;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.FileWriteMode;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1Pod;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.tasklogs.LogUtils;
+import org.apache.druid.indexing.overlord.BaseRestorableTaskRunner;
+import org.apache.druid.indexing.overlord.PortFinder;
+import org.apache.druid.indexing.overlord.TaskRunnerUtils;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.middlemanager.common.K8sApiClient;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.server.metrics.MonitorsConfig;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs tasks in separate processes using the "internal peon" verb.
+ */
+public class K8sForkingTaskRunner
+    extends BaseRestorableTaskRunner<K8sForkingTaskRunner.K8sForkingTaskRunnerWorkItem>
+    implements TaskLogStreamer
+
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(K8sForkingTaskRunner.class);
+  private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
+  private static final String DRUID_INDEXER_NAMESPACE = "druid.indexer.namespace";
+  private static final String DRUID_INDEXER_IMAGE = "druid.indexer.image";
+  private static final String DRUID_INDEXER_SERVICE_ACCOUNT_NAME = "druid.indexer.serviceAccountName";
+  private static final String DRUID_INDEXER_DEFAULT_POD_CPU = "druid.indexer.default.pod.cpu";
+  private static final String DRUID_INDEXER_DEFAULT_POD_MEMORY = "druid.indexer.default.pod.memory";
+  private static final String DRUID_INDEXER_RUNNER_HOST_PATH = "druid.indexer.runner.hostPath";
+  private static final String DRUID_INDEXER_RUNNER_MOUNT_PATH = "druid.indexer.runner.mountPath";
+  private static final String DRUID_PEON_JAVA_OPTS = "druid.peon.javaOpts";
+  private static final String DRUID_PEON_POD_MEMORY = "druid.peon.pod.memory";
+  private static final String DRUID_PEON_POD_CPU = "druid.peon.pod.cpu";
+  private static final String LABEL_KEY = "druid.ingest.task.id";
+  private final ForkingTaskRunnerConfig config;
+  private final Properties props;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final ListeningExecutorService exec;
+  private final PortFinder portFinder;
+  private final StartupLoggingConfig startupLoggingConfig;
+  private final K8sApiClient k8sApiClient;
+
+  private volatile boolean stopping = false;
+  private final String nameSpace;
+  private final String image;
+  private final String defaultPodCPU;
+  private final String defaultPodMemory;
+  private final String serviceAccountName;
+
+  @Inject
+  public K8sForkingTaskRunner(
+      ForkingTaskRunnerConfig config,
+      TaskConfig taskConfig,
+      WorkerConfig workerConfig,
+      Properties props,
+      TaskLogPusher taskLogPusher,
+      ObjectMapper jsonMapper,
+      @Self DruidNode node,
+      StartupLoggingConfig startupLoggingConfig,
+      K8sApiClient k8sApiClient
+  )
+  {
+    super(jsonMapper, taskConfig);
+    this.config = config;
+    this.props = props;
+    this.taskLogPusher = taskLogPusher;
+    this.node = node;
+    this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts());
+    this.startupLoggingConfig = startupLoggingConfig;
+    this.exec = MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d")
+    );
+    this.k8sApiClient = k8sApiClient;
+    this.nameSpace = props.getProperty(DRUID_INDEXER_NAMESPACE, "default");
+    this.image = props.getProperty(DRUID_INDEXER_IMAGE, "druid/cluster:v1");
+    this.serviceAccountName = props.getProperty(DRUID_INDEXER_SERVICE_ACCOUNT_NAME, "default");
+    this.defaultPodCPU = props.getProperty(DRUID_INDEXER_DEFAULT_POD_CPU, "1");
+    this.defaultPodMemory = props.getProperty(DRUID_INDEXER_DEFAULT_POD_MEMORY, "2G");
+
+    assert image != null;
+  }
+
+  @Override
+  public ListenableFuture<TaskStatus> run(final Task task)
+  {
+    synchronized (tasks) {
+      tasks.computeIfAbsent(
+          task.getId(), k ->
+          new K8sForkingTaskRunnerWorkItem(
+            task,
+            exec.submit(
+              new Callable<TaskStatus>() {
+                @Override
+                public TaskStatus call()
+                {
+                  final String attemptUUID = UUID.randomUUID().toString();
+                  final File taskDir = taskConfig.getTaskDir(task.getId());
+                  final File attemptDir = new File(taskDir, attemptUUID);
+
+                  final K8sProcessHolder processHolder;
+                  // POD_IP is defined in the env when creating the peon pod.
+                  final String childHost = "$POD_IP";
+                  final String tmpFileLoc = "/druidTmp";
+                  int childPort = -1;
+                  int tlsChildPort = -1;
+
+                  if (node.isEnablePlaintextPort()) {
+                    childPort = portFinder.findUnusedPort();
+                  }
+
+                  if (node.isEnableTlsPort()) {
+                    tlsChildPort = portFinder.findUnusedPort();
+                  }
+
+                  TaskLocation taskLocation;
+
+                  try {
+                    final Closer closer = Closer.create();
+                    try {
+
+                      final File taskFile = new File(taskDir, "task.json");
+                      final File statusFile = new File(attemptDir, "status.json");
+
+                      final File logFile = new File(taskDir, "log");
+
+                      if (!logFile.exists()) {
+                        if (taskDir.exists()) {
+                          logFile.createNewFile();
+                        } else {
+                          taskDir.mkdirs();
+                          logFile.createNewFile();
+                        }
+                      }
+                      attemptDir.mkdirs();
+
+                      final File reportsFile = new File(attemptDir, "report.json");
+                      // time to adjust process holders
+                      synchronized (tasks) {
+                        // replace all the ": - . _" with "" to reduce the length of the pod name and meet the pod naming limit of 63 chars.
+                        final String label_value_ori = StringUtils.toLowerCase(
+                                StringUtils.replace(
+                                        StringUtils.replace(
+                                                StringUtils.replace(
+                                                        StringUtils.replace(
+                                                                task.getId(),
+                                                                "_", ""),
+                                                        ":", ""),
+                                                ".", ""),
+                                        "-", ""));
+                        String label_value;

Review comment:
       nit: refactor and extract this into a sanitizeName method
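
       A rough sketch of what that extraction could look like (the name and the
       50-character cap simply mirror the logic above; illustrative only, not part
       of the PR):

           // Strips the separators used in task ids and keeps the trailing 50 chars
           // so the result fits Kubernetes' 63-character name limit once prefixed.
           private static String sanitizeName(String taskId)
           {
             String sanitized = StringUtils.toLowerCase(
                 taskId.replace("_", "").replace(":", "").replace(".", "").replace("-", ""));
             return sanitized.length() > 50 ? sanitized.substring(sanitized.length() - 50) : sanitized;
           }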

##########
File path: extensions-contrib/kubernetes-middlemanager-extensions/src/main/java/org/apache/druid/k8s/middlemanager/K8sForkingTaskRunner.java
##########
@@ -0,0 +1,970 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.middlemanager;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteSink;
+import com.google.common.io.ByteSource;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.FileWriteMode;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1Pod;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.tasklogs.LogUtils;
+import org.apache.druid.indexing.overlord.BaseRestorableTaskRunner;
+import org.apache.druid.indexing.overlord.PortFinder;
+import org.apache.druid.indexing.overlord.TaskRunnerUtils;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.middlemanager.common.K8sApiClient;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.server.metrics.MonitorsConfig;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs tasks in separate processes using the "internal peon" verb.
+ */
+public class K8sForkingTaskRunner
+    extends BaseRestorableTaskRunner<K8sForkingTaskRunner.K8sForkingTaskRunnerWorkItem>
+    implements TaskLogStreamer
+
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(K8sForkingTaskRunner.class);
+  private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
+  private static final String DRUID_INDEXER_NAMESPACE = "druid.indexer.namespace";
+  private static final String DRUID_INDEXER_IMAGE = "druid.indexer.image";
+  private static final String DRUID_INDEXER_SERVICE_ACCOUNT_NAME = "druid.indexer.serviceAccountName";
+  private static final String DRUID_INDEXER_DEFAULT_POD_CPU = "druid.indexer.default.pod.cpu";
+  private static final String DRUID_INDEXER_DEFAULT_POD_MEMORY = "druid.indexer.default.pod.memory";
+  private static final String DRUID_INDEXER_RUNNER_HOST_PATH = "druid.indexer.runner.hostPath";
+  private static final String DRUID_INDEXER_RUNNER_MOUNT_PATH = "druid.indexer.runner.mountPath";
+  private static final String DRUID_PEON_JAVA_OPTS = "druid.peon.javaOpts";
+  private static final String DRUID_PEON_POD_MEMORY = "druid.peon.pod.memory";
+  private static final String DRUID_PEON_POD_CPU = "druid.peon.pod.cpu";
+  private static final String LABEL_KEY = "druid.ingest.task.id";
+  private final ForkingTaskRunnerConfig config;
+  private final Properties props;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final ListeningExecutorService exec;
+  private final PortFinder portFinder;
+  private final StartupLoggingConfig startupLoggingConfig;
+  private final K8sApiClient k8sApiClient;
+
+  private volatile boolean stopping = false;
+  private final String nameSpace;
+  private final String image;
+  private final String defaultPodCPU;
+  private final String defaultPodMemory;
+  private final String serviceAccountName;
+
+  @Inject
+  public K8sForkingTaskRunner(
+      ForkingTaskRunnerConfig config,
+      TaskConfig taskConfig,
+      WorkerConfig workerConfig,
+      Properties props,
+      TaskLogPusher taskLogPusher,
+      ObjectMapper jsonMapper,
+      @Self DruidNode node,
+      StartupLoggingConfig startupLoggingConfig,
+      K8sApiClient k8sApiClient
+  )
+  {
+    super(jsonMapper, taskConfig);
+    this.config = config;
+    this.props = props;
+    this.taskLogPusher = taskLogPusher;
+    this.node = node;
+    this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts());
+    this.startupLoggingConfig = startupLoggingConfig;
+    this.exec = MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d")
+    );
+    this.k8sApiClient = k8sApiClient;
+    this.nameSpace = props.getProperty(DRUID_INDEXER_NAMESPACE, "default");
+    this.image = props.getProperty(DRUID_INDEXER_IMAGE, "druid/cluster:v1");
+    this.serviceAccountName = props.getProperty(DRUID_INDEXER_SERVICE_ACCOUNT_NAME, "default");
+    this.defaultPodCPU = props.getProperty(DRUID_INDEXER_DEFAULT_POD_CPU, "1");
+    this.defaultPodMemory = props.getProperty(DRUID_INDEXER_DEFAULT_POD_MEMORY, "2G");
+
+    assert image != null;
+  }
+
+  @Override
+  public ListenableFuture<TaskStatus> run(final Task task)
+  {
+    synchronized (tasks) {
+      tasks.computeIfAbsent(
+          task.getId(), k ->
+          new K8sForkingTaskRunnerWorkItem(
+            task,
+            exec.submit(
+              new Callable<TaskStatus>() {
+                @Override
+                public TaskStatus call()
+                {
+                  final String attemptUUID = UUID.randomUUID().toString();
+                  final File taskDir = taskConfig.getTaskDir(task.getId());
+                  final File attemptDir = new File(taskDir, attemptUUID);
+
+                  final K8sProcessHolder processHolder;
+                  // POD_IP is defined in the env when creating the peon pod.
+                  final String childHost = "$POD_IP";
+                  final String tmpFileLoc = "/druidTmp";
+                  int childPort = -1;
+                  int tlsChildPort = -1;
+
+                  if (node.isEnablePlaintextPort()) {
+                    childPort = portFinder.findUnusedPort();
+                  }
+
+                  if (node.isEnableTlsPort()) {
+                    tlsChildPort = portFinder.findUnusedPort();
+                  }
+
+                  TaskLocation taskLocation;
+
+                  try {
+                    final Closer closer = Closer.create();
+                    try {
+
+                      final File taskFile = new File(taskDir, "task.json");
+                      final File statusFile = new File(attemptDir, "status.json");
+
+                      final File logFile = new File(taskDir, "log");
+
+                      if (!logFile.exists()) {
+                        if (taskDir.exists()) {
+                          logFile.createNewFile();
+                        } else {
+                          taskDir.mkdirs();
+                          logFile.createNewFile();
+                        }
+                      }
+                      attemptDir.mkdirs();
+
+                      final File reportsFile = new File(attemptDir, "report.json");
+                      // time to adjust process holders
+                      synchronized (tasks) {
+                        // replace all the ": - . _" with "" to reduce the length of the pod name and meet the pod naming limit of 63 chars.
+                        final String label_value_ori = StringUtils.toLowerCase(
+                                StringUtils.replace(
+                                        StringUtils.replace(
+                                                StringUtils.replace(
+                                                        StringUtils.replace(
+                                                                task.getId(),
+                                                                "_", ""),
+                                                        ":", ""),
+                                                ".", ""),
+                                        "-", ""));
+                        String label_value;
+                        if (label_value_ori.length() > 50) {
+                          label_value = label_value_ori.substring(label_value_ori.length() - 50);
+                        } else {
+                          label_value = label_value_ori;
+                        }
+
+                        final K8sForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
+
+                        if (taskWorkItem == null) {
+                          LOGGER.makeAlert("TaskInfo disappeared!").addData("task", task.getId()).emit();
+                          throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
+                        }
+
+                        if (taskWorkItem.shutdown) {
+                          throw new IllegalStateException("Task has been shut 
down!");
+                        }
+
+                        if (taskWorkItem.processHolder != null) {
+                          LOGGER.makeAlert("TaskInfo already has a 
processHolder")
+                                .addData("task", task.getId())
+                                .emit();
+                          throw new ISE("TaskInfo already has processHolder 
for task[%s]!", task.getId());
+                        }
+
+                        final List<String> command = new ArrayList<>();
+                        final String taskClasspath;
+                        if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
+                          taskClasspath = Joiner.on(File.pathSeparator).join(
+                            task.getClasspathPrefix(),
+                            config.getClasspath()
+                          );
+                        } else {
+                          taskClasspath = config.getClasspath();
+                        }
+
+                        command.add(config.getJavaCommand());
+                        command.add("-cp");
+                        command.add(taskClasspath);
+
+                        String peonPodJavaOpts = task.getContextValue(DRUID_PEON_JAVA_OPTS, "");
+
+                        // users can set a separate JVM config for each task through context
+                        if (peonPodJavaOpts.isEmpty()) {
+                          LOGGER.info("Get JavaOpts From ForkingTaskRunnerConfig [%s]", config.getJavaOpts());
+                          Iterables.addAll(command, new K8sQuotableWhiteSpaceSplitter(config.getJavaOpts()));
+                        } else {
+                          LOGGER.info("Get JavaOpts From Task Context [%s]", peonPodJavaOpts);
+                          Iterables.addAll(command, new K8sQuotableWhiteSpaceSplitter(peonPodJavaOpts));
+                        }
+
+                        Iterables.addAll(command, config.getJavaOptsArray());
+
+                        // Override task specific javaOpts
+                        Object taskJavaOpts = task.getContextValue(
+                            ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
+                        );
+                        if (taskJavaOpts != null) {
+                          Iterables.addAll(
+                              command,
+                              new K8sQuotableWhiteSpaceSplitter((String) taskJavaOpts)
+                          );
+                        }
+
+                        // used for local deep storage
+                        String hostPath = props.getProperty(DRUID_INDEXER_RUNNER_HOST_PATH, "");
+                        String mountPath = props.getProperty(DRUID_INDEXER_RUNNER_MOUNT_PATH, "");
+
+                        for (String propName : props.stringPropertyNames()) {
+                          for (String allowedPrefix : config.getAllowedPrefixes()) {
+                            // See https://github.com/apache/druid/issues/1841
+                            if (propName.startsWith(allowedPrefix)
+                                && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
+                                && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
+                            ) {
+                              // remove druid-kubernetes-middlemanager-extensions in druid.extensions.loadList for peon pod
+                              if (propName.contains("druid.extensions.loadList")) {
+                                String[] splits = StringUtils.replace(
+                                        StringUtils.replace(
+                                                props.getProperty(propName), "[", ""),
+                                        "]", "")
+                                        .split(",");
+                                ArrayList<String> loadList = new ArrayList<>();
+                                for (String extension : splits) {
+                                  if (!extension.contains("druid-kubernetes-middlemanager-extensions")) {
+                                    loadList.add(extension);
+                                  }
+                                }
+                                command.add(
+                                        StringUtils.format(
+                                                "-D%s=%s",
+                                                propName,
+                                                loadList
+                                        )
+                                );
+                              } else {
+                                command.add(
+                                        StringUtils.format(
+                                                "-D%s=%s",
+                                                propName,
+                                                props.getProperty(propName)
+                                        )
+                                );
+                              }
+                            }
+                          }
+                        }
+
+                        // Override child JVM specific properties
+                        for (String propName : props.stringPropertyNames()) {
+                          if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
+                            command.add(
+                                StringUtils.format(
+                                "-D%s=%s",
+                                propName.substring(CHILD_PROPERTY_PREFIX.length()),
+                                props.getProperty(propName)
+                              )
+                            );
+                          }
+                        }
+
+                        // Override task specific properties
+                        final Map<String, Object> context = task.getContext();
+                        if (context != null) {
+                          for (String propName : context.keySet()) {
+                            if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
+                              command.add(
+                                  StringUtils.format(
+                                  "-D%s=%s",
+                                  propName.substring(CHILD_PROPERTY_PREFIX.length()),
+                                  task.getContextValue(propName)
+                                )
+                              );
+                            }
+                          }
+                        }
+
+                        // Add dataSource, taskId and taskType for metrics or logging
+                        command.add(
+                            StringUtils.format(
+                            "-D%s%s=%s",
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
+                            DruidMetrics.DATASOURCE,
+                            task.getDataSource()
+                          )
+                        );
+                        command.add(
+                            StringUtils.format(
+                            "-D%s%s=%s",
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
+                            DruidMetrics.TASK_ID,
+                            task.getId()
+                          )
+                        );
+                        command.add(
+                            StringUtils.format(
+                            "-D%s%s=%s",
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
+                            DruidMetrics.TASK_TYPE,
+                            task.getType()
+                          )
+                        );
+
+                        command.add(StringUtils.format("-Ddruid.host=%s", 
childHost));
+                        
command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
+                        // don't support tlsPort for now
+                        command.add(StringUtils.format("-Ddruid.tlsPort=%d", 
tlsChildPort));
+
+                        // Let tasks know where they are running on.
+                        // This information is used in native parallel 
indexing with shuffle.

Review comment:
       Curious: how does shuffle work with the K8sTaskRunner? IIRC the shuffle
       implementation relies on the MiddleManager being available to serve data
       processed by a task even after that task finishes. Since the task pod is
       deleted once the task finishes, this won't work with shuffle enabled. This
       expected behavior needs to be verified and documented.

##########
File path: extensions-contrib/kubernetes-middlemanager-extensions/src/main/java/org/apache/druid/k8s/middlemanager/common/DefaultK8sApiClient.java
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.middlemanager.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import io.kubernetes.client.PodLogs;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
+import io.kubernetes.client.openapi.models.V1ConfigMapList;
+import io.kubernetes.client.openapi.models.V1ConfigMapVolumeSource;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarBuilder;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1OwnerReference;
+import io.kubernetes.client.openapi.models.V1OwnerReferenceBuilder;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodBuilder;
+import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.util.Yaml;
+import io.kubernetes.client.util.generic.GenericKubernetesApi;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Concrete {@link K8sApiClient} impl using k8s-client java lib.
+ */
+public class DefaultK8sApiClient implements K8sApiClient
+{
+  private static final Logger LOGGER = new Logger(DefaultK8sApiClient.class);
+
+  private final ApiClient realK8sClient;
+  private CoreV1Api coreV1Api;
+  private final ObjectMapper jsonMapper;
+  private GenericKubernetesApi<V1Pod, V1PodList> podClient;
+  private PodLogs logs;
+  private String podName;
+  private String podUID;
+
+  @Inject
+  public DefaultK8sApiClient(ApiClient realK8sClient, @Json ObjectMapper jsonMapper)
+  {
+    this.realK8sClient = realK8sClient;
+    this.coreV1Api = new CoreV1Api(realK8sClient);
+    this.jsonMapper = jsonMapper;
+    this.podClient = new GenericKubernetesApi<>(V1Pod.class, V1PodList.class, "", "v1", "pods", realK8sClient);
+    this.logs = new PodLogs(realK8sClient);
+    this.podName = System.getenv("POD_NAME");
+    this.podUID = System.getenv("POD_UID");
+  }
+
+  public void setCoreV1Api(CoreV1Api coreV1Api)
+  {
+    this.coreV1Api = coreV1Api;
+  }
+
+  public void setPodClient(GenericKubernetesApi<V1Pod, V1PodList> podClient)
+  {
+    this.podClient = podClient;
+  }
+
+  public void setPodLogsClient(PodLogs logs)
+  {
+    this.logs = logs;
+  }
+
+  public void setPodName(String name)
+  {
+    this.podName = name;
+  }
+
+  public void setPodUID(String uid)
+  {
+    this.podUID = uid;
+  }
+
+  /**
+   * Let the middlemanager create the peon pod and set the peon pod's OwnerReference to the middlemanager
+   * @param taskID used for pod name
+   * @param image peon pod image
+   * @param namespace peon pod namespace
+   * @param labels peon pod labels used for pod discovery
+   * @param resourceLimit set request = limit here
+   * @param taskDir location for task.json
+   * @param args running commands
+   * @param childPort peon port
+   * @param tlsChildPort not supported yet
+   * @param tempLoc configmap mount location
+   * @param peonPodRestartPolicy Never; the lifecycle is controlled by the middlemanager
+   * @param hostPath peon pod host path in the K8s network
+   * @param mountPath configmap mountPath
+   * @return the created peon pod; could be null if pod creation failed.
+   */
+  @Override
+  public V1Pod createPod(String taskID, String image, String namespace, Map<String, String> labels, Map<String, Quantity> resourceLimit, File taskDir, List<String> args, int childPort, int tlsChildPort, String tempLoc, String peonPodRestartPolicy, String hostPath, String mountPath, String serviceAccountName)
+  {
+    try {
+      final String configMapVolumeName = "task-json-vol-tmp";
+      V1VolumeMount configMapVolumeMount = new V1VolumeMount().name(configMapVolumeName).mountPath(tempLoc);
+      V1ConfigMapVolumeSource configMapVolume = new V1ConfigMapVolumeSource().defaultMode(420).name(taskID);
+
+      ArrayList<V1VolumeMount> v1VolumeMounts = new ArrayList<>();
+      v1VolumeMounts.add(configMapVolumeMount);
+
+      String commands = buildCommands(args);
+
+      V1OwnerReference owner = new V1OwnerReferenceBuilder()
+              .withName(podName)
+              .withApiVersion("v1")
+              .withUid(podUID)
+              .withKind("Pod")
+              .withController(true)
+              .build();
+
+      V1EnvVar podIpEnv = new V1EnvVarBuilder()
+              .withName("POD_IP")
+              .withNewValueFrom()
+              .withFieldRef(new V1ObjectFieldSelector().fieldPath("status.podIP"))
+              .endValueFrom()
+              .build();
+
+      V1EnvVar podNameEnv = new V1EnvVarBuilder()
+              .withName("POD_NAME")
+              .withNewValueFrom()
+              .withFieldRef(new V1ObjectFieldSelector().fieldPath("metadata.name"))
+              .endValueFrom()
+              .build();
+
+      V1EnvVar podNameSpaceEnv = new V1EnvVarBuilder()
+              .withName("POD_NAMESPACE")
+              .withNewValueFrom()
+              .withFieldRef(new V1ObjectFieldSelector().fieldPath("metadata.namespace"))
+              .endValueFrom()
+              .build();
+
+      V1EnvVar task_id = new V1EnvVarBuilder()
+              .withName("TASK_ID")
+              .withNewValue(taskID)
+              .build();
+
+      V1EnvVar mmPodName = new V1EnvVarBuilder()
+              .withName("MM_POD_NAME")
+              .withNewValue(podName)
+              .build();
+
+      V1EnvVar mmNamespace = new V1EnvVarBuilder()
+              .withName("MM_NAMESPACE")
+              .withNewValue(namespace)
+              .build();
+
+      V1EnvVar task_dir = new V1EnvVarBuilder()
+              .withName("TASK_DIR")
+              .withNewValue(taskDir.getAbsolutePath()).build();
+
+      V1EnvVar task_json_tmp_location = new V1EnvVarBuilder()
+              .withName("TASK_JSON_TMP_LOCATION")
+              .withNewValue(tempLoc + "/task.json").build();
+
+      V1Pod pod;
+
+      if (!mountPath.isEmpty() && !hostPath.isEmpty()) {
+
+        final String hostVolumeName = "data-volume-for-druid-local";
+        V1VolumeMount hostVolumeMount = new V1VolumeMount().name(hostVolumeName).mountPath(mountPath);
+        v1VolumeMounts.add(hostVolumeMount);
+
+        pod = new V1PodBuilder()
+                .withNewMetadata()
+                .withOwnerReferences(owner)
+                .withNamespace(namespace)
+                .withName(taskID)
+                .withLabels(labels)
+                .endMetadata()
+                .withNewSpec()
+                .withNewSecurityContext()
+                .withFsGroup(0L)
+                .withRunAsGroup(0L)
+                .withRunAsUser(0L)
+                .endSecurityContext()
+                .addNewVolume()
+                .withNewName(configMapVolumeName)
+                .withConfigMap(configMapVolume)
+                .endVolume()
+                .addNewVolume()
+                .withNewName(hostVolumeName)
+                .withNewHostPath()
+                .withNewPath(hostPath)
+                .withNewType("")
+                .endHostPath()
+                .endVolume()
+                .withNewRestartPolicy(peonPodRestartPolicy)
+                .addNewContainer()
+                .withPorts(new V1ContainerPort().protocol("TCP").containerPort(childPort).name("http"))
+                .withNewSecurityContext()
+                .withNewPrivileged(true)
+                .endSecurityContext()
+                .withCommand("/bin/sh", "-c")
+                .withArgs(commands)
+                .withName("peon")
+                .withImage(image)
+                .withImagePullPolicy("IfNotPresent")
+                .withVolumeMounts(v1VolumeMounts)
+                .withEnv(ImmutableList.of(podIpEnv, task_id, task_dir, task_json_tmp_location, mmPodName, mmNamespace, podNameEnv, podNameSpaceEnv))
+                .withNewResources()
+                .withRequests(resourceLimit)
+                .withLimits(resourceLimit)
+                .endResources()
+                .endContainer()
+                .endSpec()
+                .build();
+      } else {
+        pod = new V1PodBuilder()
+                .withNewMetadata()
+                .withOwnerReferences(owner)
+                .withNamespace(namespace)
+                .withName(taskID)
+                .withLabels(labels)
+                .endMetadata()
+                .withNewSpec()
+                .withNewSecurityContext()
+                .withFsGroup(0L)
+                .withRunAsGroup(0L)
+                .withRunAsUser(0L)
+                .endSecurityContext()
+                .addNewVolume()
+                .withNewName(configMapVolumeName)
+                .withConfigMap(configMapVolume)
+                .endVolume()
+                .withNewRestartPolicy(peonPodRestartPolicy)
+                .addNewContainer()
+                .withPorts(new V1ContainerPort().protocol("TCP").containerPort(childPort).name("http"))
+                .withNewSecurityContext()
+                .withNewPrivileged(true)
+                .endSecurityContext()
+                .withCommand("/bin/sh", "-c")
+                .withArgs(commands)
+                .withName("peon")
+                .withImage(image)
+                .withImagePullPolicy("IfNotPresent")
+                .withVolumeMounts(v1VolumeMounts)
+                .withEnv(ImmutableList.of(podIpEnv, task_id, task_dir, task_json_tmp_location, mmPodName, mmNamespace, podNameEnv, podNameSpaceEnv))
+                .withNewResources()
+                .withRequests(resourceLimit)
+                .withLimits(resourceLimit)
+                .endResources()
+                .endContainer()
+                .endSpec()
+                .build();
+      }
+
+      return coreV1Api.createNamespacedPod(namespace, pod, null, null, null);
+    }
+    catch (ApiException ex) {
+      LOGGER.warn(ex, "Failed to create pod[%s/%s], code[%d], error[%s].", 
namespace, taskID, ex.getCode(), ex.getResponseBody());
+    }
+    return null;
+  }
+
+  /**
+   * Convert commands into a usable String:
+   * 1. remove all the \n, \r, \t
+   * 2. convert property=abc;def to property="abc;def"
+   * 3. add prepareTaskFiles to prepare necessary files like task.json before running commands.
+   * 4. convert property=[a,b,c] to property="[\"a\",\"b\",\"c\"]"
+   * 5. convert " to \"
+   * @param args commands
+   * @return a runnable command string for the pod.
+   */
+  private String buildCommands(List<String> args)
+  {
+    for (int i = 0; i < args.size(); i++) {
+      String value = args.get(i);
+      value = StringUtils.replace(value, "\n", "");
+      value = StringUtils.replace(value, "\r", "");
+      value = StringUtils.replace(value, "\t", "");
+      // apply the replacements cumulatively so earlier ones are not overwritten
+      args.set(i, StringUtils.replace(value, "\"", "\\\""));
+      String[] splits = args.get(i).split("=");
+      if (splits.length > 1 && splits[1].contains(";")) {
+        args.set(i, splits[0] + "=" + "\"" + splits[1] + "\"");
+      }
+
+      if (splits.length > 1 && (splits[1].startsWith("[") || 
splits[1].startsWith("{"))) {
+        args.set(i, splits[0] + "=" + "\"" + splits[1] + "\"");
+      }
+    }
+
+    StringBuilder builder = new StringBuilder();
+    for (String arg : args) {
+      builder.append(arg).append(" ");
+    }
+
+    String javaCommands = builder.toString().substring(0, builder.toString().length() - 1);
+    String reportFile = args.get(args.size() - 1);
+    String statusFile = args.get(args.size() - 2);
+    final String postAction = ";cd `dirname " + reportFile
+            + "`;kubectl cp report.json $MM_NAMESPACE/$MM_POD_NAME:" + reportFile
+            + ";kubectl cp status.json $MM_NAMESPACE/$MM_POD_NAME:" + statusFile + ";";
+
+    // prepare necessary files based on /druid.sh in dockerImage
+    final String prepareTaskFiles = "mkdir -p /tmp/conf/;test -d 
/tmp/conf/druid && rm -r /tmp/conf/druid;cp -r /opt/druid/conf/druid 
/tmp/conf/druid;mkdir -p $TASK_DIR; cp $TASK_JSON_TMP_LOCATION $TASK_DIR;";
+    return prepareTaskFiles + javaCommands + postAction;
+  }
+
+  /**
+   * create a task.json configmap for peon pod.
+   */
+  @Override
+  public V1ConfigMap createConfigMap(String namespace, String configMapName, Map<String, String> labels, Map<String, String> data)
+  {
+    V1OwnerReference owner = new V1OwnerReferenceBuilder()
+            .withName(podName)
+            .withApiVersion("v1")
+            .withUid(podUID)
+            .withKind("Pod")
+            .withController(true)
+            .build();
+
+    V1ConfigMap configMap = new V1ConfigMapBuilder()
+            .withNewMetadata()
+            .withOwnerReferences(owner)
+            .withName(configMapName)
+            .withLabels(labels)
+            .endMetadata()
+            .withData(data)
+            .build();
+
+    try {
+      return coreV1Api.createNamespacedConfigMap(namespace, configMap, null, null, null);
+    }
+    catch (ApiException ex) {
+      LOGGER.warn(ex, "Failed to create configMap[%s/%s], code[%d], 
error[%s].", namespace, configMapName, ex.getCode(), ex.getResponseBody());
+    }
+    return null;
+  }
+
+  @Override
+  public Boolean configMapIsExist(String namespace, String labelSelector)
+  {
+    try {
+      V1ConfigMapList v1ConfigMapList = coreV1Api.listNamespacedConfigMap(namespace, null, null, null, null, labelSelector, null, null, null, null);
+      if (v1ConfigMapList == null) {
+        return false;
+      }
+      return !v1ConfigMapList.getItems().isEmpty();
+    }
+    catch (ApiException ex) {
+      LOGGER.warn(ex, "Failed to get configMap[%s/%s], code[%d], error[%s].", 
namespace, labelSelector, ex.getCode(), ex.getResponseBody());
+      return false;
+    }
+  }
+
+  /**
+   * There are five statuses for a pod: "Pending", "Running", "Succeeded", "Failed", "Unknown".
+   * We only care about the Pending status here.
+   * @param peonPod
+   * @param labelSelector
+   */
+  @Override
+  public void waitForPodRunning(V1Pod peonPod, String labelSelector)
+  {
+    String namespace = peonPod.getMetadata().getNamespace();
+    String podName = peonPod.getMetadata().getName();
+    try {
+      V1PodList v1PodList = coreV1Api.listNamespacedPod(namespace, null, null, null, null, labelSelector, null, null, null, null);
+      while (v1PodList.getItems().isEmpty() || (v1PodList.getItems().size() > 0 && getPodStatus(v1PodList.getItems().get(0)).equalsIgnoreCase("Pending"))) {
+        LOGGER.info("Still waiting for pod Running [%s/%s]", namespace, podName);
+        Thread.sleep(3 * 1000);
+        v1PodList = coreV1Api.listNamespacedPod(peonPod.getMetadata().getNamespace(), null, null, null, null, labelSelector, null, null, null, null);
+      }
+      LOGGER.info("Peon Pod Running : %s", Yaml.dump(peonPod));
+    }
+    catch (ApiException ex) {
+      LOGGER.warn(ex, "Exception to wait for pod Running[%s/%s], code[%d], 
error[%s].", namespace, labelSelector, ex.getCode(), ex.getResponseBody());
+    }
+    catch (Exception ex) {
+      LOGGER.warn(ex, "Exception when wait for pod Running [%s/%s]", 
namespace, podName);
+    }
+  }
+
+  @Override
+  public InputStream getPodLogs(V1Pod peonPod)
+  {
+    String namespace = peonPod.getMetadata().getNamespace();
+    String podName = peonPod.getMetadata().getName();
+    InputStream is = null;
+    try {
+      is = logs.streamNamespacedPodLog(peonPod);
+    }
+    catch (ApiException ex) {
+      LOGGER.warn(ex, "Exception to get pod logs [%s/%s], code[%d], 
error[%s].", namespace, podName, ex.getCode(), ex.getResponseBody());
+    }
+    catch (IOException ex) {
+      LOGGER.warn(ex, "Error when get pod logs [%s/%s].", namespace, podName);
+    }
+    return is;
+  }
+
+  @Override
+  public String waitForPodFinished(V1Pod peonPod)
+  {
+    String phase = getPodStatus(peonPod);
+    String namespace = "";
+    String name = "";
+    try {
+      namespace = peonPod.getMetadata().getNamespace();
+      name = peonPod.getMetadata().getName();
+
+      while (!phase.equalsIgnoreCase("Failed") && 
!phase.equalsIgnoreCase("Succeeded")) {
+        LOGGER.info("Still wait for peon pod finished [%s/%s] current status 
is [%s]", namespace, name, phase);
+        Thread.sleep(3 * 1000);

Review comment:
       Is there a better alternative here? Does k8s support listening/watching for
       pod lifecycle events instead of polling?
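
       For reference, a hedged sketch of what a watch-based wait could look like with
       the same io.kubernetes:client-java library (the exact listNamespacedPodCall
       parameter list differs between client versions, so treat the call below as
       illustrative rather than drop-in; the class and method names are hypothetical):

           // Sketch only: wait for pod completion via a watch instead of polling
           // every 3 seconds.
           import com.google.gson.reflect.TypeToken;
           import io.kubernetes.client.openapi.ApiClient;
           import io.kubernetes.client.openapi.ApiException;
           import io.kubernetes.client.openapi.apis.CoreV1Api;
           import io.kubernetes.client.openapi.models.V1Pod;
           import io.kubernetes.client.util.Watch;
           import java.io.IOException;

           public class PodCompletionWatcher
           {
             public String waitForPodFinished(ApiClient client, CoreV1Api api, String namespace, String labelSelector)
                 throws ApiException, IOException
             {
               try (Watch<V1Pod> watch = Watch.createWatch(
                   client,
                   // same parameters as the listNamespacedPod call above, plus watch=true and a null callback
                   api.listNamespacedPodCall(namespace, null, null, null, null, labelSelector,
                       null, null, null, Boolean.TRUE, null),
                   new TypeToken<Watch.Response<V1Pod>>() {}.getType())) {
                 for (Watch.Response<V1Pod> event : watch) {
                   String phase = event.object.getStatus() == null ? "Unknown" : event.object.getStatus().getPhase();
                   if ("Succeeded".equalsIgnoreCase(phase) || "Failed".equalsIgnoreCase(phase)) {
                     return phase;
                   }
                 }
               }
               return "Unknown";
             }
           }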




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
