nishantmonu51 commented on a change in pull request #10910: URL: https://github.com/apache/druid/pull/10910#discussion_r603927681
########## File path: extensions-contrib/kubernetes-middlemanager-extensions/src/main/java/org/apache/druid/k8s/middlemanager/K8sForkingTaskRunner.java ########## @@ -0,0 +1,970 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.middlemanager; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.CharMatcher; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.io.ByteSink; +import com.google.common.io.ByteSource; +import com.google.common.io.ByteStreams; +import com.google.common.io.FileWriteMode; +import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; +import io.kubernetes.client.custom.Quantity; +import 
io.kubernetes.client.openapi.models.V1Pod; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.tasklogs.LogUtils; +import org.apache.druid.indexing.overlord.BaseRestorableTaskRunner; +import org.apache.druid.indexing.overlord.PortFinder; +import org.apache.druid.indexing.overlord.TaskRunnerUtils; +import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.k8s.middlemanager.common.K8sApiClient; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.server.metrics.MonitorsConfig; +import org.apache.druid.tasklogs.TaskLogPusher; +import org.apache.druid.tasklogs.TaskLogStreamer; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; 
+import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Runs tasks in separate processes using the "internal peon" verb. + */ +public class K8sForkingTaskRunner + extends BaseRestorableTaskRunner<K8sForkingTaskRunner.K8sForkingTaskRunnerWorkItem> + implements TaskLogStreamer + +{ + private static final EmittingLogger LOGGER = new EmittingLogger(K8sForkingTaskRunner.class); + private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property."; + private static final String DRUID_INDEXER_NAMESPACE = "druid.indexer.namespace"; + private static final String DRUID_INDEXER_IMAGE = "druid.indexer.image"; + private static final String DRUID_INDEXER_SERVICE_ACCOUNT_NAME = "druid.indexer.serviceAccountName"; + private static final String DRUID_INDEXER_DEFAULT_POD_CPU = "druid.indexer.default.pod.cpu"; + private static final String DRUID_INDEXER_DEFAULT_POD_MEMORY = "druid.indexer.default.pod.memory"; + private static final String DRUID_INDEXER_RUNNER_HOST_PATH = "druid.indexer.runner.hostPath"; + private static final String DRUID_INDEXER_RUNNER_MOUNT_PATH = "druid.indexer.runner.mountPath"; + private static final String DRUID_PEON_JAVA_OPTS = "druid.peon.javaOpts"; + private static final String DRUID_PEON_POD_MEMORY = "druid.peon.pod.memory"; + private static final String DRUID_PEON_POD_CPU = "druid.peon.pod.cpu"; + private static final String LABEL_KEY = "druid.ingest.task.id"; Review comment: task Runner seems to be hardcoding most of the parameters/configs to be used by the task Runner, Is it possible to use something like Helm charts and templates ? The goal is for the devOps person to maintain those templates and from druid side, the task runner would make use of those templates to create a Job ? 
########## File path: extensions-contrib/kubernetes-middlemanager-extensions/src/main/java/org/apache/druid/k8s/middlemanager/K8sForkingTaskRunner.java ########## @@ -0,0 +1,970 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.middlemanager; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.CharMatcher; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.io.ByteSink; +import com.google.common.io.ByteSource; +import com.google.common.io.ByteStreams; +import com.google.common.io.FileWriteMode; +import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; +import io.kubernetes.client.custom.Quantity; +import 
io.kubernetes.client.openapi.models.V1Pod; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.tasklogs.LogUtils; +import org.apache.druid.indexing.overlord.BaseRestorableTaskRunner; +import org.apache.druid.indexing.overlord.PortFinder; +import org.apache.druid.indexing.overlord.TaskRunnerUtils; +import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.k8s.middlemanager.common.K8sApiClient; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.server.metrics.MonitorsConfig; +import org.apache.druid.tasklogs.TaskLogPusher; +import org.apache.druid.tasklogs.TaskLogStreamer; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; 
+import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Runs tasks in separate processes using the "internal peon" verb. + */ +public class K8sForkingTaskRunner + extends BaseRestorableTaskRunner<K8sForkingTaskRunner.K8sForkingTaskRunnerWorkItem> + implements TaskLogStreamer + +{ + private static final EmittingLogger LOGGER = new EmittingLogger(K8sForkingTaskRunner.class); + private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property."; + private static final String DRUID_INDEXER_NAMESPACE = "druid.indexer.namespace"; + private static final String DRUID_INDEXER_IMAGE = "druid.indexer.image"; + private static final String DRUID_INDEXER_SERVICE_ACCOUNT_NAME = "druid.indexer.serviceAccountName"; + private static final String DRUID_INDEXER_DEFAULT_POD_CPU = "druid.indexer.default.pod.cpu"; + private static final String DRUID_INDEXER_DEFAULT_POD_MEMORY = "druid.indexer.default.pod.memory"; + private static final String DRUID_INDEXER_RUNNER_HOST_PATH = "druid.indexer.runner.hostPath"; + private static final String DRUID_INDEXER_RUNNER_MOUNT_PATH = "druid.indexer.runner.mountPath"; + private static final String DRUID_PEON_JAVA_OPTS = "druid.peon.javaOpts"; + private static final String DRUID_PEON_POD_MEMORY = "druid.peon.pod.memory"; + private static final String DRUID_PEON_POD_CPU = "druid.peon.pod.cpu"; + private static final String LABEL_KEY = "druid.ingest.task.id"; + private final ForkingTaskRunnerConfig config; + private final Properties props; + private final TaskLogPusher taskLogPusher; + private final DruidNode node; + private final ListeningExecutorService exec; + private final PortFinder portFinder; + private final StartupLoggingConfig startupLoggingConfig; + private final K8sApiClient k8sApiClient; + + private volatile boolean stopping = false; + private final String nameSpace; + private final String image; + 
private final String defaultPodCPU; + private final String defaultPodMemory; + private final String serviceAccountName; + + @Inject + public K8sForkingTaskRunner( + ForkingTaskRunnerConfig config, + TaskConfig taskConfig, + WorkerConfig workerConfig, + Properties props, + TaskLogPusher taskLogPusher, + ObjectMapper jsonMapper, + @Self DruidNode node, + StartupLoggingConfig startupLoggingConfig, + K8sApiClient k8sApiClient + ) + { + super(jsonMapper, taskConfig); + this.config = config; + this.props = props; + this.taskLogPusher = taskLogPusher; + this.node = node; + this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts()); + this.startupLoggingConfig = startupLoggingConfig; + this.exec = MoreExecutors.listeningDecorator( + Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d") + ); + this.k8sApiClient = k8sApiClient; + this.nameSpace = props.getProperty(DRUID_INDEXER_NAMESPACE, "default"); + this.image = props.getProperty(DRUID_INDEXER_IMAGE, "druid/cluster:v1"); + this.serviceAccountName = props.getProperty(DRUID_INDEXER_SERVICE_ACCOUNT_NAME, "default"); + this.defaultPodCPU = props.getProperty(DRUID_INDEXER_DEFAULT_POD_CPU, "1"); + this.defaultPodMemory = props.getProperty(DRUID_INDEXER_DEFAULT_POD_MEMORY, "2G"); + + assert image != null; + } + + @Override + public ListenableFuture<TaskStatus> run(final Task task) + { + synchronized (tasks) { + tasks.computeIfAbsent( + task.getId(), k -> + new K8sForkingTaskRunnerWorkItem( + task, + exec.submit( + new Callable<TaskStatus>() { + @Override + public TaskStatus call() + { + final String attemptUUID = UUID.randomUUID().toString(); + final File taskDir = taskConfig.getTaskDir(task.getId()); + final File attemptDir = new File(taskDir, attemptUUID); + + final K8sProcessHolder processHolder; + // POD_IP is defined in env when create peon pod. 
+ final String childHost = "$POD_IP"; + final String tmpFileLoc = "/druidTmp"; + int childPort = -1; + int tlsChildPort = -1; + + if (node.isEnablePlaintextPort()) { + childPort = portFinder.findUnusedPort(); + } + + if (node.isEnableTlsPort()) { + tlsChildPort = portFinder.findUnusedPort(); + } + + TaskLocation taskLocation; + + try { + final Closer closer = Closer.create(); + try { + + final File taskFile = new File(taskDir, "task.json"); + final File statusFile = new File(attemptDir, "status.json"); + + final File logFile = new File(taskDir, "log"); + + if (!logFile.exists()) { + if (taskDir.exists()) { + logFile.createNewFile(); + } else { + taskDir.mkdirs(); + logFile.createNewFile(); + } + } + attemptDir.mkdirs(); + + final File reportsFile = new File(attemptDir, "report.json"); + // time to adjust process holders + synchronized (tasks) { + // replace all the ": - . _" to "", try to reduce the length of pod name and meet pod naming specifications 63 charts. + final String label_value_ori = StringUtils.toLowerCase( + StringUtils.replace( + StringUtils.replace( + StringUtils.replace( + StringUtils.replace( + task.getId(), + "_", ""), + ":", ""), + ".", ""), + "-", "")); + String label_value; Review comment: nit: refactor and extract to sanitizeName method ########## File path: extensions-contrib/kubernetes-middlemanager-extensions/src/main/java/org/apache/druid/k8s/middlemanager/K8sForkingTaskRunner.java ########## @@ -0,0 +1,970 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.middlemanager; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.CharMatcher; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.io.ByteSink; +import com.google.common.io.ByteSource; +import com.google.common.io.ByteStreams; +import com.google.common.io.FileWriteMode; +import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; +import io.kubernetes.client.custom.Quantity; +import io.kubernetes.client.openapi.models.V1Pod; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.tasklogs.LogUtils; +import org.apache.druid.indexing.overlord.BaseRestorableTaskRunner; +import org.apache.druid.indexing.overlord.PortFinder; +import org.apache.druid.indexing.overlord.TaskRunnerUtils; +import 
org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.k8s.middlemanager.common.K8sApiClient; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.server.metrics.MonitorsConfig; +import org.apache.druid.tasklogs.TaskLogPusher; +import org.apache.druid.tasklogs.TaskLogStreamer; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Runs tasks in separate processes using the "internal peon" verb. 
+ */ +public class K8sForkingTaskRunner + extends BaseRestorableTaskRunner<K8sForkingTaskRunner.K8sForkingTaskRunnerWorkItem> + implements TaskLogStreamer + +{ + private static final EmittingLogger LOGGER = new EmittingLogger(K8sForkingTaskRunner.class); + private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property."; + private static final String DRUID_INDEXER_NAMESPACE = "druid.indexer.namespace"; + private static final String DRUID_INDEXER_IMAGE = "druid.indexer.image"; + private static final String DRUID_INDEXER_SERVICE_ACCOUNT_NAME = "druid.indexer.serviceAccountName"; + private static final String DRUID_INDEXER_DEFAULT_POD_CPU = "druid.indexer.default.pod.cpu"; + private static final String DRUID_INDEXER_DEFAULT_POD_MEMORY = "druid.indexer.default.pod.memory"; + private static final String DRUID_INDEXER_RUNNER_HOST_PATH = "druid.indexer.runner.hostPath"; + private static final String DRUID_INDEXER_RUNNER_MOUNT_PATH = "druid.indexer.runner.mountPath"; + private static final String DRUID_PEON_JAVA_OPTS = "druid.peon.javaOpts"; + private static final String DRUID_PEON_POD_MEMORY = "druid.peon.pod.memory"; + private static final String DRUID_PEON_POD_CPU = "druid.peon.pod.cpu"; + private static final String LABEL_KEY = "druid.ingest.task.id"; + private final ForkingTaskRunnerConfig config; + private final Properties props; + private final TaskLogPusher taskLogPusher; + private final DruidNode node; + private final ListeningExecutorService exec; + private final PortFinder portFinder; + private final StartupLoggingConfig startupLoggingConfig; + private final K8sApiClient k8sApiClient; + + private volatile boolean stopping = false; + private final String nameSpace; + private final String image; + private final String defaultPodCPU; + private final String defaultPodMemory; + private final String serviceAccountName; + + @Inject + public K8sForkingTaskRunner( + ForkingTaskRunnerConfig config, + TaskConfig taskConfig, + WorkerConfig workerConfig, + 
Properties props, + TaskLogPusher taskLogPusher, + ObjectMapper jsonMapper, + @Self DruidNode node, + StartupLoggingConfig startupLoggingConfig, + K8sApiClient k8sApiClient + ) + { + super(jsonMapper, taskConfig); + this.config = config; + this.props = props; + this.taskLogPusher = taskLogPusher; + this.node = node; + this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts()); + this.startupLoggingConfig = startupLoggingConfig; + this.exec = MoreExecutors.listeningDecorator( + Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d") + ); + this.k8sApiClient = k8sApiClient; + this.nameSpace = props.getProperty(DRUID_INDEXER_NAMESPACE, "default"); + this.image = props.getProperty(DRUID_INDEXER_IMAGE, "druid/cluster:v1"); + this.serviceAccountName = props.getProperty(DRUID_INDEXER_SERVICE_ACCOUNT_NAME, "default"); + this.defaultPodCPU = props.getProperty(DRUID_INDEXER_DEFAULT_POD_CPU, "1"); + this.defaultPodMemory = props.getProperty(DRUID_INDEXER_DEFAULT_POD_MEMORY, "2G"); + + assert image != null; + } + + @Override + public ListenableFuture<TaskStatus> run(final Task task) + { + synchronized (tasks) { + tasks.computeIfAbsent( + task.getId(), k -> + new K8sForkingTaskRunnerWorkItem( + task, + exec.submit( + new Callable<TaskStatus>() { + @Override + public TaskStatus call() + { + final String attemptUUID = UUID.randomUUID().toString(); + final File taskDir = taskConfig.getTaskDir(task.getId()); + final File attemptDir = new File(taskDir, attemptUUID); + + final K8sProcessHolder processHolder; + // POD_IP is defined in env when create peon pod. 
+ final String childHost = "$POD_IP"; + final String tmpFileLoc = "/druidTmp"; + int childPort = -1; + int tlsChildPort = -1; + + if (node.isEnablePlaintextPort()) { + childPort = portFinder.findUnusedPort(); + } + + if (node.isEnableTlsPort()) { + tlsChildPort = portFinder.findUnusedPort(); + } + + TaskLocation taskLocation; + + try { + final Closer closer = Closer.create(); + try { + + final File taskFile = new File(taskDir, "task.json"); + final File statusFile = new File(attemptDir, "status.json"); + + final File logFile = new File(taskDir, "log"); + + if (!logFile.exists()) { + if (taskDir.exists()) { + logFile.createNewFile(); + } else { + taskDir.mkdirs(); + logFile.createNewFile(); + } + } + attemptDir.mkdirs(); + + final File reportsFile = new File(attemptDir, "report.json"); + // time to adjust process holders + synchronized (tasks) { + // replace all the ": - . _" to "", try to reduce the length of pod name and meet pod naming specifications 63 charts. + final String label_value_ori = StringUtils.toLowerCase( + StringUtils.replace( + StringUtils.replace( + StringUtils.replace( + StringUtils.replace( + task.getId(), + "_", ""), + ":", ""), + ".", ""), + "-", "")); + String label_value; + if (label_value_ori.length() > 50) { + label_value = label_value_ori.substring(label_value_ori.length() - 50); + } else { + label_value = label_value_ori; + } + + final K8sForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId()); + + if (taskWorkItem == null) { + LOGGER.makeAlert("TaskInfo disappeared!").addData("task", task.getId()).emit(); + throw new ISE("TaskInfo disappeared for task[%s]!", task.getId()); + } + + if (taskWorkItem.shutdown) { + throw new IllegalStateException("Task has been shut down!"); + } + + if (taskWorkItem.processHolder != null) { + LOGGER.makeAlert("TaskInfo already has a processHolder") + .addData("task", task.getId()) + .emit(); + throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId()); + } + + final 
List<String> command = new ArrayList<>(); + final String taskClasspath; + if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { + taskClasspath = Joiner.on(File.pathSeparator).join( + task.getClasspathPrefix(), + config.getClasspath() + ); + } else { + taskClasspath = config.getClasspath(); + } + + command.add(config.getJavaCommand()); + command.add("-cp"); + command.add(taskClasspath); + + String peonPodJavaOpts = task.getContextValue(DRUID_PEON_JAVA_OPTS, ""); + + // users can set a separate JVM config for each task through context + if (peonPodJavaOpts.isEmpty()) { + LOGGER.info("Get JavaOpts From ForkingTaskRunnerConfig [%s]", config.getJavaOpts()); + Iterables.addAll(command, new K8sQuotableWhiteSpaceSplitter(config.getJavaOpts())); + } else { + LOGGER.info("Get JavaOpts From Task Context [%s]", peonPodJavaOpts); + Iterables.addAll(command, new K8sQuotableWhiteSpaceSplitter(peonPodJavaOpts)); + } + + Iterables.addAll(command, config.getJavaOptsArray()); + + // Override task specific javaOpts + Object taskJavaOpts = task.getContextValue( + ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY + ); + if (taskJavaOpts != null) { + Iterables.addAll( + command, + new K8sQuotableWhiteSpaceSplitter((String) taskJavaOpts) + ); + } + + // useed for local deepStorage + String hostPath = props.getProperty(DRUID_INDEXER_RUNNER_HOST_PATH, ""); + String mountPath = props.getProperty(DRUID_INDEXER_RUNNER_MOUNT_PATH, ""); + + for (String propName : props.stringPropertyNames()) { + for (String allowedPrefix : config.getAllowedPrefixes()) { + // See https://github.com/apache/druid/issues/1841 + if (propName.startsWith(allowedPrefix) + && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) + && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) + ) { + // remove druid-kubernetes-middlemanager-extensions in druid.extensions.loadList for peon pod + if (propName.contains("druid.extensions.loadList")) { + String[] splits = 
StringUtils.replace( + StringUtils.replace( + props.getProperty(propName), "[", ""), + "]", "") + .split(","); + ArrayList<String> loadList = new ArrayList<>(); + for (String extension : splits) { + if (!extension.contains("druid-kubernetes-middlemanager-extensions")) { + loadList.add(extension); + } + } + command.add( + StringUtils.format( + "-D%s=%s", + propName, + loadList + ) + ); + } else { + command.add( + StringUtils.format( + "-D%s=%s", + propName, + props.getProperty(propName) + ) + ); + } + } + } + } + + // Override child JVM specific properties + for (String propName : props.stringPropertyNames()) { + if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { + command.add( + StringUtils.format( + "-D%s=%s", + propName.substring(CHILD_PROPERTY_PREFIX.length()), + props.getProperty(propName) + ) + ); + } + } + + // Override task specific properties + final Map<String, Object> context = task.getContext(); + if (context != null) { + for (String propName : context.keySet()) { + if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { + command.add( + StringUtils.format( + "-D%s=%s", + propName.substring(CHILD_PROPERTY_PREFIX.length()), + task.getContextValue(propName) + ) + ); + } + } + } + + // Add dataSource, taskId and taskType for metrics or logging + command.add( + StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.DATASOURCE, + task.getDataSource() + ) + ); + command.add( + StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.TASK_ID, + task.getId() + ) + ); + command.add( + StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.TASK_TYPE, + task.getType() + ) + ); + + command.add(StringUtils.format("-Ddruid.host=%s", childHost)); + command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort)); + // don't support tlsPort for now + command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort)); + + // Let tasks know where they are running 
on. + // This information is used in native parallel indexing with shuffle. Review comment: curious: How does shuffle work with K8sTaskRunner ? IIRC shuffle implementation relies on MM to be available and serve data processed by task even after the task finishes. In this case since the task pod is deleted after the task finishes, it won't work with shuffle enabled. This expected behavior needs to be verified and documented. ########## File path: extensions-contrib/kubernetes-middlemanager-extensions/src/main/java/org/apache/druid/k8s/middlemanager/common/DefaultK8sApiClient.java ########## @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.middlemanager.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import io.kubernetes.client.PodLogs; +import io.kubernetes.client.custom.Quantity; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ConfigMap; +import io.kubernetes.client.openapi.models.V1ConfigMapBuilder; +import io.kubernetes.client.openapi.models.V1ConfigMapList; +import io.kubernetes.client.openapi.models.V1ConfigMapVolumeSource; +import io.kubernetes.client.openapi.models.V1ContainerPort; +import io.kubernetes.client.openapi.models.V1EnvVar; +import io.kubernetes.client.openapi.models.V1EnvVarBuilder; +import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1OwnerReference; +import io.kubernetes.client.openapi.models.V1OwnerReferenceBuilder; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodBuilder; +import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.openapi.models.V1VolumeMount; +import io.kubernetes.client.util.Yaml; +import io.kubernetes.client.util.generic.GenericKubernetesApi; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Concrete {@link K8sApiClient} impl using k8s-client java lib. 
 */
public class DefaultK8sApiClient implements K8sApiClient
{
  private static final Logger LOGGER = new Logger(DefaultK8sApiClient.class);

  private final ApiClient realK8sClient;
  // Non-final clients below are replaceable via the setters (test hooks for injecting mocks).
  private CoreV1Api coreV1Api;
  private final ObjectMapper jsonMapper;
  private GenericKubernetesApi<V1Pod, V1PodList> podClient;
  private PodLogs logs;
  // Name/UID of the middlemanager pod itself, used as the ownerReference for peon pods and
  // ConfigMaps. Assumes POD_NAME/POD_UID env vars are injected via the k8s downward API —
  // TODO(review): confirm the deployment spec sets these.
  private String podName;
  private String podUID;

  @Inject
  public DefaultK8sApiClient(ApiClient realK8sClient, @Json ObjectMapper jsonMapper)
  {
    this.realK8sClient = realK8sClient;
    this.coreV1Api = new CoreV1Api(realK8sClient);
    this.jsonMapper = jsonMapper;
    // Empty apiGroup ("") selects the core/v1 API group for pods.
    this.podClient = new GenericKubernetesApi<>(V1Pod.class, V1PodList.class, "", "v1", "pods", realK8sClient);
    this.logs = new PodLogs(realK8sClient);
    this.podName = System.getenv("POD_NAME");
    this.podUID = System.getenv("POD_UID");
  }

  // Test hook: replace the core v1 API client.
  public void setCoreV1Api(CoreV1Api coreV1Api)
  {
    this.coreV1Api = coreV1Api;
  }

  // Test hook: replace the generic pod client.
  public void setPodClient(GenericKubernetesApi<V1Pod, V1PodList> podClient)
  {
    this.podClient = podClient;
  }

  // Test hook: replace the pod-logs client.
  public void setPodLogsClient(PodLogs logs)
  {
    this.logs = logs;
  }

  // Test hook: override the MM pod name used in ownerReferences.
  public void setPodName(String name)
  {
    this.podName = name;
  }

  // Test hook: override the MM pod UID used in ownerReferences.
  public void setPodUID(String uid)
  {
    this.podUID = uid;
  }

  /**
   * Let the middlemanager create the peon pod, with the peon pod's OwnerReference set to the
   * middlemanager pod so that k8s garbage-collects peons when the MM pod goes away.
   *
   * @param taskID used for pod name
   * @param image peon pod image
   * @param namespace namespace to create the peon pod in
   * @param labels peon pod labels used for pod discovery
   * @param resourceLimit resource limits; request is set equal to limit here
   * @param taskDir location for task.json
   * @param args running commands
   * @param childPort peon port
   * @param tlsChildPort not supported yet
   * @param peonPodRestartPolicy "Never"; lifecycle is controlled by the middlemanager
   * @param tempLoc configmap mount location
   * @param hostPath host directory to expose to the peon pod (presumably druid local storage — TODO confirm)
   * @param mountPath container mount path for the hostPath volume
   * @param serviceAccountName service account for the peon pod (currently not applied — see method body)
   * @return peon pod created.
could be null is pod creation failed. + */ + @Override + public V1Pod createPod(String taskID, String image, String namespace, Map<String, String> labels, Map<String, Quantity> resourceLimit, File taskDir, List<String> args, int childPort, int tlsChildPort, String tempLoc, String peonPodRestartPolicy, String hostPath, String mountPath, String serviceAccountName) + { + try { + final String configMapVolumeName = "task-json-vol-tmp"; + V1VolumeMount configMapVolumeMount = new V1VolumeMount().name(configMapVolumeName).mountPath(tempLoc); + V1ConfigMapVolumeSource configMapVolume = new V1ConfigMapVolumeSource().defaultMode(420).name(taskID); + + ArrayList<V1VolumeMount> v1VolumeMounts = new ArrayList<>(); + v1VolumeMounts.add(configMapVolumeMount); + + String commands = buildCommands(args); + + V1OwnerReference owner = new V1OwnerReferenceBuilder() + .withName(podName) + .withApiVersion("v1") + .withUid(podUID) + .withKind("Pod") + .withController(true) + .build(); + + V1EnvVar podIpEnv = new V1EnvVarBuilder() + .withName("POD_IP") + .withNewValueFrom() + .withFieldRef(new V1ObjectFieldSelector().fieldPath("status.podIP")) + .endValueFrom() + .build(); + + V1EnvVar podNameEnv = new V1EnvVarBuilder() + .withName("POD_NAME") + .withNewValueFrom() + .withFieldRef(new V1ObjectFieldSelector().fieldPath("metadata.name")) + .endValueFrom() + .build(); + + V1EnvVar podNameSpaceEnv = new V1EnvVarBuilder() + .withName("POD_NAMESPACE") + .withNewValueFrom() + .withFieldRef(new V1ObjectFieldSelector().fieldPath("metadata.namespace")) + .endValueFrom() + .build(); + + V1EnvVar task_id = new V1EnvVarBuilder() + .withName("TASK_ID") + .withNewValue(taskID) + .build(); + + V1EnvVar mmPodName = new V1EnvVarBuilder() + .withName("MM_POD_NAME") + .withNewValue(podName) + .build(); + + V1EnvVar mmNamespace = new V1EnvVarBuilder() + .withName("MM_NAMESPACE") + .withNewValue(namespace) + .build(); + + V1EnvVar task_dir = new V1EnvVarBuilder() + .withName("TASK_DIR") + 
.withNewValue(taskDir.getAbsolutePath()).build(); + + V1EnvVar task_json_tmp_location = new V1EnvVarBuilder() + .withName("TASK_JSON_TMP_LOCATION") + .withNewValue(tempLoc + "/task.json").build(); + + V1Pod pod; + + if (!mountPath.isEmpty() && !hostPath.isEmpty()) { + + final String hostVolumeName = "data-volume-for-druid-local"; + V1VolumeMount hostVolumeMount = new V1VolumeMount().name(hostVolumeName).mountPath(mountPath); + v1VolumeMounts.add(hostVolumeMount); + + pod = new V1PodBuilder() + .withNewMetadata() + .withOwnerReferences(owner) + .withNamespace(namespace) + .withName(taskID) + .withLabels(labels) + .endMetadata() + .withNewSpec() + .withNewSecurityContext() + .withFsGroup(0L) + .withRunAsGroup(0L) + .withRunAsUser(0L) + .endSecurityContext() + .addNewVolume() + .withNewName(configMapVolumeName) + .withConfigMap(configMapVolume) + .endVolume() + .addNewVolume() + .withNewName(hostVolumeName) + .withNewHostPath() + .withNewPath(hostPath) + .withNewType("") + .endHostPath() + .endVolume() + .withNewRestartPolicy(peonPodRestartPolicy) + .addNewContainer() + .withPorts(new V1ContainerPort().protocol("TCP").containerPort(childPort).name("http")) + .withNewSecurityContext() + .withNewPrivileged(true) + .endSecurityContext() + .withCommand("/bin/sh", "-c") + .withArgs(commands) + .withName("peon") + .withImage(image) + .withImagePullPolicy("IfNotPresent") + .withVolumeMounts(v1VolumeMounts) + .withEnv(ImmutableList.of(podIpEnv, task_id, task_dir, task_json_tmp_location, mmPodName, mmNamespace, podNameEnv, podNameSpaceEnv)) + .withNewResources() + .withRequests(resourceLimit) + .withLimits(resourceLimit) + .endResources() + .endContainer() + .endSpec() + .build(); + } else { + pod = new V1PodBuilder() + .withNewMetadata() + .withOwnerReferences(owner) + .withNamespace(namespace) + .withName(taskID) + .withLabels(labels) + .endMetadata() + .withNewSpec() + .withNewSecurityContext() + .withFsGroup(0L) + .withRunAsGroup(0L) + .withRunAsUser(0L) + 
.endSecurityContext() + .addNewVolume() + .withNewName(configMapVolumeName) + .withConfigMap(configMapVolume) + .endVolume() + .withNewRestartPolicy(peonPodRestartPolicy) + .addNewContainer() + .withPorts(new V1ContainerPort().protocol("TCP").containerPort(childPort).name("http")) + .withNewSecurityContext() + .withNewPrivileged(true) + .endSecurityContext() + .withCommand("/bin/sh", "-c") + .withArgs(commands) + .withName("peon") + .withImage(image) + .withImagePullPolicy("IfNotPresent") + .withVolumeMounts(v1VolumeMounts) + .withEnv(ImmutableList.of(podIpEnv, task_id, task_dir, task_json_tmp_location, mmPodName, mmNamespace, podNameEnv, podNameSpaceEnv)) + .withNewResources() + .withRequests(resourceLimit) + .withLimits(resourceLimit) + .endResources() + .endContainer() + .endSpec() + .build(); + } + + return coreV1Api.createNamespacedPod(namespace, pod, null, null, null); + } + catch (ApiException ex) { + LOGGER.warn(ex, "Failed to create pod[%s/%s], code[%d], error[%s].", namespace, taskID, ex.getCode(), ex.getResponseBody()); + } + return null; + } + + /** + * convert commands into usable String: + * 1. remove all the \n, \r, \t + * 2. convert property=abc;def to property="abc;def" + * 3. add prepareTaskFiles to prepare necessary files like task.json before running commands. + * 4. convert property=[a,b,c] to property="[\"a\",\"b\",\"c\"]" + * 5. convert " to \" + * @param args commands + * @return a runnable commands in pod. 
+ */ + private String buildCommands(List<String> args) + { + for (int i = 0; i < args.size(); i++) { + String value = args.get(i); + args.set(i, StringUtils.replace(value, "\n", "")); + args.set(i, StringUtils.replace(value, "\r", "")); + args.set(i, StringUtils.replace(value, "\t", "")); + args.set(i, StringUtils.replace(value, "\"", "\\\"")); + String[] splits = args.get(i).split("="); + if (splits.length > 1 && splits[1].contains(";")) { + args.set(i, splits[0] + "=" + "\"" + splits[1] + "\""); + } + + if (splits.length > 1 && (splits[1].startsWith("[") || splits[1].startsWith("{"))) { + args.set(i, splits[0] + "=" + "\"" + splits[1] + "\""); + } + } + + StringBuilder builder = new StringBuilder(); + for (String arg : args) { + builder.append(arg).append(" "); + } + + String javaCommands = builder.toString().substring(0, builder.toString().length() - 1); + String reportFile = args.get(args.size() - 1); + String statusFile = args.get(args.size() - 2); + final String postAction = ";cd `dirname " + reportFile + + "`;kubectl cp report.json $MM_NAMESPACE/$MM_POD_NAME:" + reportFile + + ";kubectl cp status.json $MM_NAMESPACE/$MM_POD_NAME:" + statusFile + ";"; + + // prepare necessary files based on /druid.sh in dockerImage + final String prepareTaskFiles = "mkdir -p /tmp/conf/;test -d /tmp/conf/druid && rm -r /tmp/conf/druid;cp -r /opt/druid/conf/druid /tmp/conf/druid;mkdir -p $TASK_DIR; cp $TASK_JSON_TMP_LOCATION $TASK_DIR;"; + return prepareTaskFiles + javaCommands + postAction; + } + + /** + * create a task.json configmap for peon pod. 
+ */ + @Override + public V1ConfigMap createConfigMap(String namespace, String configMapName, Map<String, String> labels, Map<String, String> data) + { + V1OwnerReference owner = new V1OwnerReferenceBuilder() + .withName(podName) + .withApiVersion("v1") + .withUid(podUID) + .withKind("Pod") + .withController(true) + .build(); + + V1ConfigMap configMap = new V1ConfigMapBuilder() + .withNewMetadata() + .withOwnerReferences(owner) + .withName(configMapName) + .withLabels(labels) + .endMetadata() + .withData(data) + .build(); + + try { + return coreV1Api.createNamespacedConfigMap(namespace, configMap, null, null, null); + } + catch (ApiException ex) { + LOGGER.warn(ex, "Failed to create configMap[%s/%s], code[%d], error[%s].", namespace, configMapName, ex.getCode(), ex.getResponseBody()); + } + return null; + } + + @Override + public Boolean configMapIsExist(String namespace, String labelSelector) + { + try { + V1ConfigMapList v1ConfigMapList = coreV1Api.listNamespacedConfigMap(namespace, null, null, null, null, labelSelector, null, null, null, null); + if (v1ConfigMapList == null) { + return false; + } + return !v1ConfigMapList.getItems().isEmpty(); + } + catch (ApiException ex) { + LOGGER.warn(ex, "Failed to get configMap[%s/%s], code[%d], error[%s].", namespace, labelSelector, ex.getCode(), ex.getResponseBody()); + return false; + } + } + + /** + * There are five status for pod, including "Pending", "Running", "Succeeded", "Failed", "Unknown" + * Just care about Pending status here. 
 * @param peonPod the pod to wait for; its metadata supplies namespace and name
 * @param labelSelector label selector used to re-fetch the pod from the API server
 */
@Override
public void waitForPodRunning(V1Pod peonPod, String labelSelector)
{
  String namespace = peonPod.getMetadata().getNamespace();
  String podName = peonPod.getMetadata().getName();
  try {
    V1PodList v1PodList = coreV1Api.listNamespacedPod(namespace, null, null, null, null, labelSelector, null, null, null, null);
    // Poll every 3s until the first matching pod leaves the "Pending" phase (or until the
    // pod appears at all). NOTE(review): a k8s watch/informer would avoid busy-polling here.
    while (v1PodList.getItems().isEmpty() || (v1PodList.getItems().size() > 0 && getPodStatus(v1PodList.getItems().get(0)).equalsIgnoreCase("Pending"))) {
      LOGGER.info("Still waiting for pod Running [%s/%s]", namespace, podName);
      Thread.sleep(3 * 1000);
      v1PodList = coreV1Api.listNamespacedPod(peonPod.getMetadata().getNamespace(), null, null, null, null, labelSelector, null, null, null, null);
    }
    // NOTE(review): this dumps the originally-passed pod object, which may not reflect the
    // live pod state after the wait — confirm whether the refreshed pod should be logged.
    LOGGER.info("Peon Pod Running : %s", Yaml.dump(peonPod));
  }
  catch (ApiException ex) {
    LOGGER.warn(ex, "Exception to wait for pod Running[%s/%s], code[%d], error[%s].", namespace, labelSelector, ex.getCode(), ex.getResponseBody());
  }
  catch (Exception ex) {
    // NOTE(review): this also swallows InterruptedException from the sleep without
    // re-interrupting the thread — confirm that is intended.
    LOGGER.warn(ex, "Exception when wait for pod Running [%s/%s]", namespace, podName);
  }
}

/**
 * Stream the peon pod's container logs.
 *
 * @param peonPod pod whose logs to stream
 * @return an InputStream over the pod logs, or null if the log request failed (error is logged)
 */
@Override
public InputStream getPodLogs(V1Pod peonPod)
{
  String namespace = peonPod.getMetadata().getNamespace();
  String podName = peonPod.getMetadata().getName();
  InputStream is = null;
  try {
    is = logs.streamNamespacedPodLog(peonPod);
  }
  catch (ApiException ex) {
    LOGGER.warn(ex, "Exception to get pod logs [%s/%s], code[%d], error[%s].", namespace, podName, ex.getCode(), ex.getResponseBody());
  }
  catch (IOException ex) {
    LOGGER.warn(ex, "Error when get pod logs [%s/%s].", namespace, podName);
  }
  return is;
}

@Override
public String waitForPodFinished(V1Pod peonPod)
{
  // getPodStatus is defined elsewhere in this class — presumably returns the pod phase string.
  String phase = getPodStatus(peonPod);
  String namespace = "";
  String name = "";
  try {
    namespace = peonPod.getMetadata().getNamespace();
    name = peonPod.getMetadata().getName();

    // Poll until the pod reaches a terminal phase (Failed/Succeeded).
    while (!phase.equalsIgnoreCase("Failed") &&
!phase.equalsIgnoreCase("Succeeded")) { + LOGGER.info("Still wait for peon pod finished [%s/%s] current status is [%s]", namespace, name, phase); + Thread.sleep(3 * 1000); Review comment: is there a better alternative here ? does k8s support listening/watching to some pod lifecycle events ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
