This is an automated email from the ASF dual-hosted git repository.

pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 15a699d3 SUBMARINE-1288. Fix issue where the agent cannot stop after a Kubeflow job finishes
15a699d3 is described below

commit 15a699d3b20afb99128c629cda22e6d65dbb9ccf
Author: cdmikechen <[email protected]>
AuthorDate: Sun Jun 26 15:31:10 2022 +0800

    SUBMARINE-1288. Fix issue where the agent cannot stop after a Kubeflow job finishes
    
    ### What is this PR for?
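    Since the old operators were replaced by the new Kubeflow training operator,
    the reason strings reported in the Kubeflow CRD status conditions have changed
    (e.g. "TFJobSucceeded"/"PyTorchJobSucceeded" are now "JobSucceeded"). The
    state-handling logic must be updated to recognize the new reasons so that the
    agent can stop correctly.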
    
    ### What type of PR is it?
    Bug Fix
    
    ### Todos
    * [x] - Update the condition reason names to match the new training operator
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-1288
    
    ### How should this be tested?
    <!--
    * First time? Setup Travis CI as described on 
https://submarine.apache.org/contribution/contributions.html#continuous-integration
    * Strongly recommended: add automated unit tests for any new or changed 
behavior
    * Outline any manual steps to test the PR here.
    -->
    ### Screenshots (if appropriate)
    No
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? No
    
    Author: cdmikechen <[email protected]>
    
    Signed-off-by: Kevin <[email protected]>
    
    Closes #972 from cdmikechen/SUBMARINE-1288 and squashes the following commits:
    
    e38ca1a3 [cdmikechen] Fix length checkstyle
    259105ff [cdmikechen] remove istio sidecar
    3a61aa03 [cdmikechen] add object null check
    d2ffcbf2 [cdmikechen] SUBMARINE-1288. Fix issue that agent can not stop 
after Kubeflow job is finished
---
 .../k8s/agent/handler/PyTorchJobHandler.java       | 51 ++++++++++++----------
 .../server/k8s/agent/handler/TFJobHandler.java     | 50 +++++++++++----------
 .../server/submitter/k8s/model/AgentPod.java       | 27 ++++++------
 3 files changed, 69 insertions(+), 59 deletions(-)

diff --git 
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
 
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
index 6e744b57..ffcc6246 100644
--- 
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
+++ 
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
@@ -21,6 +21,7 @@ package org.apache.submarine.server.k8s.agent.handler;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.submarine.server.api.common.CustomResourceType;
 import org.apache.submarine.server.api.experiment.Experiment;
@@ -31,7 +32,6 @@ import 
org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 import io.kubernetes.client.openapi.ApiException;
@@ -69,7 +69,7 @@ public class PyTorchJobHandler extends CustomResourceHandler {
       String fieldSelector = String.format("involvedObject.name=%s", 
resourceId);
       LOG.info("fieldSelector:" + fieldSelector);
       Call call =  coreV1Api.listNamespacedEventCall(namespace, null, null, 
null, fieldSelector,
-            null, null, null, null, null, true, null);        
+            null, null, null, null, null, true, null);
 
       watcher = Watch.createWatch(client, call, new 
TypeToken<Response<CoreV1Event>>(){}.getType());
     } catch (ApiException e) {
@@ -80,30 +80,35 @@ public class PyTorchJobHandler extends 
CustomResourceHandler {
 
   @Override
   public void run() {
-    Gson gson = new Gson();
     while (true) {
       for (Response<CoreV1Event> event: watcher) {
-        PyTorchJob job = pytorchJobClient.get(this.namespace, 
this.resourceId).getObject();  
-        List<V1JobCondition> conditionList = job.getStatus().getConditions();
-        V1JobCondition lastCondition = conditionList.get(conditionList.size() 
- 1);
-        Experiment experiment = MLJobConverter.toJobFromMLJob(job);
-           
-        this.restClient.callStatusUpdate(CustomResourceType.PyTorchJob, 
resourceId, experiment);
-        LOG.info(String.format("receiving condition:%s", 
lastCondition.getReason()));
-        LOG.info(String.format("current status of PyTorchjob:%s is %s", 
resourceId, experiment.getStatus()));
-        
-        switch (lastCondition.getReason()) {
-          case "PyTorchJobSucceeded":
-            LOG.info(String.format("PyTorchjob:%s is succeeded, exit", 
this.resourceId));
-            return;
-          case "PyTorchJobFailed":
-            LOG.info(String.format("PyTorchjob:%s is failed, exit", 
this.resourceId));
-            return;
-          default:    
-            break;    
+        try {
+          PyTorchJob job = pytorchJobClient.get(this.namespace, 
this.resourceId).getObject();
+          List<V1JobCondition> conditionList = job.getStatus().getConditions();
+          if (conditionList == null || conditionList.isEmpty()) continue;
+          V1JobCondition lastCondition = 
conditionList.get(conditionList.size() - 1);
+          Experiment experiment = MLJobConverter.toJobFromMLJob(job);
+
+          this.restClient.callStatusUpdate(CustomResourceType.PyTorchJob, 
resourceId, experiment);
+          LOG.info(String.format("receiving condition:%s", 
lastCondition.getReason()));
+          LOG.info(String.format("current status of PyTorchjob:%s is %s", 
resourceId,
+              experiment.getStatus()));
+
+          // The reason value can refer to 
https://github.com/kubeflow/common/blob/master/pkg/util/status.go
+          switch (Objects.requireNonNull(lastCondition.getReason())) {
+            case "JobSucceeded":
+              LOG.info(String.format("PyTorchjob:%s is succeeded, exit", 
this.resourceId));
+              return;
+            case "JobFailed":
+              LOG.info(String.format("PyTorchjob:%s is failed, exit", 
this.resourceId));
+              return;
+            default:
+              break;
+          }
+        } catch (Exception e) {
+          LOG.error("Exception while processing the PyTorch event! " + 
e.getMessage(), e);
         }
-        
-      }   
+      }
     }
   }
 }
diff --git 
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
 
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
index 13b157f0..dfd870ca 100644
--- 
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
+++ 
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
@@ -21,6 +21,7 @@ package org.apache.submarine.server.k8s.agent.handler;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.submarine.server.api.common.CustomResourceType;
 import org.apache.submarine.server.api.experiment.Experiment;
@@ -50,7 +51,6 @@ public class TFJobHandler extends CustomResourceHandler {
     super();
   }
 
-
   @Override
   public void init(String serverHost, Integer serverPort,
           String namespace, String crName, String resourceId) {
@@ -68,7 +68,7 @@ public class TFJobHandler extends CustomResourceHandler {
       String fieldSelector = String.format("involvedObject.name=%s", 
resourceId);
       LOG.info("fieldSelector:" + fieldSelector);
       Call call =  coreV1Api.listNamespacedEventCall(namespace, null, null, 
null, fieldSelector,
-            null, null, null, null, null, true, null);        
+            null, null, null, null, null, true, null);
 
       watcher = Watch.createWatch(client, call, new 
TypeToken<Response<CoreV1Event>>(){}.getType());
     } catch (ApiException e) {
@@ -79,30 +79,34 @@ public class TFJobHandler extends CustomResourceHandler {
 
   @Override
   public void run() {
-
     while (true) {
       for (Response<CoreV1Event> event: watcher) {
-        TFJob job = tfJobClient.get(this.namespace, 
this.resourceId).getObject();  
-        List<V1JobCondition> conditionList = job.getStatus().getConditions();
-        V1JobCondition lastCondition = conditionList.get(conditionList.size() 
- 1);
-        Experiment experiment = MLJobConverter.toJobFromMLJob(job);
-        
-        this.restClient.callStatusUpdate(CustomResourceType.TFJob, resourceId, 
experiment);
-        LOG.info(String.format("receiving condition:%s", 
lastCondition.getReason()));
-        LOG.info(String.format("current status of tfjob:%s is %s", resourceId, 
experiment.getStatus()));
-        
-        switch (lastCondition.getReason()) {
-          case "TFJobSucceeded":
-            LOG.info(String.format("TfJob:%s is succeeded, exit", 
this.resourceId));
-            return;
-          case "TFJobFailed":
-            LOG.info(String.format("TfJob:%s is failed, exit", 
this.resourceId));
-            return;
-          default:    
-            break;    
+        try {
+          TFJob job = tfJobClient.get(this.namespace, 
this.resourceId).getObject();
+          List<V1JobCondition> conditionList = job.getStatus().getConditions();
+          if (conditionList == null || conditionList.isEmpty()) continue;
+          V1JobCondition lastCondition = 
conditionList.get(conditionList.size() - 1);
+          Experiment experiment = MLJobConverter.toJobFromMLJob(job);
+
+          this.restClient.callStatusUpdate(CustomResourceType.TFJob, 
resourceId, experiment);
+          LOG.info(String.format("receiving condition:%s", 
lastCondition.getReason()));
+          LOG.info(String.format("current status of tfjob:%s is %s", 
resourceId, experiment.getStatus()));
+
+          // The reason value can refer to 
https://github.com/kubeflow/common/blob/master/pkg/util/status.go
+          switch (Objects.requireNonNull(lastCondition.getReason())) {
+            case "JobSucceeded":
+              LOG.info(String.format("TfJob:%s is succeeded, exit", 
this.resourceId));
+              return;
+            case "JobFailed":
+              LOG.info(String.format("TfJob:%s is failed, exit", 
this.resourceId));
+              return;
+            default:
+              break;
+          }
+        } catch (Exception e) {
+          LOG.error("Exception while processing the TfJob event! " + 
e.getMessage(), e);
         }
-        
-      }   
+      }
     }
   }
 }
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
index 5b34be03..2455ce53 100644
--- 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
@@ -20,18 +20,16 @@
 package org.apache.submarine.server.submitter.k8s.model;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import io.kubernetes.client.openapi.ApiException;
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
 import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
 import org.apache.submarine.server.api.common.CustomResourceType;
 
+import io.kubernetes.client.openapi.ApiException;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1EnvVar;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
 import io.kubernetes.client.openapi.models.V1Pod;
 import io.kubernetes.client.openapi.models.V1PodSpec;
 import org.apache.submarine.server.submitter.k8s.client.K8sClient;
@@ -43,7 +41,7 @@ public class AgentPod extends V1Pod implements 
K8sResource<AgentPod> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AgentPod.class);
 
-  private static SubmarineConfiguration conf = 
SubmarineConfiguration.getInstance();
+  private static final SubmarineConfiguration conf = 
SubmarineConfiguration.getInstance();
   private static final String AGENT_IMAGE = 
"apache/submarine:agent-0.8.0-SNAPSHOT";
   private static final String CONTAINER_NAME = "agent";
 
@@ -51,13 +49,16 @@ public class AgentPod extends V1Pod implements 
K8sResource<AgentPod> {
                   CustomResourceType type,
                   String resourceId) {
     super();
-    V1ObjectMeta meta = new V1ObjectMeta();
-    Map<String, String> labels = new HashMap<>();
-    labels.put("app", type.toString().toLowerCase());
-    meta.setName(getNormalizePodName(type, name, resourceId));
-    meta.setNamespace(namespace);
-    meta.setLabels(labels);
-    this.setMetadata(meta);
+
+    V1ObjectMetaBuilder metaBuilder = new V1ObjectMetaBuilder();
+    metaBuilder.withName(getNormalizePodName(type, name, resourceId))
+        .withNamespace(namespace)
+        .addToLabels("app", type.toString().toLowerCase())
+        // There is no need to add istio sidecar. Otherwise, the pod may not 
end normally
+        // 
https://istio.io/latest/docs/setup/additional-setup/sidecar-injection/
+        // Controlling the injection policy Section
+        .addToAnnotations("sidecar.istio.io/inject", "false");
+    this.setMetadata(metaBuilder.build());
 
     V1PodSpec spec = new V1PodSpec();
     List<V1Container> containers = spec.getContainers();
@@ -133,7 +134,7 @@ public class AgentPod extends V1Pod implements 
K8sResource<AgentPod> {
 
   @Override
   public AgentPod replace(K8sClient api) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to