kezhenxu94 commented on code in PR #13550:
URL: 
https://github.com/apache/dolphinscheduler/pull/13550#discussion_r1111140454


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java:
##########
@@ -27,18 +27,29 @@ public class K8sTaskExecutionContext implements 
Serializable {
 
     private String configYaml;
 
+    private String namespace;
+
     public String getConfigYaml() {
         return configYaml;
     }
 
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }

Review Comment:
   I think this class can be immutable. Can you revise this?



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.am;
+
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
+
+import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
+
+@Slf4j
+@AutoService(ApplicationManager.class)
+public class KubernetesApplicationManager implements ApplicationManager {
+
+    private static final String PENDING = "Pending";
+    private static final String RUNNING = "Running";
+    private static final String FINISH = "Succeeded";
+    private static final String FAILED = "Failed";
+    private static final String UNKNOWN = "Unknown";
+
+    /**
+     * cache k8s client for same task
+     */
+    private final Map<String, KubernetesClient> cacheClientMap = new 
ConcurrentHashMap<>();
+
+    @Override
+    public boolean killApplication(ApplicationManagerContext 
applicationManagerContext) throws TaskException {
+        KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext =
+                (KubernetesApplicationManagerContext) 
applicationManagerContext;
+
+        boolean isKill;
+        String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
+        FilterWatchListDeletable<Pod, PodList> watchList = 
getDriverPod(kubernetesApplicationManagerContext);
+        try {
+            if (getApplicationStatus(kubernetesApplicationManagerContext, 
watchList).isFailure()) {
+                log.error("Driver pod is in FAILED or UNKNOWN status.");
+                isKill = false;
+            } else {
+                watchList.delete();
+                isKill = true;
+            }
+        } catch (Exception e) {
+            throw new TaskException("Failed to kill Kubernetes application 
with label " + labelValue, e);
+        } finally {
+            // remove client cache after killing application
+            removeCache(labelValue);
+        }
+
+        return isKill;
+    }
+
+    @Override
+    public ResourceManagerType getResourceManagerType() {
+        return ResourceManagerType.KUBERNETES;
+    }
+
+    /**
+     * get driver pod
+     *
+     * @param kubernetesApplicationManagerContext
+     * @return
+     */
+    private FilterWatchListDeletable<Pod, PodList> 
getDriverPod(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+        KubernetesClient client = 
getClient(kubernetesApplicationManagerContext);
+        String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
+        FilterWatchListDeletable<Pod, PodList> watchList =
+                client.pods()
+                        
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
+                        .withLabel(UNIQUE_LABEL_NAME, labelValue);
+        List<Pod> podList = watchList.list().getItems();
+        if (podList.size() != 1) {
+            log.warn("Expected driver pod 1, but get {}.", podList.size());
+        }
+        return watchList;
+    }
+
+    /**
+     * create client or get from cache map
+     *
+     * @param kubernetesApplicationManagerContext
+     * @return
+     */
+    private KubernetesClient getClient(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+        K8sTaskExecutionContext k8sTaskExecutionContext =
+                
kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
+        Config config = 
Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml());
+        return 
cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
+                key -> new DefaultKubernetesClient(config));
+    }
+
+    public void removeCache(String cacheKey) {
+        KubernetesClient client = cacheClientMap.get(cacheKey);
+        if (Objects.nonNull(client)) {
+            client.close();
+        }
+        cacheClientMap.remove(cacheKey);

Review Comment:
   ```suggestion
           try (KubernetesClient ignored = cacheClientMap.remove(cacheKey)) {
           }
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.am;
+
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
+
+import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
+
+@Slf4j
+@AutoService(ApplicationManager.class)
+public class KubernetesApplicationManager implements ApplicationManager {
+
+    private static final String PENDING = "Pending";
+    private static final String RUNNING = "Running";
+    private static final String FINISH = "Succeeded";
+    private static final String FAILED = "Failed";
+    private static final String UNKNOWN = "Unknown";
+
+    /**
+     * cache k8s client for same task
+     */
+    private final Map<String, KubernetesClient> cacheClientMap = new 
ConcurrentHashMap<>();
+
+    @Override
+    public boolean killApplication(ApplicationManagerContext 
applicationManagerContext) throws TaskException {
+        KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext =
+                (KubernetesApplicationManagerContext) 
applicationManagerContext;
+
+        boolean isKill;
+        String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
+        FilterWatchListDeletable<Pod, PodList> watchList = 
getDriverPod(kubernetesApplicationManagerContext);
+        try {
+            if (getApplicationStatus(kubernetesApplicationManagerContext, 
watchList).isFailure()) {
+                log.error("Driver pod is in FAILED or UNKNOWN status.");
+                isKill = false;
+            } else {
+                watchList.delete();
+                isKill = true;
+            }
+        } catch (Exception e) {
+            throw new TaskException("Failed to kill Kubernetes application 
with label " + labelValue, e);
+        } finally {
+            // remove client cache after killing application
+            removeCache(labelValue);
+        }
+
+        return isKill;
+    }
+
+    @Override
+    public ResourceManagerType getResourceManagerType() {
+        return ResourceManagerType.KUBERNETES;
+    }
+
+    /**
+     * get driver pod
+     *
+     * @param kubernetesApplicationManagerContext
+     * @return
+     */
+    private FilterWatchListDeletable<Pod, PodList> 
getDriverPod(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+        KubernetesClient client = 
getClient(kubernetesApplicationManagerContext);
+        String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
+        FilterWatchListDeletable<Pod, PodList> watchList =
+                client.pods()
+                        
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
+                        .withLabel(UNIQUE_LABEL_NAME, labelValue);
+        List<Pod> podList = watchList.list().getItems();
+        if (podList.size() != 1) {
+            log.warn("Expected driver pod 1, but get {}.", podList.size());
+        }
+        return watchList;
+    }
+
+    /**
+     * create client or get from cache map
+     *
+     * @param kubernetesApplicationManagerContext
+     * @return
+     */
+    private KubernetesClient getClient(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+        K8sTaskExecutionContext k8sTaskExecutionContext =
+                
kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
+        Config config = 
Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml());
+        return 
cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
+                key -> new DefaultKubernetesClient(config));

Review Comment:
   ```suggestion
           return 
cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
                   key -> new 
DefaultKubernetesClient(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())));
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.am;
+
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+
+import lombok.Data;
+
+@Data
+public class KubernetesApplicationManagerContext implements 
ApplicationManagerContext {

Review Comment:
   Could you make this class immutable by adding `final` to the fields and 
replacing the constructor with `@RequiredArgsConstructor`?



##########
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+public enum ResourceManagerType {
+
+    YARN(0, "yarn"),
+    KUBERNETES(1, "kubernetes");
+
+    ResourceManagerType(int code, String descp) {
+        this.code = code;
+        this.descp = descp;
+    }
+
+    @EnumValue
+    private final int code;
+    private final String descp;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDescp() {
+        return descp;
+    }

Review Comment:
   I think all these are unnecessary



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.am;
+
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
+
+import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
+
+@Slf4j
+@AutoService(ApplicationManager.class)
+public class KubernetesApplicationManager implements ApplicationManager {
+
+    private static final String PENDING = "Pending";
+    private static final String RUNNING = "Running";
+    private static final String FINISH = "Succeeded";
+    private static final String FAILED = "Failed";
+    private static final String UNKNOWN = "Unknown";
+
+    /**
+     * cache k8s client for same task
+     */
+    private final Map<String, KubernetesClient> cacheClientMap = new 
ConcurrentHashMap<>();
+
+    @Override
+    public boolean killApplication(ApplicationManagerContext 
applicationManagerContext) throws TaskException {
+        KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext =
+                (KubernetesApplicationManagerContext) 
applicationManagerContext;
+
+        boolean isKill;
+        String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
+        FilterWatchListDeletable<Pod, PodList> watchList = 
getDriverPod(kubernetesApplicationManagerContext);
+        try {
+            if (getApplicationStatus(kubernetesApplicationManagerContext, 
watchList).isFailure()) {
+                log.error("Driver pod is in FAILED or UNKNOWN status.");
+                isKill = false;
+            } else {
+                watchList.delete();
+                isKill = true;
+            }
+        } catch (Exception e) {
+            throw new TaskException("Failed to kill Kubernetes application 
with label " + labelValue, e);
+        } finally {
+            // remove client cache after killing application
+            removeCache(labelValue);
+        }
+
+        return isKill;
+    }
+
+    @Override
+    public ResourceManagerType getResourceManagerType() {
+        return ResourceManagerType.KUBERNETES;
+    }
+
+    /**
+     * get driver pod
+     *
+     * @param kubernetesApplicationManagerContext
+     * @return
+     */
+    private FilterWatchListDeletable<Pod, PodList> 
getDriverPod(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+        KubernetesClient client = 
getClient(kubernetesApplicationManagerContext);
+        String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
+        FilterWatchListDeletable<Pod, PodList> watchList =
+                client.pods()
+                        
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
+                        .withLabel(UNIQUE_LABEL_NAME, labelValue);
+        List<Pod> podList = watchList.list().getItems();
+        if (podList.size() != 1) {
+            log.warn("Expected driver pod 1, but get {}.", podList.size());
+        }
+        return watchList;
+    }
+
+    /**
+     * create client or get from cache map
+     *
+     * @param kubernetesApplicationManagerContext
+     * @return
+     */
+    private KubernetesClient getClient(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+        K8sTaskExecutionContext k8sTaskExecutionContext =
+                
kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
+        Config config = 
Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml());
+        return 
cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
+                key -> new DefaultKubernetesClient(config));
+    }
+
+    public void removeCache(String cacheKey) {
+        KubernetesClient client = cacheClientMap.get(cacheKey);
+        if (Objects.nonNull(client)) {
+            client.close();
+        }
+        cacheClientMap.remove(cacheKey);
+    }
+
+    /**
+     * get application execution status
+     *
+     * @param kubernetesApplicationManagerContext
+     * @return TaskExecutionStatus  SUCCESS / FAILURE
+     * @throws TaskException
+     */
+    public TaskExecutionStatus 
getApplicationStatus(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) throws TaskException {
+        return getApplicationStatus(kubernetesApplicationManagerContext, null);
+    }
+
+    /**
+     * get application (driver pod) status
+     *
+     * @param kubernetesApplicationManagerContext
+     * @param watchList
+     * @return
+     * @throws TaskException
+     */
+    private TaskExecutionStatus 
getApplicationStatus(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext,
+                                                     
FilterWatchListDeletable<Pod, PodList> watchList) throws TaskException {
+        String phase;
+        try {
+            if (Objects.isNull(watchList)) {
+                watchList = getDriverPod(kubernetesApplicationManagerContext);
+            }
+            List<Pod> driverPod = watchList.list().getItems();
+            if (!driverPod.isEmpty()) {
+                // cluster mode
+                Pod driver = driverPod.get(0);
+                phase = driver.getStatus().getPhase();
+            } else {
+                // client mode
+                phase = FINISH;
+            }
+        } catch (Exception e) {
+            throw new TaskException("Failed to get Kubernetes application 
status", e);
+        }
+
+        return phase.equals(FAILED) || phase.equals(UNKNOWN) ? 
TaskExecutionStatus.FAILURE
+                : TaskExecutionStatus.SUCCESS;
+    }
+
+    /**
+     * collect pod's log
+     *
+     * @param kubernetesApplicationManagerContext
+     * @return
+     */
+    public String collectPodLog(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+        try (KubernetesClient client = 
getClient(kubernetesApplicationManagerContext)) {

Review Comment:
   Here the `client` is closed automatically by the try-with-resources block, 
but in `killApplication` (line 192) the `client` will be closed again 
(`killApplication` --> `removeCache`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to