Radeity commented on code in PR #13550: URL: https://github.com/apache/dolphinscheduler/pull/13550#discussion_r1111377216
########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java: ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.am; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME; + +import org.apache.dolphinscheduler.common.enums.ResourceManagerType; +import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; + +import com.google.auto.service.AutoService; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; + +@Slf4j 
+@AutoService(ApplicationManager.class) +public class KubernetesApplicationManager implements ApplicationManager { + + private static final String PENDING = "Pending"; + private static final String RUNNING = "Running"; + private static final String FINISH = "Succeeded"; + private static final String FAILED = "Failed"; + private static final String UNKNOWN = "Unknown"; + + /** + * cache k8s client for same task + */ + private final Map<String, KubernetesClient> cacheClientMap = new ConcurrentHashMap<>(); + + @Override + public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException { + KubernetesApplicationManagerContext kubernetesApplicationManagerContext = + (KubernetesApplicationManagerContext) applicationManagerContext; + + boolean isKill; + String labelValue = kubernetesApplicationManagerContext.getLabelValue(); + FilterWatchListDeletable<Pod, PodList> watchList = getDriverPod(kubernetesApplicationManagerContext); + try { + if (getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) { + log.error("Driver pod is in FAILED or UNKNOWN status."); + isKill = false; + } else { + watchList.delete(); + isKill = true; + } + } catch (Exception e) { + throw new TaskException("Failed to kill Kubernetes application with label " + labelValue, e); + } finally { + // remove client cache after killing application + removeCache(labelValue); + } + + return isKill; + } + + @Override + public ResourceManagerType getResourceManagerType() { + return ResourceManagerType.KUBERNETES; + } + + /** + * get driver pod + * + * @param kubernetesApplicationManagerContext + * @return + */ + private FilterWatchListDeletable<Pod, PodList> getDriverPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + KubernetesClient client = getClient(kubernetesApplicationManagerContext); + String labelValue = kubernetesApplicationManagerContext.getLabelValue(); + FilterWatchListDeletable<Pod, PodList> watchList = + 
client.pods() + .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) + .withLabel(UNIQUE_LABEL_NAME, labelValue); + List<Pod> podList = watchList.list().getItems(); + if (podList.size() != 1) { + log.warn("Expected driver pod 1, but get {}.", podList.size()); + } + return watchList; + } + + /** + * create client or get from cache map + * + * @param kubernetesApplicationManagerContext + * @return + */ + private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + K8sTaskExecutionContext k8sTaskExecutionContext = + kubernetesApplicationManagerContext.getK8sTaskExecutionContext(); + Config config = Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml()); + return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(), + key -> new DefaultKubernetesClient(config)); + } + + public void removeCache(String cacheKey) { + KubernetesClient client = cacheClientMap.get(cacheKey); + if (Objects.nonNull(client)) { + client.close(); + } + cacheClientMap.remove(cacheKey); + } + + /** + * get application execution status + * + * @param kubernetesApplicationManagerContext + * @return TaskExecutionStatus SUCCESS / FAILURE + * @throws TaskException + */ + public TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) throws TaskException { + return getApplicationStatus(kubernetesApplicationManagerContext, null); + } + + /** + * get application (driver pod) status + * + * @param kubernetesApplicationManagerContext + * @param watchList + * @return + * @throws TaskException + */ + private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext, + FilterWatchListDeletable<Pod, PodList> watchList) throws TaskException { + String phase; + try { + if (Objects.isNull(watchList)) { + watchList = getDriverPod(kubernetesApplicationManagerContext); + } + 
List<Pod> driverPod = watchList.list().getItems(); + if (!driverPod.isEmpty()) { + // cluster mode + Pod driver = driverPod.get(0); + phase = driver.getStatus().getPhase(); + } else { + // client mode + phase = FINISH; + } + } catch (Exception e) { + throw new TaskException("Failed to get Kubernetes application status", e); + } + + return phase.equals(FAILED) || phase.equals(UNKNOWN) ? TaskExecutionStatus.FAILURE + : TaskExecutionStatus.SUCCESS; + } + + /** + * collect pod's log + * + * @param kubernetesApplicationManagerContext + * @return + */ + public String collectPodLog(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + try (KubernetesClient client = getClient(kubernetesApplicationManagerContext)) { Review Comment: The client should be closed in `killApplication`, because when the master fails over, `collectPodLog` will not be called. I forgot to remove it, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
