This is an automated email from the ASF dual-hosted git repository.
pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 15a699d3 SUBMARINE-1288. Fix issue that agent can not stop after
Kubeflow job is finished
15a699d3 is described below
commit 15a699d3b20afb99128c629cda22e6d65dbb9ccf
Author: cdmikechen <[email protected]>
AuthorDate: Sun Jun 26 15:31:10 2022 +0800
SUBMARINE-1288. Fix issue that agent can not stop after Kubeflow job is
finished
### What is this PR for?
Since the Kubeflow training operator was replaced with the new one, the reason
values in the CRD status conditions have changed (e.g. `TFJobSucceeded` is now
`JobSucceeded`). We need to update how the state is processed so that the agent
can stop correctly.
### What type of PR is it?
Bug Fix
### Todos
* [x] - Update the condition reason names to match the new training operator
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-1288
### How should this be tested?
<!--
* First time? Set up Travis CI as described on
https://submarine.apache.org/contribution/contributions.html#continuous-integration
* Strongly recommended: add automated unit tests for any new or changed
behavior
* Outline any manual steps to test the PR here.
-->
### Screenshots (if appropriate)
No
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? No
Author: cdmikechen <[email protected]>
Signed-off-by: Kevin <[email protected]>
Closes #972 from cdmikechen/SUBMARINE-1288 and squashes the following
commits:
e38ca1a3 [cdmikechen] Fix length checkstyle
259105ff [cdmikechen] remove istio sidecar
3a61aa03 [cdmikechen] add object null check
d2ffcbf2 [cdmikechen] SUBMARINE-1288. Fix issue that agent can not stop
after Kubeflow job is finished
---
.../k8s/agent/handler/PyTorchJobHandler.java | 51 ++++++++++++----------
.../server/k8s/agent/handler/TFJobHandler.java | 50 +++++++++++----------
.../server/submitter/k8s/model/AgentPod.java | 27 ++++++------
3 files changed, 69 insertions(+), 59 deletions(-)
diff --git
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
index 6e744b57..ffcc6246 100644
---
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
+++
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/PyTorchJobHandler.java
@@ -21,6 +21,7 @@ package org.apache.submarine.server.k8s.agent.handler;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import org.apache.submarine.server.api.common.CustomResourceType;
import org.apache.submarine.server.api.experiment.Experiment;
@@ -31,7 +32,6 @@ import
org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.openapi.ApiException;
@@ -69,7 +69,7 @@ public class PyTorchJobHandler extends CustomResourceHandler {
String fieldSelector = String.format("involvedObject.name=%s",
resourceId);
LOG.info("fieldSelector:" + fieldSelector);
Call call = coreV1Api.listNamespacedEventCall(namespace, null, null,
null, fieldSelector,
- null, null, null, null, null, true, null);
+ null, null, null, null, null, true, null);
watcher = Watch.createWatch(client, call, new
TypeToken<Response<CoreV1Event>>(){}.getType());
} catch (ApiException e) {
@@ -80,30 +80,35 @@ public class PyTorchJobHandler extends
CustomResourceHandler {
@Override
public void run() {
- Gson gson = new Gson();
while (true) {
for (Response<CoreV1Event> event: watcher) {
- PyTorchJob job = pytorchJobClient.get(this.namespace,
this.resourceId).getObject();
- List<V1JobCondition> conditionList = job.getStatus().getConditions();
- V1JobCondition lastCondition = conditionList.get(conditionList.size()
- 1);
- Experiment experiment = MLJobConverter.toJobFromMLJob(job);
-
- this.restClient.callStatusUpdate(CustomResourceType.PyTorchJob,
resourceId, experiment);
- LOG.info(String.format("receiving condition:%s",
lastCondition.getReason()));
- LOG.info(String.format("current status of PyTorchjob:%s is %s",
resourceId, experiment.getStatus()));
-
- switch (lastCondition.getReason()) {
- case "PyTorchJobSucceeded":
- LOG.info(String.format("PyTorchjob:%s is succeeded, exit",
this.resourceId));
- return;
- case "PyTorchJobFailed":
- LOG.info(String.format("PyTorchjob:%s is failed, exit",
this.resourceId));
- return;
- default:
- break;
+ try {
+ PyTorchJob job = pytorchJobClient.get(this.namespace,
this.resourceId).getObject();
+ List<V1JobCondition> conditionList = job.getStatus().getConditions();
+ if (conditionList == null || conditionList.isEmpty()) continue;
+ V1JobCondition lastCondition =
conditionList.get(conditionList.size() - 1);
+ Experiment experiment = MLJobConverter.toJobFromMLJob(job);
+
+ this.restClient.callStatusUpdate(CustomResourceType.PyTorchJob,
resourceId, experiment);
+ LOG.info(String.format("receiving condition:%s",
lastCondition.getReason()));
+ LOG.info(String.format("current status of PyTorchjob:%s is %s",
resourceId,
+ experiment.getStatus()));
+
+ // The reason value can refer to
https://github.com/kubeflow/common/blob/master/pkg/util/status.go
+ switch (Objects.requireNonNull(lastCondition.getReason())) {
+ case "JobSucceeded":
+ LOG.info(String.format("PyTorchjob:%s is succeeded, exit",
this.resourceId));
+ return;
+ case "JobFailed":
+ LOG.info(String.format("PyTorchjob:%s is failed, exit",
this.resourceId));
+ return;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while processing the PyTorch event! " +
e.getMessage(), e);
}
-
- }
+ }
}
}
}
diff --git
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
index 13b157f0..dfd870ca 100644
---
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
+++
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/TFJobHandler.java
@@ -21,6 +21,7 @@ package org.apache.submarine.server.k8s.agent.handler;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import org.apache.submarine.server.api.common.CustomResourceType;
import org.apache.submarine.server.api.experiment.Experiment;
@@ -50,7 +51,6 @@ public class TFJobHandler extends CustomResourceHandler {
super();
}
-
@Override
public void init(String serverHost, Integer serverPort,
String namespace, String crName, String resourceId) {
@@ -68,7 +68,7 @@ public class TFJobHandler extends CustomResourceHandler {
String fieldSelector = String.format("involvedObject.name=%s",
resourceId);
LOG.info("fieldSelector:" + fieldSelector);
Call call = coreV1Api.listNamespacedEventCall(namespace, null, null,
null, fieldSelector,
- null, null, null, null, null, true, null);
+ null, null, null, null, null, true, null);
watcher = Watch.createWatch(client, call, new
TypeToken<Response<CoreV1Event>>(){}.getType());
} catch (ApiException e) {
@@ -79,30 +79,34 @@ public class TFJobHandler extends CustomResourceHandler {
@Override
public void run() {
-
while (true) {
for (Response<CoreV1Event> event: watcher) {
- TFJob job = tfJobClient.get(this.namespace,
this.resourceId).getObject();
- List<V1JobCondition> conditionList = job.getStatus().getConditions();
- V1JobCondition lastCondition = conditionList.get(conditionList.size()
- 1);
- Experiment experiment = MLJobConverter.toJobFromMLJob(job);
-
- this.restClient.callStatusUpdate(CustomResourceType.TFJob, resourceId,
experiment);
- LOG.info(String.format("receiving condition:%s",
lastCondition.getReason()));
- LOG.info(String.format("current status of tfjob:%s is %s", resourceId,
experiment.getStatus()));
-
- switch (lastCondition.getReason()) {
- case "TFJobSucceeded":
- LOG.info(String.format("TfJob:%s is succeeded, exit",
this.resourceId));
- return;
- case "TFJobFailed":
- LOG.info(String.format("TfJob:%s is failed, exit",
this.resourceId));
- return;
- default:
- break;
+ try {
+ TFJob job = tfJobClient.get(this.namespace,
this.resourceId).getObject();
+ List<V1JobCondition> conditionList = job.getStatus().getConditions();
+ if (conditionList == null || conditionList.isEmpty()) continue;
+ V1JobCondition lastCondition =
conditionList.get(conditionList.size() - 1);
+ Experiment experiment = MLJobConverter.toJobFromMLJob(job);
+
+ this.restClient.callStatusUpdate(CustomResourceType.TFJob,
resourceId, experiment);
+ LOG.info(String.format("receiving condition:%s",
lastCondition.getReason()));
+ LOG.info(String.format("current status of tfjob:%s is %s",
resourceId, experiment.getStatus()));
+
+ // The reason value can refer to
https://github.com/kubeflow/common/blob/master/pkg/util/status.go
+ switch (Objects.requireNonNull(lastCondition.getReason())) {
+ case "JobSucceeded":
+ LOG.info(String.format("TfJob:%s is succeeded, exit",
this.resourceId));
+ return;
+ case "JobFailed":
+ LOG.info(String.format("TfJob:%s is failed, exit",
this.resourceId));
+ return;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while processing the TfJob event! " +
e.getMessage(), e);
}
-
- }
+ }
}
}
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
index 5b34be03..2455ce53 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/AgentPod.java
@@ -20,18 +20,16 @@
package org.apache.submarine.server.submitter.k8s.model;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import io.kubernetes.client.openapi.ApiException;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.server.api.common.CustomResourceType;
+import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1EnvVar;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodSpec;
import org.apache.submarine.server.submitter.k8s.client.K8sClient;
@@ -43,7 +41,7 @@ public class AgentPod extends V1Pod implements
K8sResource<AgentPod> {
private static final Logger LOG = LoggerFactory.getLogger(AgentPod.class);
- private static SubmarineConfiguration conf =
SubmarineConfiguration.getInstance();
+ private static final SubmarineConfiguration conf =
SubmarineConfiguration.getInstance();
private static final String AGENT_IMAGE =
"apache/submarine:agent-0.8.0-SNAPSHOT";
private static final String CONTAINER_NAME = "agent";
@@ -51,13 +49,16 @@ public class AgentPod extends V1Pod implements
K8sResource<AgentPod> {
CustomResourceType type,
String resourceId) {
super();
- V1ObjectMeta meta = new V1ObjectMeta();
- Map<String, String> labels = new HashMap<>();
- labels.put("app", type.toString().toLowerCase());
- meta.setName(getNormalizePodName(type, name, resourceId));
- meta.setNamespace(namespace);
- meta.setLabels(labels);
- this.setMetadata(meta);
+
+ V1ObjectMetaBuilder metaBuilder = new V1ObjectMetaBuilder();
+ metaBuilder.withName(getNormalizePodName(type, name, resourceId))
+ .withNamespace(namespace)
+ .addToLabels("app", type.toString().toLowerCase())
+ // There is no need to add istio sidecar. Otherwise, the pod may not
end normally
+ //
https://istio.io/latest/docs/setup/additional-setup/sidecar-injection/
+ // Controlling the injection policy Section
+ .addToAnnotations("sidecar.istio.io/inject", "false");
+ this.setMetadata(metaBuilder.build());
V1PodSpec spec = new V1PodSpec();
List<V1Container> containers = spec.getContainers();
@@ -133,7 +134,7 @@ public class AgentPod extends V1Pod implements
K8sResource<AgentPod> {
@Override
public AgentPod replace(K8sClient api) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]