This is an automated email from the ASF dual-hosted git repository.
pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 9c5993d SUBMARINE-1188. Using generic api to replace old version api
in k8s submitter
9c5993d is described below
commit 9c5993d1c42a087896c6f0b2881650804988fe16
Author: noidname01 <[email protected]>
AuthorDate: Wed Feb 23 13:26:39 2022 +0800
SUBMARINE-1188. Using generic api to replace old version api in k8s
submitter
### What is this PR for?
With the new generic Kubernetes API, we can simplify the code and make it more readable.
### What type of PR is it?
[Improvement]
### Todos
None
### What is the Jira issue?
https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-1188
### How should this be tested?
Run the submarine-k8s tests to verify the complete functionality.
### Screenshots (if appropriate)
None
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? No
Author: noidname01 <[email protected]>
Signed-off-by: Kevin <[email protected]>
Closes #887 from noidname01/SUBMARINE-1188 and squashes the following
commits:
12799f82 [noidname01] notebook handler
b690688c [noidname01] delete comments
1817b633 [noidname01] fix some bug
6ee58568 [noidname01] modify the Info
e4b3a353 [noidname01] upgrade most of api in submitter
3c9c6174 [noidname01] WIP
---
.../submarine/serve/istio/IstioVirtualService.java | 3 +-
.../serve/istio/IstioVirtualServiceList.java | 45 +-
.../submarine/serve/seldon/SeldonDeployment.java | 3 +-
.../serve/seldon/SeldonDeploymentList.java | 45 +-
.../experiment/{TensorboardInfo.java => Info.java} | 12 +-
.../server/api/experiment/MlflowInfo.java | 17 +-
.../server/api/experiment/TensorboardInfo.java | 25 +-
.../server/k8s/agent/handler/NotebookHandler.java | 84 ++--
.../server/submitter/k8s/K8sSubmitter.java | 460 +++++++++------------
.../server/submitter/k8s/model/MLJob.java | 3 +-
.../server/submitter/k8s/model/NotebookCR.java | 5 +-
.../server/submitter/k8s/model/NotebookCRList.java | 12 +-
.../k8s/model/ingressroute/IngressRoute.java | 3 +-
.../IngressRouteList.java} | 44 +-
.../PyTorchJobList.java} | 44 +-
.../k8s/model/pytorchjob/PyTorchJobSpec.java | 12 +-
.../{NotebookCRList.java => tfjob/TFJobList.java} | 43 +-
.../k8s/model/tfjob/TFJobReplicaType.java | 1 -
.../submitter/k8s/model/tfjob/TFJobSpec.java | 12 +-
.../submitter/k8s/parser/ExperimentSpecParser.java | 7 +-
.../server/submitter/k8s/util/MLJobConverter.java | 9 +-
21 files changed, 344 insertions(+), 545 deletions(-)
diff --git
a/submarine-serve/src/main/java/org/apache/submarine/serve/istio/IstioVirtualService.java
b/submarine-serve/src/main/java/org/apache/submarine/serve/istio/IstioVirtualService.java
index 955e112..54d5526 100644
---
a/submarine-serve/src/main/java/org/apache/submarine/serve/istio/IstioVirtualService.java
+++
b/submarine-serve/src/main/java/org/apache/submarine/serve/istio/IstioVirtualService.java
@@ -19,13 +19,14 @@
package org.apache.submarine.serve.istio;
import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import org.apache.submarine.serve.utils.IstioConstants;
import java.util.ArrayList;
import java.util.List;
-public class IstioVirtualService {
+public class IstioVirtualService implements KubernetesObject {
@SerializedName("apiVersion")
private String apiVersion = IstioConstants.API_VERSION;
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
b/submarine-serve/src/main/java/org/apache/submarine/serve/istio/IstioVirtualServiceList.java
similarity index 62%
copy from
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
copy to
submarine-serve/src/main/java/org/apache/submarine/serve/istio/IstioVirtualServiceList.java
index 3c236e6..ea33a2b 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
+++
b/submarine-serve/src/main/java/org/apache/submarine/serve/istio/IstioVirtualServiceList.java
@@ -17,27 +17,18 @@
* under the License.
*/
-package org.apache.submarine.server.submitter.k8s.model;
+package org.apache.submarine.serve.istio;
import com.google.gson.annotations.SerializedName;
+import java.util.List;
import io.kubernetes.client.common.KubernetesListObject;
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ListMeta;
+import org.apache.submarine.serve.utils.IstioConstants;
-import java.util.List;
-
-public class NotebookCRList implements KubernetesListObject{
-
- public static final String CRD_NOTEBOOK_VERSION_V1 = "v1alpha1";
- public static final String CRD_NOTEBOOK_GROUP_V1 = "kubeflow.org";
- public static final String CRD_APIVERSION_V1 = CRD_NOTEBOOK_GROUP_V1 + "/" +
CRD_NOTEBOOK_VERSION_V1;
- public static final String CRD_NOTEBOOK_LIST_KIND_V1 = "NotebookList";
+public class IstioVirtualServiceList implements KubernetesListObject{
- public NotebookCRList() {
- setApiVersion(CRD_APIVERSION_V1);
- setKind(CRD_NOTEBOOK_LIST_KIND_V1);
- }
-
@SerializedName("apiVersion")
private String apiVersion;
@@ -46,35 +37,27 @@ public class NotebookCRList implements KubernetesListObject{
@SerializedName("metadata")
private V1ListMeta metadata;
-
+
@SerializedName("items")
- private List<NotebookCR> items;
-
- public void setApiVersion(String apiVersion) {
- this.apiVersion = apiVersion;
- }
+ private List<IstioVirtualService> items;
- public void setKind(String kind) {
- this.kind = kind;
+ @Override
+ public V1ListMeta getMetadata() {
+ return metadata;
}
- public List<NotebookCR> getItems() {
+ @Override
+ public List<? extends KubernetesObject> getItems() {
return items;
}
@Override
public String getApiVersion() {
- return apiVersion;
+ return IstioConstants.API_VERSION;
}
@Override
public String getKind() {
-
- return kind;
- }
-
- @Override
- public V1ListMeta getMetadata() {
- return metadata;
+ return IstioConstants.KIND + "List";
}
}
diff --git
a/submarine-serve/src/main/java/org/apache/submarine/serve/seldon/SeldonDeployment.java
b/submarine-serve/src/main/java/org/apache/submarine/serve/seldon/SeldonDeployment.java
index 30c29dd..6f9a610 100644
---
a/submarine-serve/src/main/java/org/apache/submarine/serve/seldon/SeldonDeployment.java
+++
b/submarine-serve/src/main/java/org/apache/submarine/serve/seldon/SeldonDeployment.java
@@ -19,13 +19,14 @@
package org.apache.submarine.serve.seldon;
import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import org.apache.submarine.serve.utils.SeldonConstants;
import java.util.ArrayList;
import java.util.List;
-public class SeldonDeployment {
+public class SeldonDeployment implements KubernetesObject {
@SerializedName("apiVersion")
private String apiVersion = SeldonConstants.API_VERSION;
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
b/submarine-serve/src/main/java/org/apache/submarine/serve/seldon/SeldonDeploymentList.java
similarity index 62%
copy from
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
copy to
submarine-serve/src/main/java/org/apache/submarine/serve/seldon/SeldonDeploymentList.java
index 3c236e6..318542e 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
+++
b/submarine-serve/src/main/java/org/apache/submarine/serve/seldon/SeldonDeploymentList.java
@@ -17,27 +17,18 @@
* under the License.
*/
-package org.apache.submarine.server.submitter.k8s.model;
+package org.apache.submarine.serve.seldon;
import com.google.gson.annotations.SerializedName;
+import java.util.List;
import io.kubernetes.client.common.KubernetesListObject;
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ListMeta;
+import org.apache.submarine.serve.utils.SeldonConstants;
-import java.util.List;
-
-public class NotebookCRList implements KubernetesListObject{
-
- public static final String CRD_NOTEBOOK_VERSION_V1 = "v1alpha1";
- public static final String CRD_NOTEBOOK_GROUP_V1 = "kubeflow.org";
- public static final String CRD_APIVERSION_V1 = CRD_NOTEBOOK_GROUP_V1 + "/" +
CRD_NOTEBOOK_VERSION_V1;
- public static final String CRD_NOTEBOOK_LIST_KIND_V1 = "NotebookList";
+public class SeldonDeploymentList implements KubernetesListObject{
- public NotebookCRList() {
- setApiVersion(CRD_APIVERSION_V1);
- setKind(CRD_NOTEBOOK_LIST_KIND_V1);
- }
-
@SerializedName("apiVersion")
private String apiVersion;
@@ -46,35 +37,27 @@ public class NotebookCRList implements KubernetesListObject{
@SerializedName("metadata")
private V1ListMeta metadata;
-
+
@SerializedName("items")
- private List<NotebookCR> items;
-
- public void setApiVersion(String apiVersion) {
- this.apiVersion = apiVersion;
- }
+ private List<SeldonDeployment> items;
- public void setKind(String kind) {
- this.kind = kind;
+ @Override
+ public V1ListMeta getMetadata() {
+ return metadata;
}
- public List<NotebookCR> getItems() {
+ @Override
+ public List<? extends KubernetesObject> getItems() {
return items;
}
@Override
public String getApiVersion() {
- return apiVersion;
+ return SeldonConstants.API_VERSION;
}
@Override
public String getKind() {
-
- return kind;
- }
-
- @Override
- public V1ListMeta getMetadata() {
- return metadata;
+ return SeldonConstants.KIND + "List";
}
}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/TensorboardInfo.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/Info.java
similarity index 83%
copy from
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/TensorboardInfo.java
copy to
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/Info.java
index 2c29ab7..42a3589 100644
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/TensorboardInfo.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/Info.java
@@ -19,11 +19,11 @@
package org.apache.submarine.server.api.experiment;
-public class TensorboardInfo {
+public class Info {
public boolean available;
public String url;
- public TensorboardInfo(boolean available, String url) {
+ public Info(boolean available, String url) {
this.available = available;
this.url = url;
}
@@ -43,12 +43,4 @@ public class TensorboardInfo {
public void setUrl(String url) {
this.url = url;
}
-
- @Override
- public String toString() {
- return "TensorboardInfo{" +
- "available=" + available +
- ", url='" + url + '\'' +
- '}';
- }
}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/MlflowInfo.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/MlflowInfo.java
index 3605a39..b85b83f 100644
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/MlflowInfo.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/MlflowInfo.java
@@ -19,21 +19,10 @@
package org.apache.submarine.server.api.experiment;
-public class MlflowInfo {
- public boolean available;
- public String url;
+public class MlflowInfo extends Info{
- public MlflowInfo(boolean available, String url) {
- this.available = available;
- this.url = url;
- }
-
- public String getUrl() {
- return url;
- }
-
- public void setUrl(String url) {
- this.url = url;
+ public MlflowInfo(Info info) {
+ super(info.isAvailable(), info.getUrl());
}
@Override
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/TensorboardInfo.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/TensorboardInfo.java
index 2c29ab7..96002eb 100644
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/TensorboardInfo.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/experiment/TensorboardInfo.java
@@ -19,29 +19,10 @@
package org.apache.submarine.server.api.experiment;
-public class TensorboardInfo {
- public boolean available;
- public String url;
+public class TensorboardInfo extends Info{
- public TensorboardInfo(boolean available, String url) {
- this.available = available;
- this.url = url;
- }
-
- public boolean isAvailable() {
- return available;
- }
-
- public void setAvailable(boolean available) {
- this.available = available;
- }
-
- public String getUrl() {
- return url;
- }
-
- public void setUrl(String url) {
- this.url = url;
+ public TensorboardInfo(Info info) {
+ super(info.isAvailable(), info.getUrl());
}
@Override
diff --git
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
index 2db0fe2..79611bf 100644
---
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
+++
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
@@ -21,32 +21,36 @@ package org.apache.submarine.server.k8s.agent.handler;
import java.io.IOException;
+import io.kubernetes.client.openapi.models.CoreV1EventList;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.util.generic.options.ListOptions;
import org.apache.submarine.server.api.common.CustomResourceType;
import org.apache.submarine.server.api.notebook.Notebook;
import org.apache.submarine.server.k8s.agent.util.RestClient;
import org.apache.submarine.server.submitter.k8s.model.NotebookCR;
+import org.apache.submarine.server.submitter.k8s.model.NotebookCRList;
import org.apache.submarine.server.submitter.k8s.util.NotebookUtils;
+import io.kubernetes.client.util.generic.GenericKubernetesApi;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.CoreV1Event;
-import io.kubernetes.client.openapi.models.V1PodList;
-import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.Watchable;
-import okhttp3.Call;
public class NotebookHandler extends CustomResourceHandler {
private static final Logger LOG =
LoggerFactory.getLogger(NotebookHandler.class);
private Watchable<CoreV1Event> watcher;
- private CustomObjectsApi customObjectsApi;
+ private GenericKubernetesApi<V1Pod, V1PodList> podClient;
+ private GenericKubernetesApi<CoreV1Event, CoreV1EventList> eventClient;
+ private GenericKubernetesApi<NotebookCR, NotebookCRList> notebookCRClient;
private String podName;
public NotebookHandler() throws IOException {
@@ -62,20 +66,32 @@ public class NotebookHandler extends CustomResourceHandler {
this.crName = crName;
this.resourceId = resourceId;
+ podClient =
+ new GenericKubernetesApi<>(
+ V1Pod.class, V1PodList.class,
+ "", "v1", "pods", client);
+ eventClient =
+ new GenericKubernetesApi<>(
+ CoreV1Event.class, CoreV1EventList.class,
+ "", "v1", "events", client);
+ notebookCRClient =
+ new GenericKubernetesApi<>(
+ NotebookCR.class, NotebookCRList.class,
+ NotebookCR.CRD_NOTEBOOK_GROUP_V1,
NotebookCR.CRD_NOTEBOOK_VERSION_V1,
+ NotebookCR.CRD_NOTEBOOK_PLURAL_V1, client);
+
try {
- String podLabelSelector = String.format("%s=%s", NotebookCR.NOTEBOOK_ID,
- this.resourceId);
- V1PodList podList = this.coreV1Api.listNamespacedPod(namespace, null,
null, null, null,
- podLabelSelector, null, null, null, null, null);
- this.podName = podList.getItems().get(0).getMetadata().getName();
- String fieldSelector = String.format("involvedObject.name=%s",
this.podName);
+ ListOptions listOptions = new ListOptions();
+ String podLabelSelector = String.format("%s=%s", NotebookCR.NOTEBOOK_ID,
this.resourceId);
+ listOptions.setLabelSelector(podLabelSelector);
+ V1PodList podList = podClient.list(namespace,
listOptions).throwsApiException().getObject();
- Call call = coreV1Api.listNamespacedEventCall(namespace, null, null,
null, fieldSelector,
- null, null, null, null, null, true, null);
-
- watcher = Watch.createWatch(client, call, new
TypeToken<Response<CoreV1Event>>(){}.getType());
+ this.podName = podList.getItems().get(0).getMetadata().getName();
- customObjectsApi = new CustomObjectsApi();
+ listOptions = new ListOptions();
+ String fieldSelector = String.format("involvedObject.name=%s",
this.podName);
+ listOptions.setFieldSelector(fieldSelector);
+ watcher = eventClient.watch(namespace, listOptions);
} catch (ApiException e) {
e.printStackTrace();
@@ -86,57 +102,47 @@ public class NotebookHandler extends CustomResourceHandler
{
@Override
public void run() {
Notebook notebook = null;
- while (true) {
+ while (true) {
for (Response<CoreV1Event> event: watcher) {
String reason = event.object.getReason();
Object object = null;
try {
switch (reason) {
case "Created":
- case "Scheduled":
- object =
customObjectsApi.getNamespacedCustomObject(NotebookCR.CRD_NOTEBOOK_GROUP_V1,
- NotebookCR.CRD_NOTEBOOK_VERSION_V1,
- namespace, NotebookCR.CRD_NOTEBOOK_PLURAL_V1, crName);
+ case "Scheduled":
+ object = notebookCRClient.get(namespace,
crName).throwsApiException().getObject();
notebook = NotebookUtils.parseObject(object,
NotebookUtils.ParseOpt.PARSE_OPT_GET);
notebook.setStatus(Notebook.Status.STATUS_CREATING.getValue());
restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, notebook);
break;
case "Started":
- object =
customObjectsApi.getNamespacedCustomObject(NotebookCR.CRD_NOTEBOOK_GROUP_V1,
- NotebookCR.CRD_NOTEBOOK_VERSION_V1,
- namespace, NotebookCR.CRD_NOTEBOOK_PLURAL_V1, crName);
+ object = notebookCRClient.get(namespace,
crName).throwsApiException().getObject();
notebook = NotebookUtils.parseObject(object,
NotebookUtils.ParseOpt.PARSE_OPT_GET);
- notebook.setStatus(Notebook.Status.STATUS_RUNNING.getValue());
+ notebook.setStatus(Notebook.Status.STATUS_RUNNING.getValue());
restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, notebook);
break;
case "Failed":
- object =
customObjectsApi.getNamespacedCustomObject(NotebookCR.CRD_NOTEBOOK_GROUP_V1,
- NotebookCR.CRD_NOTEBOOK_VERSION_V1,
- namespace, NotebookCR.CRD_NOTEBOOK_PLURAL_V1, crName);
+ object = notebookCRClient.get(namespace,
crName).throwsApiException().getObject();
notebook = NotebookUtils.parseObject(object,
NotebookUtils.ParseOpt.PARSE_OPT_GET);
- notebook.setStatus(Notebook.Status.STATUS_FAILED.getValue());
+ notebook.setStatus(Notebook.Status.STATUS_FAILED.getValue());
restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, notebook);
break;
case "Pulling":
- object =
customObjectsApi.getNamespacedCustomObject(NotebookCR.CRD_NOTEBOOK_GROUP_V1,
- NotebookCR.CRD_NOTEBOOK_VERSION_V1,
- namespace, NotebookCR.CRD_NOTEBOOK_PLURAL_V1, crName);
+ object = notebookCRClient.get(namespace,
crName).throwsApiException().getObject();
notebook = NotebookUtils.parseObject(object,
NotebookUtils.ParseOpt.PARSE_OPT_GET);
- notebook.setStatus(Notebook.Status.STATUS_PULLING.getValue());
+ notebook.setStatus(Notebook.Status.STATUS_PULLING.getValue());
restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, notebook);
break;
case "Killing":
- object =
customObjectsApi.getNamespacedCustomObject(NotebookCR.CRD_NOTEBOOK_GROUP_V1,
- NotebookCR.CRD_NOTEBOOK_VERSION_V1,
- namespace, NotebookCR.CRD_NOTEBOOK_PLURAL_V1, crName);
+ object = notebookCRClient.get(namespace,
crName).throwsApiException().getObject();
notebook = NotebookUtils.parseObject(object,
NotebookUtils.ParseOpt.PARSE_OPT_GET);
-
notebook.setStatus(Notebook.Status.STATUS_TERMINATING.getValue());
+
notebook.setStatus(Notebook.Status.STATUS_TERMINATING.getValue());
restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, notebook);
LOG.info("Receive terminating event, exit progress");
return;
default:
- LOG.info(String.format("Unprocessed event type:%s", reason));
+ LOG.info(String.format("Unprocessed event type:%s", reason));
}
} catch (ApiException e) {
LOG.error("error while accessing k8s", e);
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
index 75f31b3..dee44f1 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
@@ -26,11 +26,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.function.Function;
-import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
@@ -41,34 +38,43 @@ import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.JSON;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.apis.CustomObjectsApi;
-import io.kubernetes.client.openapi.models.V1ConfigMap;
-import io.kubernetes.client.openapi.models.V1DeleteOptionsBuilder;
-import io.kubernetes.client.openapi.models.V1Deployment;
+import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.models.CoreV1Event;
import io.kubernetes.client.openapi.models.CoreV1EventList;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ConfigMapList;
+import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimList;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1Status;
-import io.kubernetes.client.util.Watch;
+import io.kubernetes.client.util.generic.options.CreateOptions;
+import io.kubernetes.client.util.generic.options.DeleteOptions;
+import io.kubernetes.client.util.generic.options.ListOptions;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
+import io.kubernetes.client.util.generic.GenericKubernetesApi;
import org.apache.commons.lang3.StringUtils;
import org.apache.submarine.commons.utils.SubmarineConfVars;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.serve.istio.IstioVirtualService;
+import org.apache.submarine.serve.istio.IstioVirtualServiceList;
import org.apache.submarine.serve.pytorch.SeldonPytorchServing;
import org.apache.submarine.serve.seldon.SeldonDeployment;
+import org.apache.submarine.serve.seldon.SeldonDeploymentList;
import org.apache.submarine.serve.tensorflow.SeldonTFServing;
import org.apache.submarine.server.k8s.utils.K8sUtils;
+import org.apache.submarine.serve.utils.IstioConstants;
+import org.apache.submarine.serve.utils.SeldonConstants;
import org.apache.submarine.server.api.Submitter;
import org.apache.submarine.server.api.exception.InvalidSpecException;
import org.apache.submarine.server.api.experiment.Experiment;
import org.apache.submarine.server.api.experiment.ExperimentLog;
+import org.apache.submarine.server.api.experiment.Info;
import org.apache.submarine.server.api.experiment.MlflowInfo;
import org.apache.submarine.server.api.experiment.TensorboardInfo;
import org.apache.submarine.server.api.model.ServeSpec;
@@ -78,11 +84,15 @@ import org.apache.submarine.server.api.spec.ExperimentSpec;
import org.apache.submarine.server.api.spec.NotebookSpec;
import org.apache.submarine.server.submitter.k8s.model.MLJob;
import org.apache.submarine.server.submitter.k8s.model.NotebookCR;
+import org.apache.submarine.server.submitter.k8s.model.NotebookCRList;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobList;
+import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
+import
org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJobList;
import
org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRoute;
import
org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRouteSpec;
+import
org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRouteList;
import org.apache.submarine.server.submitter.k8s.model.ingressroute.SpecRoute;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
import org.apache.submarine.server.submitter.k8s.parser.ConfigmapSpecParser;
import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
import org.apache.submarine.server.submitter.k8s.parser.NotebookSpecParser;
@@ -91,6 +101,7 @@ import
org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
import org.apache.submarine.server.submitter.k8s.util.NotebookUtils;
import org.apache.submarine.server.submitter.k8s.util.OwnerReferenceUtils;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,7 +138,26 @@ public class K8sSubmitter implements Submitter {
// K8s API client for CRD
- private CustomObjectsApi api;
+ private GenericKubernetesApi<V1Pod, V1PodList> podClient;
+
+ private GenericKubernetesApi<CoreV1Event, CoreV1EventList> eventClient;
+
+ private GenericKubernetesApi<V1PersistentVolumeClaim,
V1PersistentVolumeClaimList>
+ persistentVolumeClaimClient;
+
+ private GenericKubernetesApi<V1ConfigMap, V1ConfigMapList> configMapClient;
+
+ private GenericKubernetesApi<TFJob, TFJobList> tfJobClient;
+
+ private GenericKubernetesApi<PyTorchJob, PyTorchJobList> pyTorchJobClient;
+
+ private GenericKubernetesApi<NotebookCR, NotebookCRList> notebookCRClient;
+
+ private GenericKubernetesApi<IngressRoute, IngressRouteList>
ingressRouteClient;
+
+ private GenericKubernetesApi<SeldonDeployment, SeldonDeploymentList>
seldonDeploymentClient;
+
+ private GenericKubernetesApi<IstioVirtualService, IstioVirtualServiceList>
istioVirtualServiceClient;
private CoreV1Api coreApi;
@@ -153,16 +183,11 @@ public class K8sSubmitter implements Submitter {
throw new SubmarineRuntimeException(500, "Initialize K8s submitter
failed.");
}
} finally {
- // let watcher can wait until the next change
- client.setReadTimeout(0);
OkHttpClient httpClient = client.getHttpClient();
client.setHttpClient(httpClient);
Configuration.setDefaultApiClient(client);
}
- if (api == null) {
- api = new CustomObjectsApi();
- }
if (coreApi == null) {
coreApi = new CoreV1Api(client);
}
@@ -171,12 +196,61 @@ public class K8sSubmitter implements Submitter {
appsV1Api = new AppsV1Api();
}
- try {
- watchExperiment();
- } catch (Exception e){
- LOG.error("Experiment watch failed. " + e.getMessage(), e);
- }
-
+ podClient =
+ new GenericKubernetesApi<>(
+ V1Pod.class, V1PodList.class,
+ "", "v1", "pods", client);
+
+ eventClient =
+ new GenericKubernetesApi<>(
+ CoreV1Event.class, CoreV1EventList.class,
+ "", "v1", "events", client);
+
+ persistentVolumeClaimClient =
+ new GenericKubernetesApi<>(
+ V1PersistentVolumeClaim.class,
V1PersistentVolumeClaimList.class,
+ "", "v1", "persistentvolumeclaims", client);
+
+ configMapClient =
+ new GenericKubernetesApi<>(
+ V1ConfigMap.class, V1ConfigMapList.class,
+ "", "v1", "configmaps", client);
+
+ tfJobClient =
+ new GenericKubernetesApi<>(
+ TFJob.class, TFJobList.class,
+ TFJob.CRD_TF_GROUP_V1, TFJob.CRD_TF_VERSION_V1,
+ TFJob.CRD_TF_PLURAL_V1, client);
+
+ pyTorchJobClient =
+ new GenericKubernetesApi<>(
+ PyTorchJob.class, PyTorchJobList.class,
+ PyTorchJob.CRD_PYTORCH_GROUP_V1,
PyTorchJob.CRD_PYTORCH_VERSION_V1,
+ PyTorchJob.CRD_PYTORCH_PLURAL_V1, client);
+
+ notebookCRClient =
+ new GenericKubernetesApi<>(
+ NotebookCR.class, NotebookCRList.class,
+ NotebookCR.CRD_NOTEBOOK_GROUP_V1,
NotebookCR.CRD_NOTEBOOK_VERSION_V1,
+ NotebookCR.CRD_NOTEBOOK_PLURAL_V1, client);
+
+ ingressRouteClient =
+ new GenericKubernetesApi<>(
+ IngressRoute.class, IngressRouteList.class,
+ IngressRoute.CRD_INGRESSROUTE_GROUP_V1,
IngressRoute.CRD_INGRESSROUTE_VERSION_V1,
+ IngressRoute.CRD_INGRESSROUTE_PLURAL_V1, client);
+
+ seldonDeploymentClient =
+ new GenericKubernetesApi<>(
+ SeldonDeployment.class, SeldonDeploymentList.class,
+ SeldonConstants.GROUP, SeldonConstants.VERSION,
+ SeldonConstants.PLURAL, client);
+
+ istioVirtualServiceClient =
+ new GenericKubernetesApi<>(
+ IstioVirtualService.class, IstioVirtualServiceList.class,
+ IstioConstants.GROUP, IstioConstants.VERSION,
+ IstioConstants.PLURAL, client);
}
@Override
@@ -186,9 +260,12 @@ public class K8sSubmitter implements Submitter {
MLJob mlJob = ExperimentSpecParser.parseJob(spec);
mlJob.getMetadata().setNamespace(getServerNamespace());
mlJob.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
-
- Object object = api.createNamespacedCustomObject(mlJob.getGroup(),
mlJob.getVersion(),
- mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob,
"true", null, null);
+
+ Object object = mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)
+ ? tfJobClient.create(getServerNamespace(), (TFJob) mlJob,
+ new CreateOptions()).throwsApiException().getObject()
+ : pyTorchJobClient.create(getServerNamespace(), (PyTorchJob)
mlJob,
+ new CreateOptions()).throwsApiException().getObject();
experiment = parseExperimentResponseObject(object,
ParseOp.PARSE_OP_RESULT);
} catch (InvalidSpecException e) {
LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(),
e);
@@ -207,9 +284,11 @@ public class K8sSubmitter implements Submitter {
try {
MLJob mlJob = ExperimentSpecParser.parseJob(spec);
mlJob.getMetadata().setNamespace(getServerNamespace());
-
- Object object = api.getNamespacedCustomObject(mlJob.getGroup(),
mlJob.getVersion(),
- mlJob.getMetadata().getNamespace(), mlJob.getPlural(),
mlJob.getMetadata().getName());
+ Object object = mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)
+ ? tfJobClient.get(getServerNamespace(),
mlJob.getMetadata().getName())
+ .throwsApiException().getObject()
+ : pyTorchJobClient.get(getServerNamespace(),
mlJob.getMetadata().getName())
+ .throwsApiException().getObject();
experiment = parseExperimentResponseObject(object,
ParseOp.PARSE_OP_RESULT);
} catch (InvalidSpecException e) {
@@ -228,9 +307,14 @@ public class K8sSubmitter implements Submitter {
MLJob mlJob = ExperimentSpecParser.parseJob(spec);
mlJob.getMetadata().setNamespace(getServerNamespace());
- Object object = api.patchNamespacedCustomObject(mlJob.getGroup(),
mlJob.getVersion(),
- mlJob.getMetadata().getNamespace(), mlJob.getPlural(),
mlJob.getMetadata().getName(),
- mlJob, null, null, false);
+ Object object = mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)
+ ? tfJobClient.patch(getServerNamespace(),
mlJob.getMetadata().getName(),
+ V1Patch.PATCH_FORMAT_JSON_PATCH,
+ new V1Patch(new Gson().toJson(((TFJob)
mlJob).getSpec()))).throwsApiException().getObject()
+ : pyTorchJobClient.patch(getServerNamespace(),
mlJob.getMetadata().getName(),
+ V1Patch.PATCH_FORMAT_JSON_PATCH,
+ new V1Patch(new Gson().toJson(((PyTorchJob)
mlJob).getSpec()))).throwsApiException().getObject()
+ ;
experiment = parseExperimentResponseObject(object,
ParseOp.PARSE_OP_RESULT);
} catch (InvalidSpecException e) {
throw new SubmarineRuntimeException(200, e.getMessage());
@@ -247,9 +331,13 @@ public class K8sSubmitter implements Submitter {
MLJob mlJob = ExperimentSpecParser.parseJob(spec);
mlJob.getMetadata().setNamespace(getServerNamespace());
- Object object = api.deleteNamespacedCustomObject(mlJob.getGroup(),
mlJob.getVersion(),
- mlJob.getMetadata().getNamespace(), mlJob.getPlural(),
mlJob.getMetadata().getName(), 0,
- false, null, null,
MLJobConverter.toDeleteOptionsFromMLJob(mlJob));
+ Object object = mlJob.getPlural().equals(TFJob.CRD_TF_PLURAL_V1)
+ ? tfJobClient.delete(getServerNamespace(),
mlJob.getMetadata().getName(),
+ MLJobConverter.toDeleteOptionsFromMLJob(mlJob))
+ .throwsApiException().getStatus()
+ : pyTorchJobClient.delete(getServerNamespace(),
mlJob.getMetadata().getName(),
+ MLJobConverter.toDeleteOptionsFromMLJob(mlJob))
+ .throwsApiException().getStatus();
experiment = parseExperimentResponseObject(object,
ParseOp.PARSE_OP_DELETE);
} catch (InvalidSpecException e) {
throw new SubmarineRuntimeException(200, e.getMessage());
@@ -283,11 +371,10 @@ public class K8sSubmitter implements Submitter {
ExperimentLog experimentLog = new ExperimentLog();
experimentLog.setExperimentId(id);
try {
- final V1PodList podList = coreApi.listNamespacedPod(
- getServerNamespace(),
- "false", false, null, null,
- getJobLabelSelector(spec), null, null,
- null, null, null);
+ ListOptions listOptions = new ListOptions();
+ listOptions.setLabelSelector(getJobLabelSelector(spec));
+ final V1PodList podList = podClient.list(getServerNamespace(),
listOptions)
+ .throwsApiException().getObject();
for (V1Pod pod : podList.getItems()) {
String podName = pod.getMetadata().getName();
experimentLog.addPodLog(podName, null);
@@ -303,12 +390,10 @@ public class K8sSubmitter implements Submitter {
ExperimentLog experimentLog = new ExperimentLog();
experimentLog.setExperimentId(id);
try {
- final V1PodList podList = coreApi.listNamespacedPod(
- getServerNamespace(),
- "false", false, null, null,
- getJobLabelSelector(spec), null, null,
- null, null, null);
-
+ ListOptions listOptions = new ListOptions();
+ listOptions.setLabelSelector(getJobLabelSelector(spec));
+ final V1PodList podList = podClient.list(getServerNamespace(),
listOptions)
+ .throwsApiException().getObject();
for (V1Pod pod : podList.getItems()) {
String podName = pod.getMetadata().getName();
String podLog = coreApi.readNamespacedPodLog(
@@ -328,36 +413,8 @@ public class K8sSubmitter implements Submitter {
public TensorboardInfo getTensorboardInfo() throws SubmarineRuntimeException
{
final String name = "submarine-tensorboard";
final String ingressRouteName = "submarine-tensorboard-ingressroute";
- String namespace = getServerNamespace();
-
try {
- V1Deployment deploy = appsV1Api.readNamespacedDeploymentStatus(name,
namespace, "true");
- boolean available = deploy.getStatus().getAvailableReplicas() > 0; // at
least one replica is running
-
- IngressRoute ingressRoute = new IngressRoute();
- V1ObjectMeta meta = new V1ObjectMeta();
- meta.setName(ingressRouteName);
- meta.setNamespace(namespace);
- ingressRoute.setMetadata(meta);
- Object object = api.getNamespacedCustomObject(
- ingressRoute.getGroup(), ingressRoute.getVersion(),
- ingressRoute.getMetadata().getNamespace(),
- ingressRoute.getPlural(), ingressRouteName
- );
-
- Gson gson = new JSON().getGson();
- String jsonString = gson.toJson(object);
- IngressRoute result = gson.fromJson(jsonString, IngressRoute.class);
-
-
- String route =
result.getSpec().getRoutes().stream().findFirst().get().getMatch();
-
- // replace "PathPrefix(`/tensorboard`)" with "/tensorboard/"
- String url = route.replace("PathPrefix(`", "").replace("`)", "/");
-
- TensorboardInfo tensorboardInfo = new TensorboardInfo(available, url);
-
- return tensorboardInfo;
+ return new TensorboardInfo(getInfo(name, ingressRouteName));
} catch (ApiException e) {
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
@@ -367,41 +424,32 @@ public class K8sSubmitter implements Submitter {
public MlflowInfo getMlflowInfo() throws SubmarineRuntimeException {
final String name = "submarine-mlflow";
final String ingressRouteName = "submarine-mlflow-ingressroute";
- String namespace = getServerNamespace();
-
try {
- V1Deployment deploy = appsV1Api.readNamespacedDeploymentStatus(name,
namespace, "true");
- boolean available = deploy.getStatus().getAvailableReplicas() > 0; // at
least one replica is running
-
- IngressRoute ingressRoute = new IngressRoute();
- V1ObjectMeta meta = new V1ObjectMeta();
- meta.setName(ingressRouteName);
- meta.setNamespace(namespace);
- ingressRoute.setMetadata(meta);
- Object object = api.getNamespacedCustomObject(
- ingressRoute.getGroup(), ingressRoute.getVersion(),
- ingressRoute.getMetadata().getNamespace(),
- ingressRoute.getPlural(), ingressRouteName
- );
+ return new MlflowInfo(getInfo(name, ingressRouteName));
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
+ }
+ }
- Gson gson = new JSON().getGson();
- String jsonString = gson.toJson(object);
- IngressRoute result = gson.fromJson(jsonString, IngressRoute.class);
+ public Info getInfo(String name, String ingressRouteName) throws
ApiException{
+ V1Deployment deploy = appsV1Api.readNamespacedDeploymentStatus(name,
getServerNamespace(), "true");
+ boolean available = deploy.getStatus().getAvailableReplicas() > 0; // at
least one replica is running
+ IngressRoute ingressRoute = new IngressRoute();
+ V1ObjectMeta meta = new V1ObjectMeta();
+ meta.setName(ingressRouteName);
+ meta.setNamespace(getServerNamespace());
+ ingressRoute.setMetadata(meta);
- String route =
result.getSpec().getRoutes().stream().findFirst().get().getMatch();
+ IngressRoute result = ingressRouteClient.get(getServerNamespace(),
ingressRouteName)
+ .throwsApiException().getObject();
- String url = route.replace("PathPrefix(`", "").replace("`)", "/");
+ String route =
result.getSpec().getRoutes().stream().findFirst().get().getMatch();
- MlflowInfo mlflowInfo = new MlflowInfo(available, url);
+ String url = route.replace("PathPrefix(`", "").replace("`)", "/");
- return mlflowInfo;
- } catch (ApiException e) {
- throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
- }
+ return new Info(available, url);
}
-
-
@Override
public Notebook createNotebook(NotebookSpec spec, String notebookId) throws
SubmarineRuntimeException {
Notebook notebook;
@@ -456,8 +504,7 @@ public class K8sSubmitter implements Submitter {
// create notebook custom resource
try {
- Object object = api.createNamespacedCustomObject(notebookCR.getGroup(),
notebookCR.getVersion(),
- namespace, notebookCR.getPlural(), notebookCR, "true", null, null);
+ Object object =
notebookCRClient.create(notebookCR).throwsApiException().getObject();
notebook = NotebookUtils.parseObject(object,
NotebookUtils.ParseOpt.PARSE_OPT_CREATE);
} catch (JsonSyntaxException e) {
LOG.error("K8s submitter: parse response object failed by " +
e.getMessage(), e);
@@ -469,7 +516,7 @@ public class K8sSubmitter implements Submitter {
if (needOverwrite) rollbackCreationConfigMap(namespace, configmap);
rollbackCreationPVC(namespace, workspacePvc, userPvc);
throw new SubmarineRuntimeException(e.getCode(), "K8s submitter: parse
Notebook object failed by " +
- e.getMessage());
+ e.getMessage());
}
// create notebook Traefik custom resource
@@ -494,23 +541,24 @@ public class K8sSubmitter implements Submitter {
try {
NotebookCR notebookCR = NotebookSpecParser.parseNotebook(spec, null,
null);
-
- Object object = api.getNamespacedCustomObject(notebookCR.getGroup(),
notebookCR.getVersion(),
- namespace,
- notebookCR.getPlural(), notebookCR.getMetadata().getName());
+ Object object = notebookCRClient.get(namespace,
notebookCR.getMetadata().getName())
+ .throwsApiException().getObject();
notebook = NotebookUtils.parseObject(object,
NotebookUtils.ParseOpt.PARSE_OPT_GET);
if
(notebook.getStatus().equals(Notebook.Status.STATUS_WAITING.toString())) {
LOG.info(String.format("notebook status: waiting; check the pods in
namespace:[%s] to "
+ "ensure is the waiting caused by image pulling", namespace));
String podLabelSelector = String.format("%s=%s",
NotebookCR.NOTEBOOK_ID,
spec.getMeta().getLabels().get(NotebookCR.NOTEBOOK_ID).toString());
-
- V1PodList podList = coreApi.listNamespacedPod(namespace, null, null,
null, null,
- podLabelSelector, null, null, null, null, null);
+ ListOptions listOptions = new ListOptions();
+ listOptions.setLabelSelector(podLabelSelector);
+ final V1PodList podList = podClient.list(getServerNamespace(),
listOptions)
+ .throwsApiException().getObject();
String podName = podList.getItems().get(0).getMetadata().getName();
+
String fieldSelector = String.format("involvedObject.name=%s",
podName);
- CoreV1EventList events = coreApi.listNamespacedEvent(namespace, null,
null, null, fieldSelector,
- null, null, null, null, null, null);
+ listOptions = new ListOptions();
+ listOptions.setFieldSelector(fieldSelector);
+ CoreV1EventList events = eventClient.list(namespace,
listOptions).throwsApiException().getObject();
CoreV1Event latestEvent =
events.getItems().get(events.getItems().size() - 1);
if (latestEvent.getReason().equalsIgnoreCase("Pulling")) {
@@ -542,10 +590,8 @@ public class K8sSubmitter implements Submitter {
String namespace = getServerNamespace();
NotebookCR notebookCR = NotebookSpecParser.parseNotebook(spec, null, null);
try {
- Object object = api.deleteNamespacedCustomObject(notebookCR.getGroup(),
notebookCR.getVersion(),
- namespace, notebookCR.getPlural(),
- notebookCR.getMetadata().getName(), null, null, null,
- null, new
V1DeleteOptionsBuilder().withApiVersion(notebookCR.getApiVersion()).build());
+ Object object = notebookCRClient.delete(namespace, name,
+
getDeleteOptions(notebookCR.getApiVersion())).throwsApiException().getStatus();
notebook = NotebookUtils.parseObject(object,
NotebookUtils.ParseOpt.PARSE_OPT_DELETE);
} catch (ApiException e) {
API_EXCEPTION_404_CONSUMER.apply(e);
@@ -583,10 +629,9 @@ public class K8sSubmitter implements Submitter {
String namespace = getServerNamespace();
try {
- Object object =
api.listNamespacedCustomObject(NotebookCR.CRD_NOTEBOOK_GROUP_V1,
- NotebookCR.CRD_NOTEBOOK_VERSION_V1, namespace,
NotebookCR.CRD_NOTEBOOK_PLURAL_V1,
- "true", null, null, NotebookCR.NOTEBOOK_OWNER_SELECTOR_KEY + "=" +
id,
- null, null, null, null);
+ ListOptions listOptions = new ListOptions();
+ listOptions.setLabelSelector(NotebookCR.NOTEBOOK_OWNER_SELECTOR_KEY +
"=" + id);
+ Object object = notebookCRClient.list(namespace,
listOptions).throwsApiException().getObject();
notebookList = NotebookUtils.parseObjectForList(object);
} catch (ApiException e) {
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
@@ -601,33 +646,19 @@ public class K8sSubmitter implements Submitter {
IstioVirtualService istioVirtualService = new
IstioVirtualService(spec.getModelName(),
spec.getModelVersion());
try {
- api.createNamespacedCustomObject(seldonDeployment.getGroup(),
- seldonDeployment.getVersion(),
- "default",
- seldonDeployment.getPlural(),
- seldonDeployment,
- "true", null, null);
+ seldonDeploymentClient.create("default", seldonDeployment, new
CreateOptions()).throwsApiException();
} catch (ApiException e) {
LOG.error(e.getMessage(), e);
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
try {
- api.createNamespacedCustomObject(istioVirtualService.getGroup(),
- istioVirtualService.getVersion(),
- "default",
- istioVirtualService.getPlural(),
- istioVirtualService,
- "true", null, null);
+ istioVirtualServiceClient.create("default", istioVirtualService, new
CreateOptions())
+ .throwsApiException();
} catch (ApiException e) {
LOG.error(e.getMessage(), e);
try {
- api.deleteNamespacedCustomObject(seldonDeployment.getGroup(),
- seldonDeployment.getVersion(),
- "default",
- seldonDeployment.getPlural(),
- seldonDeployment.getMetadata().getName(),
- null, null, null, null,
- new
V1DeleteOptionsBuilder().withApiVersion(seldonDeployment.getApiVersion()).build());
+ seldonDeploymentClient.delete("default",
seldonDeployment.getMetadata().getName(),
+
getDeleteOptions(seldonDeployment.getApiVersion())).throwsApiException();
} catch (ApiException e1) {
LOG.error(e1.getMessage(), e1);
}
@@ -642,125 +673,22 @@ public class K8sSubmitter implements Submitter {
IstioVirtualService istioVirtualService = new
IstioVirtualService(spec.getModelName(),
spec.getModelVersion());
try {
- api.deleteNamespacedCustomObject(seldonDeployment.getGroup(),
- seldonDeployment.getVersion(),
- "default",
- seldonDeployment.getPlural(),
- seldonDeployment.getMetadata().getName(), null, null, null,
- null, new
V1DeleteOptionsBuilder().withApiVersion(seldonDeployment.getApiVersion()).build());
- api.deleteNamespacedCustomObject(istioVirtualService.getGroup(),
- istioVirtualService.getVersion(),
- "default",
- istioVirtualService.getPlural(),
- istioVirtualService.getMetadata().getName(),
- null, null, null, null,
- new
V1DeleteOptionsBuilder().withApiVersion(istioVirtualService.getApiVersion()).build());
+ seldonDeploymentClient.delete("default",
seldonDeployment.getMetadata().getName(),
+
getDeleteOptions(seldonDeployment.getApiVersion())).throwsApiException();
+ istioVirtualServiceClient.delete("default",
istioVirtualService.getMetadata().getName(),
+
getDeleteOptions(istioVirtualService.getApiVersion())).throwsApiException();
} catch (ApiException e) {
LOG.error(e.getMessage(), e);
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
}
- public void watchExperiment() throws ApiException{
-
- ExecutorService experimentThread = Executors.newFixedThreadPool(2);
-
- try (Watch<MLJob> watchTF = Watch.createWatch(
- client,
- api.listNamespacedCustomObjectCall(
- TFJob.CRD_TF_GROUP_V1,
- TFJob.CRD_TF_VERSION_V1,
- getServerNamespace(),
- TFJob.CRD_TF_PLURAL_V1,
- "true",
- null,
- null,
- null,
- null,
- null,
- null,
- Boolean.TRUE,
- null
- ),
- new TypeToken<Watch.Response<MLJob>>() {
- }.getType()
- )) {
- experimentThread.execute(new Runnable() {
- @Override
- public void run() {
- try {
- LOG.info("Start watching on TFJobs...");
-
- for (Watch.Response<MLJob> experiment : watchTF) {
- LOG.info("{}", experiment.object.getStatus());
- }
- } finally {
- LOG.info("WATCH TFJob END");
- try {
- watchTF.close();
- } catch (Exception e) {
- LOG.error("{}", e.getMessage());
- }
- }
- }
- });
- } catch (Exception ex) {
- throw new RuntimeException();
- }
-
- try (Watch<MLJob> watchPytorch = Watch.createWatch(
- client,
- api.listNamespacedCustomObjectCall(
- PyTorchJob.CRD_PYTORCH_GROUP_V1,
- PyTorchJob.CRD_PYTORCH_VERSION_V1,
- getServerNamespace(),
- PyTorchJob.CRD_PYTORCH_PLURAL_V1,
- "true",
- null,
- null,
- null,
- null,
- null,
- null,
- Boolean.TRUE,
- null
- ),
- new TypeToken<Watch.Response<MLJob>>() {
- }.getType()
- )) {
- experimentThread.execute(new Runnable() {
- @Override
- public void run() {
- try {
- LOG.info("Start watching on PytorchJobs...");
-
- ;
- for (Watch.Response<MLJob> experiment : watchPytorch) {
- LOG.info("{}", experiment.object.getStatus());
- }
- } finally {
- LOG.info("WATCH PytorchJob END");
- try {
- watchPytorch.close();
- } catch (Exception e) {
- LOG.error("{}", e.getMessage());
- }
- }
- }
- });
- } catch (Exception ex) {
- throw new RuntimeException();
- }
- }
-
public void createPersistentVolumeClaim(String pvcName, String namespace,
String scName, String storage)
throws ApiException {
V1PersistentVolumeClaim pvc =
VolumeSpecParser.parsePersistentVolumeClaim(pvcName, scName, storage);
pvc.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
try {
- V1PersistentVolumeClaim result =
coreApi.createNamespacedPersistentVolumeClaim(
- namespace, pvc, "true", null, null
- );
+ persistentVolumeClaimClient.create(namespace, pvc, new
CreateOptions()).throwsApiException();
} catch (ApiException e) {
LOG.error("Exception when creating persistent volume claim " +
e.getMessage(), e);
throw e;
@@ -774,11 +702,7 @@ public class K8sSubmitter implements Submitter {
but it can still work fine and delete the PVC
*/
try {
- V1PersistentVolumeClaim result =
coreApi.deleteNamespacedPersistentVolumeClaim(
- pvcName, namespace, "true",
- null, null, null,
- null, null
- );
+ persistentVolumeClaimClient.delete(namespace,
pvcName).throwsApiException();
} catch (ApiException e) {
LOG.error("Exception when deleting persistent volume claim " +
e.getMessage(), e);
API_EXCEPTION_404_CONSUMER.apply(e);
@@ -815,10 +739,8 @@ public class K8sSubmitter implements Submitter {
meta.setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
ingressRoute.setMetadata(meta);
ingressRoute.setSpec(parseIngressRouteSpec(meta.getNamespace(),
meta.getName()));
- api.createNamespacedCustomObject(
- ingressRoute.getGroup(), ingressRoute.getVersion(),
- ingressRoute.getMetadata().getNamespace(),
- ingressRoute.getPlural(), ingressRoute, "true", null, null);
+
+ ingressRouteClient.create(namespace, ingressRoute, new
CreateOptions()).throwsApiException();
} catch (ApiException e) {
LOG.error("K8s submitter: Create Traefik custom resource object failed
by " + e.getMessage(), e);
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
@@ -830,10 +752,8 @@ public class K8sSubmitter implements Submitter {
private void deleteIngressRoute(String namespace, String name) {
try {
- api.deleteNamespacedCustomObject(
- IngressRoute.CRD_INGRESSROUTE_GROUP_V1,
IngressRoute.CRD_INGRESSROUTE_VERSION_V1,
- namespace, IngressRoute.CRD_INGRESSROUTE_PLURAL_V1, name, null,
null, null,
- null, new
V1DeleteOptionsBuilder().withApiVersion(IngressRoute.CRD_APIVERSION_V1).build());
+ ingressRouteClient.delete(namespace, name,
+
getDeleteOptions(IngressRoute.CRD_APIVERSION_V1)).throwsApiException();
} catch (ApiException e) {
LOG.error("K8s submitter: Delete Traefik custom resource object failed
by " + e.getMessage(), e);
API_EXCEPTION_404_CONSUMER.apply(e);
@@ -885,7 +805,7 @@ public class K8sSubmitter implements Submitter {
V1ConfigMap configMap = ConfigmapSpecParser.parseConfigMap(name, values);
configMap.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
try {
- coreApi.createNamespacedConfigMap(namespace, configMap, "true", null,
null);
+ configMapClient.create(namespace, configMap, new
CreateOptions()).throwsApiException();
} catch (ApiException e) {
LOG.error("Exception when creating configmap " + e.getMessage(), e);
throw e;
@@ -897,9 +817,7 @@ public class K8sSubmitter implements Submitter {
*/
public void deleteConfigMap(String namespace, String name) {
try {
- coreApi.deleteNamespacedConfigMap(name, namespace,
- "true", null, null, null,
- null, null);
+ configMapClient.delete(namespace, name).throwsApiException();
} catch (ApiException e) {
LOG.error("Exception when deleting config map " + e.getMessage(), e);
API_EXCEPTION_404_CONSUMER.apply(e);
@@ -925,10 +843,8 @@ public class K8sSubmitter implements Submitter {
private void rollbackCreationNotebook(NotebookCR notebookCR, String
namespace)
throws SubmarineRuntimeException {
try {
- Object object = api.deleteNamespacedCustomObject(notebookCR.getGroup(),
notebookCR.getVersion(),
- namespace, notebookCR.getPlural(),
- notebookCR.getMetadata().getName(), null, null, null, null,
- new
V1DeleteOptionsBuilder().withApiVersion(notebookCR.getApiVersion()).build());
+ notebookCRClient.delete(namespace, notebookCR.getMetadata().getName(),
+
getDeleteOptions(notebookCR.getApiVersion())).throwsApiException();
} catch (ApiException e) {
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
@@ -938,6 +854,12 @@ public class K8sSubmitter implements Submitter {
return K8sUtils.getNamespace();
}
+ private DeleteOptions getDeleteOptions(String apiVersion){
+ DeleteOptions deleteOptions = new DeleteOptions();
+ deleteOptions.setApiVersion(apiVersion);
+ return deleteOptions;
+ }
+
private enum ParseOp {
PARSE_OP_RESULT,
PARSE_OP_DELETE
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
index 7673836..176e69a 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
@@ -20,6 +20,7 @@
package org.apache.submarine.server.submitter.k8s.model;
import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1JobStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
@@ -27,7 +28,7 @@ import io.kubernetes.client.openapi.models.V1ObjectMeta;
* The machine learning job for the CRD job.
* It be serialized as body input to k8s api client
*/
-public class MLJob {
+public class MLJob implements KubernetesObject{
@SerializedName("apiVersion")
private String apiVersion;
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCR.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCR.java
index ae495df..8eb38c7 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCR.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCR.java
@@ -28,7 +28,8 @@ public class NotebookCR implements KubernetesObject{
public static final String CRD_NOTEBOOK_VERSION_V1 = "v1";
public static final String CRD_NOTEBOOK_GROUP_V1 = "kubeflow.org";
- public static final String CRD_APIVERSION_V1 = CRD_NOTEBOOK_GROUP_V1 + "/" +
CRD_NOTEBOOK_VERSION_V1;
+ public static final String CRD_NOTEBOOK_APIVERSION_V1 =
+ CRD_NOTEBOOK_GROUP_V1 + "/" + CRD_NOTEBOOK_VERSION_V1;
public static final String CRD_NOTEBOOK_KIND_V1 = "Notebook";
public static final String CRD_NOTEBOOK_PLURAL_V1 = "notebooks";
public static final String NOTEBOOK_OWNER_SELECTOR_KEY = "notebook-owner-id";
@@ -56,7 +57,7 @@ public class NotebookCR implements KubernetesObject{
private NotebookStatus status;
public NotebookCR() {
- setApiVersion(CRD_APIVERSION_V1);
+ setApiVersion(CRD_NOTEBOOK_APIVERSION_V1);
setKind(CRD_NOTEBOOK_KIND_V1);
setPlural(CRD_NOTEBOOK_PLURAL_V1);
setGroup(CRD_NOTEBOOK_GROUP_V1);
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
index 3c236e6..fe66f45 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
@@ -58,6 +58,12 @@ public class NotebookCRList implements KubernetesListObject{
this.kind = kind;
}
+ @Override
+ public V1ListMeta getMetadata() {
+ return metadata;
+ }
+
+ @Override
public List<NotebookCR> getItems() {
return items;
}
@@ -72,9 +78,5 @@ public class NotebookCRList implements KubernetesListObject{
return kind;
}
-
- @Override
- public V1ListMeta getMetadata() {
- return metadata;
- }
}
+
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/ingressroute/IngressRoute.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/ingressroute/IngressRoute.java
index 35e0aa6..048e165 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/ingressroute/IngressRoute.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/ingressroute/IngressRoute.java
@@ -20,9 +20,10 @@
package org.apache.submarine.server.submitter.k8s.model.ingressroute;
import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
-public class IngressRoute {
+public class IngressRoute implements KubernetesObject {
public static final String CRD_INGRESSROUTE_GROUP_V1 = "traefik.containo.us";
public static final String CRD_INGRESSROUTE_VERSION_V1 = "v1alpha1";
public static final String CRD_APIVERSION_V1 = CRD_INGRESSROUTE_GROUP_V1 +
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/ingressroute/IngressRouteList.java
similarity index 62%
copy from
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
copy to
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/ingressroute/IngressRouteList.java
index 3c236e6..1bb856a 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/ingressroute/IngressRouteList.java
@@ -17,27 +17,17 @@
* under the License.
*/
-package org.apache.submarine.server.submitter.k8s.model;
+package org.apache.submarine.server.submitter.k8s.model.ingressroute;
import com.google.gson.annotations.SerializedName;
+import java.util.List;
import io.kubernetes.client.common.KubernetesListObject;
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ListMeta;
-import java.util.List;
-
-public class NotebookCRList implements KubernetesListObject{
-
- public static final String CRD_NOTEBOOK_VERSION_V1 = "v1alpha1";
- public static final String CRD_NOTEBOOK_GROUP_V1 = "kubeflow.org";
- public static final String CRD_APIVERSION_V1 = CRD_NOTEBOOK_GROUP_V1 + "/" +
CRD_NOTEBOOK_VERSION_V1;
- public static final String CRD_NOTEBOOK_LIST_KIND_V1 = "NotebookList";
+public class IngressRouteList implements KubernetesListObject{
- public NotebookCRList() {
- setApiVersion(CRD_APIVERSION_V1);
- setKind(CRD_NOTEBOOK_LIST_KIND_V1);
- }
-
@SerializedName("apiVersion")
private String apiVersion;
@@ -46,35 +36,27 @@ public class NotebookCRList implements KubernetesListObject{
@SerializedName("metadata")
private V1ListMeta metadata;
-
+
@SerializedName("items")
- private List<NotebookCR> items;
-
- public void setApiVersion(String apiVersion) {
- this.apiVersion = apiVersion;
- }
+ private List<IngressRoute> items;
- public void setKind(String kind) {
- this.kind = kind;
+ @Override
+ public V1ListMeta getMetadata() {
+ return metadata;
}
- public List<NotebookCR> getItems() {
+ @Override
+ public List<? extends KubernetesObject> getItems() {
return items;
}
@Override
public String getApiVersion() {
- return apiVersion;
+ return IngressRoute.CRD_APIVERSION_V1;
}
@Override
public String getKind() {
-
- return kind;
- }
-
- @Override
- public V1ListMeta getMetadata() {
- return metadata;
+ return IngressRoute.CRD_INGRESSROUTE_KIND_V1 + "List";
}
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobList.java
similarity index 62%
copy from
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
copy to
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobList.java
index 3c236e6..c6ea52b 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobList.java
@@ -17,27 +17,17 @@
* under the License.
*/
-package org.apache.submarine.server.submitter.k8s.model;
+package org.apache.submarine.server.submitter.k8s.model.pytorchjob;
import com.google.gson.annotations.SerializedName;
+import java.util.List;
import io.kubernetes.client.common.KubernetesListObject;
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ListMeta;
-import java.util.List;
-
-public class NotebookCRList implements KubernetesListObject{
-
- public static final String CRD_NOTEBOOK_VERSION_V1 = "v1alpha1";
- public static final String CRD_NOTEBOOK_GROUP_V1 = "kubeflow.org";
- public static final String CRD_APIVERSION_V1 = CRD_NOTEBOOK_GROUP_V1 + "/" +
CRD_NOTEBOOK_VERSION_V1;
- public static final String CRD_NOTEBOOK_LIST_KIND_V1 = "NotebookList";
+public class PyTorchJobList implements KubernetesListObject{
- public NotebookCRList() {
- setApiVersion(CRD_APIVERSION_V1);
- setKind(CRD_NOTEBOOK_LIST_KIND_V1);
- }
-
@SerializedName("apiVersion")
private String apiVersion;
@@ -46,35 +36,27 @@ public class NotebookCRList implements KubernetesListObject{
@SerializedName("metadata")
private V1ListMeta metadata;
-
+
@SerializedName("items")
- private List<NotebookCR> items;
-
- public void setApiVersion(String apiVersion) {
- this.apiVersion = apiVersion;
- }
+ private List<PyTorchJob> items;
- public void setKind(String kind) {
- this.kind = kind;
+ @Override
+ public V1ListMeta getMetadata() {
+ return metadata;
}
- public List<NotebookCR> getItems() {
+ @Override
+ public List<? extends KubernetesObject> getItems() {
return items;
}
@Override
public String getApiVersion() {
- return apiVersion;
+ return PyTorchJob.CRD_PYTORCH_API_VERSION_V1;
}
@Override
public String getKind() {
-
- return kind;
- }
-
- @Override
- public V1ListMeta getMetadata() {
- return metadata;
+ return PyTorchJob.CRD_PYTORCH_KIND_V1 + "List";
}
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
index bd0819f..a840bb6 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/pytorchjob/PyTorchJobSpec.java
@@ -21,26 +21,23 @@ package
org.apache.submarine.server.submitter.k8s.model.pytorchjob;
import com.google.gson.annotations.SerializedName;
import org.apache.submarine.server.submitter.k8s.model.MLJobReplicaSpec;
-import org.apache.submarine.server.submitter.k8s.model.MLJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.model.MLJobSpec;
import java.util.Map;
-public class PyTorchJobSpec extends MLJobSpec {
+public class PyTorchJobSpec {
/**
* Key: Master, Worker
*/
@SerializedName("pytorchReplicaSpecs")
- private Map<MLJobReplicaType, MLJobReplicaSpec> replicaSpecs;
+ private Map<PyTorchJobReplicaType, MLJobReplicaSpec> replicaSpecs;
/**
* Get the replica specs.
*
* @return map
*/
- @Override
- public Map<MLJobReplicaType, MLJobReplicaSpec> getReplicaSpecs() {
+ public Map<PyTorchJobReplicaType, MLJobReplicaSpec> getReplicaSpecs() {
return replicaSpecs;
}
@@ -49,9 +46,8 @@ public class PyTorchJobSpec extends MLJobSpec {
*
* @param replicaSpecs map
*/
- @Override
public void setReplicaSpecs(
- Map<MLJobReplicaType, MLJobReplicaSpec> replicaSpecs) {
+ Map<PyTorchJobReplicaType, MLJobReplicaSpec> replicaSpecs) {
this.replicaSpecs = replicaSpecs;
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobList.java
similarity index 62%
copy from
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
copy to
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobList.java
index 3c236e6..bcb89fd 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobList.java
@@ -17,27 +17,16 @@
* under the License.
*/
-package org.apache.submarine.server.submitter.k8s.model;
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
import com.google.gson.annotations.SerializedName;
+import java.util.List;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.openapi.models.V1ListMeta;
-import java.util.List;
-
-public class NotebookCRList implements KubernetesListObject{
-
- public static final String CRD_NOTEBOOK_VERSION_V1 = "v1alpha1";
- public static final String CRD_NOTEBOOK_GROUP_V1 = "kubeflow.org";
- public static final String CRD_APIVERSION_V1 = CRD_NOTEBOOK_GROUP_V1 + "/" +
CRD_NOTEBOOK_VERSION_V1;
- public static final String CRD_NOTEBOOK_LIST_KIND_V1 = "NotebookList";
+public class TFJobList implements KubernetesListObject{
- public NotebookCRList() {
- setApiVersion(CRD_APIVERSION_V1);
- setKind(CRD_NOTEBOOK_LIST_KIND_V1);
- }
-
@SerializedName("apiVersion")
private String apiVersion;
@@ -46,35 +35,27 @@ public class NotebookCRList implements KubernetesListObject{
@SerializedName("metadata")
private V1ListMeta metadata;
-
+
@SerializedName("items")
- private List<NotebookCR> items;
-
- public void setApiVersion(String apiVersion) {
- this.apiVersion = apiVersion;
- }
+ private List<TFJob> items;
- public void setKind(String kind) {
- this.kind = kind;
+ @Override
+ public V1ListMeta getMetadata() {
+ return metadata;
}
- public List<NotebookCR> getItems() {
+ @Override
+ public List<TFJob> getItems() {
return items;
}
@Override
public String getApiVersion() {
- return apiVersion;
+ return TFJob.CRD_TF_API_VERSION_V1;
}
@Override
public String getKind() {
-
- return kind;
- }
-
- @Override
- public V1ListMeta getMetadata() {
- return metadata;
+ return TFJob.CRD_TF_KIND_V1 + "List";
}
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobReplicaType.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobReplicaType.java
index dc75dcc..1ef10ca 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobReplicaType.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobReplicaType.java
@@ -62,7 +62,6 @@ public enum TFJobReplicaType implements MLJobReplicaType {
}
return names;
}
-
@Override
public String getTypeName() {
return this.typeName;
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
index e2f3d01..ae21b81 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
@@ -21,20 +21,18 @@ package
org.apache.submarine.server.submitter.k8s.model.tfjob;
import com.google.gson.annotations.SerializedName;
import org.apache.submarine.server.submitter.k8s.model.MLJobReplicaSpec;
-import org.apache.submarine.server.submitter.k8s.model.MLJobReplicaType;
-import org.apache.submarine.server.submitter.k8s.model.MLJobSpec;
import java.util.Map;
/**
* The replica spec of TFJob.
*/
-public class TFJobSpec extends MLJobSpec {
+public class TFJobSpec {
/**
* Key: Chief, Ps, Worker, Evaluator
*/
@SerializedName("tfReplicaSpecs")
- private Map<MLJobReplicaType, MLJobReplicaSpec> tfReplicaSpecs;
+ private Map<TFJobReplicaType, MLJobReplicaSpec> tfReplicaSpecs;
/**
@@ -42,8 +40,7 @@ public class TFJobSpec extends MLJobSpec {
*
* @return map
*/
- @Override
- public Map<MLJobReplicaType, MLJobReplicaSpec> getReplicaSpecs() {
+ public Map<TFJobReplicaType, MLJobReplicaSpec> getReplicaSpecs() {
return tfReplicaSpecs;
}
@@ -52,9 +49,8 @@ public class TFJobSpec extends MLJobSpec {
*
* @param tfReplicaSpecs map
*/
- @Override
public void setReplicaSpecs(
- Map<MLJobReplicaType, MLJobReplicaSpec> tfReplicaSpecs) {
+ Map<TFJobReplicaType, MLJobReplicaSpec> tfReplicaSpecs) {
this.tfReplicaSpecs = tfReplicaSpecs;
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
index dee6cb8..1c5edbe 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/ExperimentSpecParser.java
@@ -46,7 +46,6 @@ import
org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.CodeLo
import
org.apache.submarine.server.submitter.k8s.experiment.codelocalizer.SSHGitCodeLocalizer;
import org.apache.submarine.server.submitter.k8s.model.MLJob;
import org.apache.submarine.server.submitter.k8s.model.MLJobReplicaSpec;
-import org.apache.submarine.server.submitter.k8s.model.MLJobReplicaType;
import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
import
org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJobReplicaType;
import
org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJobSpec;
@@ -89,7 +88,7 @@ public class ExperimentSpecParser {
public static PyTorchJobSpec parsePyTorchJobSpec(ExperimentSpec
experimentSpec)
throws InvalidSpecException {
PyTorchJobSpec pyTorchJobSpec = new PyTorchJobSpec();
- Map<MLJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
+ Map<PyTorchJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new
HashMap<>();
for (Map.Entry<String, ExperimentTaskSpec> entry :
experimentSpec.getSpec().entrySet()) {
String replicaType = entry.getKey();
ExperimentTaskSpec taskSpec = entry.getValue();
@@ -129,7 +128,7 @@ public class ExperimentSpecParser {
private static TFJobSpec parseTFJobSpec(ExperimentSpec experimentSpec)
throws InvalidSpecException {
TFJobSpec tfJobSpec = new TFJobSpec();
- Map<MLJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
+ Map<TFJobReplicaType, MLJobReplicaSpec> replicaSpecMap = new HashMap<>();
for (Map.Entry<String, ExperimentTaskSpec> entry :
experimentSpec.getSpec().entrySet()) {
String replicaType = entry.getKey();
ExperimentTaskSpec taskSpec = entry.getValue();
@@ -320,7 +319,7 @@ public class ExperimentSpecParser {
templateSpec.setSpec(podSpec);
return templateSpec;
}
-
+
private static List<V1EnvVar> parseEnvVars(ExperimentTaskSpec spec,
Map<String, String> defaultEnvs) {
if (spec.getEnvVars() != null) {
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
index f6e4e8c..457596e 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
@@ -20,12 +20,11 @@
package org.apache.submarine.server.submitter.k8s.util;
import java.util.List;
-import io.kubernetes.client.openapi.models.V1DeleteOptions;
-import io.kubernetes.client.openapi.models.V1DeleteOptionsBuilder;
import io.kubernetes.client.openapi.models.V1JobCondition;
import io.kubernetes.client.openapi.models.V1JobStatus;
import io.kubernetes.client.openapi.models.V1Status;
import io.kubernetes.client.openapi.models.V1StatusDetails;
+import io.kubernetes.client.util.generic.options.DeleteOptions;
import org.apache.submarine.server.api.experiment.Experiment;
import org.apache.submarine.server.submitter.k8s.model.MLJob;
import org.joda.time.DateTime;
@@ -89,7 +88,9 @@ public class MLJobConverter {
return experiment;
}
- public static V1DeleteOptions toDeleteOptionsFromMLJob(MLJob job) {
- return new
V1DeleteOptionsBuilder().withApiVersion(job.getApiVersion()).build();
+ public static DeleteOptions toDeleteOptionsFromMLJob(MLJob job) {
+ DeleteOptions deleteOptions = new DeleteOptions();
+ deleteOptions.setApiVersion(job.getApiVersion());
+ return deleteOptions;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]