This is an automated email from the ASF dual-hosted git repository.
pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 7411c5d Submarine 1108. Refactor CRUD operation of Notebook
7411c5d is described below
commit 7411c5d1c8a6951d1f671774e1f68e2636a64332
Author: FatalLin <[email protected]>
AuthorDate: Mon Jan 31 11:59:27 2022 +0800
Submarine 1108. Refactor CRUD operation of Notebook
### What is this PR for?
As mentioned in the ticket, we refactored the status sync-up
mechanism to be more Kubernetes-native. In this PR, I leveraged the sidecar
pattern to monitor the status of the notebook object: the agent is responsible
for monitoring the notebook's status from start to termination, and each
change is passed to submarine-server transparently via an internal API.
### What type of PR is it?
Feature
### Todos
* [ ] - Task




### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-1108
### How should this be tested?
This refactoring should pass all the existing notebook operation tests.
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? No
Author: FatalLin <[email protected]>
Signed-off-by: Kevin <[email protected]>
Closes #877 from FatalLin/SUBMARINE-1108 and squashes the following commits:
f1cc552d [FatalLin] rollback codes to fit testing behavior
ba2fdd5e [FatalLin] justify init status of notebook creation
6d797ed8 [FatalLin] fix
c5fa8a3d [FatalLin] format
87d98614 [FatalLin] remove unsed code section
71647660 [FatalLin] merge
225e06d6 [FatalLin] refactory of notebook crud opertion
f09dd69e [FatalLin] commit for pair programming
---
.github/workflows/deploy_docker_images.yml | 5 +
dev-support/docker-images/agent/build.sh | 1 +
dev-support/docker-images/submarine/build.sh | 1 +
.../submarine/submarine-observer-rbac.yaml | 44 +++++++++
.../artifacts/submarine/submarine-rbac.yaml | 33 +++++++
.../artifacts/submarine/submarine-server.yaml | 6 ++
submarine-cloud-v2/pkg/controller/controller.go | 7 ++
.../pkg/controller/submarine_observer_rbac.go | 104 ++++++++++++++++++++
.../submarine/server/api/notebook/Notebook.java | 5 +-
.../submarine/server/notebook/NotebookManager.java | 26 ++---
.../server-submitter/submarine-k8s-agent/pom.xml | 28 +-----
.../submarine/server/k8s/agent/SubmarineAgent.java | 6 +-
.../k8s/agent/handler/CustomResourceHandler.java | 84 ++++++++++++----
.../server/k8s/agent/handler/NotebookHandler.java | 107 +++++++++++++++++++++
.../server/submitter/k8s/K8sSubmitter.java | 12 +--
.../server/submitter/k8s/model/NotebookCR.java | 4 +-
.../server/submitter/k8s/model/NotebookCRList.java | 48 ++++++++-
.../submitter/k8s/parser/NotebookSpecParser.java | 51 +++++++++-
.../submitter/k8s/NotebookSpecParserTest.java | 2 +-
19 files changed, 502 insertions(+), 72 deletions(-)
diff --git a/.github/workflows/deploy_docker_images.yml
b/.github/workflows/deploy_docker_images.yml
index 3ec3653..287bfac 100644
--- a/.github/workflows/deploy_docker_images.yml
+++ b/.github/workflows/deploy_docker_images.yml
@@ -55,6 +55,11 @@ jobs:
- name: Push submarine-server docker image
run: docker push apache/submarine:server-$SUBMARINE_VERSION
+ - name: Build submarine agent
+ run: ./dev-support/docker-images/agent/build.sh
+ - name: Push submarine-agent docker image
+ run: docker push apache/submarine:agent-$SUBMARINE_VERSION
+
- name: Build submarine database
run: ./dev-support/docker-images/database/build.sh
- name: Push submarine-database docker image
diff --git a/dev-support/docker-images/agent/build.sh
b/dev-support/docker-images/agent/build.sh
index b791b36..5ef4a3e 100755
--- a/dev-support/docker-images/agent/build.sh
+++ b/dev-support/docker-images/agent/build.sh
@@ -46,6 +46,7 @@ cp ${SUBMARINE_HOME}/conf/submarine-site.xml
"${CURRENT_PATH}/tmp/"
# build image
cd ${CURRENT_PATH}
+
echo "Start building the ${SUBMARINE_IMAGE_NAME} docker image ..."
docker build -t ${SUBMARINE_IMAGE_NAME} .
diff --git a/dev-support/docker-images/submarine/build.sh
b/dev-support/docker-images/submarine/build.sh
index 86776c7..a20e556 100755
--- a/dev-support/docker-images/submarine/build.sh
+++ b/dev-support/docker-images/submarine/build.sh
@@ -58,6 +58,7 @@ cp ${SUBMARINE_HOME}/bin/submarine.sh "${CURRENT_PATH}/tmp/"
# build image
cd ${CURRENT_PATH}
echo "Start building the ${SUBMARINE_IMAGE_NAME} docker image ..."
+
docker build -t ${SUBMARINE_IMAGE_NAME} .
# clean temp file
diff --git
a/submarine-cloud-v2/artifacts/submarine/submarine-observer-rbac.yaml
b/submarine-cloud-v2/artifacts/submarine/submarine-observer-rbac.yaml
new file mode 100644
index 0000000..c3cd43c
--- /dev/null
+++ b/submarine-cloud-v2/artifacts/submarine/submarine-observer-rbac.yaml
@@ -0,0 +1,44 @@
+# Source: submarine/templates/rbac.yaml
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+ name: "submarine-observer"
+rules:
+- apiGroups:
+ - kubeflow.org
+ resources:
+ - tfjobs
+ - tfjobs/status
+ - pytorchjobs
+ - pytorchjobs/status
+ - notebooks
+ - notebooks/status
+ verbs:
+ - get
+ - list
+ - watch
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ - pods/log
+ - services
+ - persistentvolumeclaims
+ - events
+ - configmaps
+ verbs:
+ - get
+ - list
+ - watch
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: "submarine-observer"
+subjects:
+- kind: ServiceAccount
+ name: "default"
+roleRef:
+ kind: Role
+ name: "submarine-observer"
+ apiGroup: rbac.authorization.k8s.io
\ No newline at end of file
diff --git a/submarine-cloud-v2/artifacts/submarine/submarine-rbac.yaml
b/submarine-cloud-v2/artifacts/submarine/submarine-rbac.yaml
index 08eaac1..0429e9c 100644
--- a/submarine-cloud-v2/artifacts/submarine/submarine-rbac.yaml
+++ b/submarine-cloud-v2/artifacts/submarine/submarine-rbac.yaml
@@ -100,6 +100,26 @@ rules:
- '*'
---
# Source: submarine/templates/rbac.yaml
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+ name: "observer"
+rules:
+- apiGroups:
+ - kubeflow.org
+ resources:
+ - tfjobs
+ - tfjobs/status
+ - pytorchjobs
+ - pytorchjobs/status
+ - notebooks
+ - notebooks/status
+ verbs:
+ - get
+ - list
+ - watch
+---
+# Source: submarine/templates/rbac.yaml
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
@@ -111,3 +131,16 @@ roleRef:
kind: Role
name: "submarine-server"
apiGroup: rbac.authorization.k8s.io
+---
+# Source: submarine/templates/rbac.yaml
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: "observer"
+subjects:
+- kind: ServiceAccount
+ name: "default"
+roleRef:
+ kind: Role
+ name: "observer"
+ apiGroup: rbac.authorization.k8s.io
diff --git a/submarine-cloud-v2/artifacts/submarine/submarine-server.yaml
b/submarine-cloud-v2/artifacts/submarine/submarine-server.yaml
index 60ca20b..783afb8 100644
--- a/submarine-cloud-v2/artifacts/submarine/submarine-server.yaml
+++ b/submarine-cloud-v2/artifacts/submarine/submarine-server.yaml
@@ -6,6 +6,12 @@ metadata:
name: "submarine-server"
---
# Source: submarine/templates/submarine-server.yaml
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: "default"
+---
+# Source: submarine/templates/submarine-server.yaml
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
diff --git a/submarine-cloud-v2/pkg/controller/controller.go
b/submarine-cloud-v2/pkg/controller/controller.go
index 9825f7a..d28fce3 100644
--- a/submarine-cloud-v2/pkg/controller/controller.go
+++ b/submarine-cloud-v2/pkg/controller/controller.go
@@ -54,6 +54,7 @@ const storageClassName = "submarine-storageclass"
const (
serverName = "submarine-server"
+ observerName = "submarine-observer"
databaseName = "submarine-database"
databasePort = 3306
tensorboardName = "submarine-tensorboard"
@@ -78,6 +79,7 @@ const (
serverYamlPath = artifactPath + "submarine-server.yaml"
tensorboardYamlPath = artifactPath +
"submarine-tensorboard.yaml"
rbacYamlPath = artifactPath + "submarine-rbac.yaml"
+ observerRbacYamlPath = artifactPath +
"submarine-observer-rbac.yaml"
)
var dependents = []string{serverName, databaseName, tensorboardName,
mlflowName, minioName}
@@ -442,6 +444,11 @@ func (c *Controller) createSubmarine(submarine
*v1alpha1.Submarine) error {
return err
}
+ err = c.createSubmarineObserverRBAC(submarine)
+ if err != nil && !errors.IsAlreadyExists(err) {
+ return err
+ }
+
err = c.createSubmarineTensorboard(submarine)
if err != nil && !errors.IsAlreadyExists(err) {
return err
diff --git a/submarine-cloud-v2/pkg/controller/submarine_observer_rbac.go
b/submarine-cloud-v2/pkg/controller/submarine_observer_rbac.go
new file mode 100644
index 0000000..5f65119
--- /dev/null
+++ b/submarine-cloud-v2/pkg/controller/submarine_observer_rbac.go
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "fmt"
+
+ v1alpha1
"github.com/apache/submarine/submarine-cloud-v2/pkg/apis/submarine/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ rbacv1 "k8s.io/api/rbac/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+)
+
+// newSubmarineObserverRole parses the Role defined in observerRbacYamlPath and sets the
+// given Submarine as its controller owner reference.
+// NOTE(review): observerRbacYamlPath holds both a Role and a RoleBinding document;
+// this assumes ParseRoleYaml selects the Role one — confirm.
+// NOTE(review): a parse error is only logged, after which role is dereferenced; if
+// ParseRoleYaml returns a nil Role on error this panics — confirm its error contract.
+func newSubmarineObserverRole(submarine *v1alpha1.Submarine) *rbacv1.Role {
+ role, err := ParseRoleYaml(observerRbacYamlPath)
+ if err != nil {
+ klog.Info("[Error] ParseRole", err)
+ }
+ // Mark the Submarine CR as the controlling owner of the Role.
+ role.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
+ *metav1.NewControllerRef(submarine,
v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+ }
+
+ return role
+}
+
+// newSubmarineObserverRoleBinding parses the RoleBinding defined in observerRbacYamlPath
+// and sets the given Submarine as its controller owner reference.
+// NOTE(review): observerRbacYamlPath holds both a Role and a RoleBinding document;
+// this assumes ParseRoleBindingYaml selects the RoleBinding one — confirm.
+// NOTE(review): a parse error is only logged, after which roleBinding is dereferenced;
+// if ParseRoleBindingYaml returns nil on error this panics — confirm its error contract.
+func newSubmarineObserverRoleBinding(submarine *v1alpha1.Submarine)
*rbacv1.RoleBinding {
+ roleBinding, err := ParseRoleBindingYaml(observerRbacYamlPath)
+ if err != nil {
+ klog.Info("[Error] ParseRoleBinding", err)
+ }
+ // Mark the Submarine CR as the controlling owner of the RoleBinding.
+ roleBinding.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
+ *metav1.NewControllerRef(submarine,
v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+ }
+
+ return roleBinding
+}
+
+// createSubmarineObserverRBAC ensures the Role and RoleBinding named observerName exist
+// for submarine-observer; the RoleBinding binds the Role to the service account "default".
+// Both objects must be controlled by the given Submarine, otherwise a warning event is
+// recorded and an error is returned.
+// Reference: https://github.com/apache/submarine/blob/master/helm-charts/submarine/templates/rbac.yaml
+func (c *Controller) createSubmarineObserverRBAC(submarine *v1alpha1.Submarine) error {
+	klog.Info("[createSubmarineObserverRBAC]")
+
+	// Step 1: Create Role
+	role, err := c.roleLister.Roles(submarine.Namespace).Get(observerName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		role, err = c.kubeclientset.RbacV1().Roles(submarine.Namespace).Create(context.TODO(), newSubmarineObserverRole(submarine), metav1.CreateOptions{})
+		// Only report creation on success; on failure the returned object does not
+		// describe a created Role (and may be nil with some clients).
+		if err == nil {
+			klog.Info(" Create Role: ", role.Name)
+		}
+	}
+
+	// If an error occurs during Get/Create, we'll requeue the item so we can
+	// attempt processing again later. This could have been caused by a
+	// temporary network failure, or any other transient reason.
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(role, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, role.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		// msg is data, not a format string.
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 2: Create RoleBinding
+	roleBinding, err := c.rolebindingLister.RoleBindings(submarine.Namespace).Get(observerName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		roleBinding, err = c.kubeclientset.RbacV1().RoleBindings(submarine.Namespace).Create(context.TODO(), newSubmarineObserverRoleBinding(submarine), metav1.CreateOptions{})
+		if err == nil {
+			klog.Info(" Create RoleBinding: ", roleBinding.Name)
+		}
+	}
+
+	// If an error occurs during Get/Create, we'll requeue the item so we can
+	// attempt processing again later. This could have been caused by a
+	// temporary network failure, or any other transient reason.
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(roleBinding, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, roleBinding.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	return nil
+}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/notebook/Notebook.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/notebook/Notebook.java
index 6b048d2..b3204c1 100644
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/notebook/Notebook.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/notebook/Notebook.java
@@ -112,8 +112,9 @@ public class Notebook {
STATUS_RUNNING("running"),
STATUS_WAITING("waiting"),
STATUS_TERMINATING("terminating"),
- STATUS_NOT_FOUND("not_found"),
- STATUS_PULLING("pulling");
+ STATUS_PULLING("pulling"),
+ STATUS_FAILED("failed"),
+ STATUS_NOT_FOUND("not_found");
private String value;
Status(String value) {
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/notebook/NotebookManager.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/notebook/NotebookManager.java
index 17abae3..8dfd104 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/notebook/NotebookManager.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/notebook/NotebookManager.java
@@ -104,6 +104,7 @@ public class NotebookManager {
if (environment.getEnvironmentSpec() != null) {
notebookSpec.setEnvironment(environment.getEnvironmentSpec());
}
+ notebook.setStatus(Notebook.Status.STATUS_CREATING.getValue());
notebookService.insert(notebook);
return notebook;
}
@@ -118,11 +119,14 @@ public class NotebookManager {
public List<Notebook> listNotebooksByNamespace(String namespace) throws
SubmarineRuntimeException {
List<Notebook> notebookList = new ArrayList<>();
for (Notebook notebook : notebookService.selectAll()) {
- Notebook patchNotebook = submitter.findNotebook(notebook.getSpec());
- if (namespace == null || namespace.length() == 0
- ||
namespace.toLowerCase().equals(patchNotebook.getSpec().getMeta().getNamespace()))
{
- notebook.rebuild(patchNotebook);
- notebookList.add(notebook);
+ if (namespace == null || namespace.length() == 0 ){
+ if
(notebook.getStatus().equals(Notebook.Status.STATUS_CREATING.getValue())) {
+ Notebook patchNotebook = submitter.findNotebook(notebook.getSpec());
+ notebook.rebuild(patchNotebook);
+ notebookList.add(notebook);
+ } else {
+ notebookList.add(notebook);
+ }
}
}
return notebookList;
@@ -139,13 +143,13 @@ public class NotebookManager {
List<Notebook> notebookList = new ArrayList<>();
for (Notebook nb : serviceNotebooks) {
try {
- Notebook notebook = submitter.findNotebook(nb.getSpec());
- notebook.setNotebookId(nb.getNotebookId());
- notebook.setSpec(nb.getSpec());
- if (notebook.getCreatedTime() == null) {
- notebook.setCreatedTime(nb.getCreatedTime());
+ if (nb.getStatus().equals(Notebook.Status.STATUS_CREATING.getValue()))
{
+ Notebook patchNotebook = submitter.findNotebook(nb.getSpec());
+ nb.rebuild(patchNotebook);
+ notebookList.add(nb);
+ } else {
+ notebookList.add(nb);
}
- notebookList.add(notebook);
} catch (SubmarineRuntimeException e) {
LOG.error("Error when get notebook resource, skip this row!", e);
}
diff --git a/submarine-server/server-submitter/submarine-k8s-agent/pom.xml
b/submarine-server/server-submitter/submarine-k8s-agent/pom.xml
index 5eb624e..44beae0 100644
--- a/submarine-server/server-submitter/submarine-k8s-agent/pom.xml
+++ b/submarine-server/server-submitter/submarine-k8s-agent/pom.xml
@@ -27,9 +27,6 @@
<artifactId>submarine-k8s-agent</artifactId>
<name>Submarine: K8S Agent</name>
- <properties>
- <agent.k8s.client-java.version>11.0.1</agent.k8s.client-java.version>
- </properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
@@ -44,32 +41,9 @@
</dependency>
<dependency>
- <groupId>io.kubernetes</groupId>
- <artifactId>client-java</artifactId>
- <version>${agent.k8s.client-java.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.submarine</groupId>
- <artifactId>submarine-server-core</artifactId>
+ <artifactId>submarine-submitter-k8s</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>javax.annotation</groupId>
- <artifactId>javax.annotation-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcpkix-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15on</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
<profiles>
diff --git
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgent.java
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgent.java
index 47d626d..7e1aab2 100644
---
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgent.java
+++
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/SubmarineAgent.java
@@ -51,11 +51,11 @@ public class SubmarineAgent {
this.resourceId = resourceId;
this.type = CustomResourceType.valueOf(customResourceType);
this.handler = HandlerFactory.getHandler(this.type);
- this.handler.init(serverHost, serverPort, namespace,
customResourceType, customResourceName);
+ this.handler.init(serverHost, serverPort, namespace,
customResourceName, resourceId);
}
+ /** Starts the agent by running the handler on the calling thread; returns when handler.run() returns. */
public void start() {
-
+ handler.run();
}
public static void main(String[] args) throws ClassNotFoundException,
InstantiationException, IllegalAccessException, IOException {
@@ -74,7 +74,7 @@ public class SubmarineAgent {
LOG.info(String.format("CUSTOM_RESOURCE_NAME:%s", customResourceName));
LOG.info(String.format("CUSTOM_RESOURCE_ID:%s", customResourceId));
- SubmarineAgent agent = new SubmarineAgent(serverHost, serverPort,
customResourceType,
+ SubmarineAgent agent = new SubmarineAgent(serverHost, serverPort,
namespace,
customResourceType, customResourceName, customResourceId);
agent.start();
diff --git
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/CustomResourceHandler.java
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/CustomResourceHandler.java
index 3e36908..e150e80 100644
---
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/CustomResourceHandler.java
+++
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/CustomResourceHandler.java
@@ -19,38 +19,66 @@
package org.apache.submarine.server.k8s.agent.handler;
+import java.io.FileReader;
import java.io.IOException;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.server.k8s.agent.util.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.util.Config;
+import io.kubernetes.client.openapi.apis.CustomObjectsApi;
+import io.kubernetes.client.util.ClientBuilder;
+import io.kubernetes.client.util.KubeConfig;
+import okhttp3.OkHttpClient;
public abstract class CustomResourceHandler {
- private CoreV1Api coreApi;
- private ApiClient client = null;
- private String namespace;
- private String crType;
- private String crName;
- private String serverHost;
- private Integer serverPort;
- private RestClient restClient;
+ private static final Logger LOG =
LoggerFactory.getLogger(CustomResourceHandler.class);
+ private static final String KUBECONFIG_ENV = "KUBECONFIG";
+ protected ApiClient client = null;
+ protected CustomObjectsApi customObjectsApi = null;
+ protected CoreV1Api coreV1Api = null;
+ protected String namespace;
+ protected String crType;
+ protected String crName;
+ protected String serverHost;
+ protected Integer serverPort;
+ protected String resourceId;
+ protected RestClient restClient;
public CustomResourceHandler() throws IOException {
- this.client = Config.defaultClient();
+    // Prefer an explicit kubeconfig from the KUBECONFIG env var; if it is unset or cannot
+    // be loaded, fall back to the in-cluster configuration.
+    String path = System.getenv(KUBECONFIG_ENV);
+    LOG.info("PATH:" + path);
+    if (path != null && !path.isEmpty()) {
+      // try-with-resources: the reader is only needed while parsing the kubeconfig.
+      try (FileReader reader = new FileReader(path)) {
+        client = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(reader)).build();
+      } catch (Exception e) {
+        LOG.info("Failed to load kubeconfig from {}.", path, e);
+      }
+    }
+    if (client == null) {
+      LOG.info("Maybe in cluster mode, try to initialize the client again.");
+      try {
+        client = ClientBuilder.cluster().build();
+      } catch (IOException e1) {
+        LOG.error("Initialize K8s submitter failed. " + e1.getMessage(), e1);
+        // Thrown before any use of client, so it is not masked by a NullPointerException.
+        throw new SubmarineRuntimeException(500, "Initialize K8s submitter failed.");
+      }
+    }
+    // A read timeout of 0 lets a watcher wait until the next change instead of timing out.
+    client.setReadTimeout(0);
Configuration.setDefaultApiClient(client);
- this.coreApi = new CoreV1Api(this.client);
+    customObjectsApi = new CustomObjectsApi(client);
+    coreV1Api = new CoreV1Api(client);
}
public abstract void init(String serverHost, Integer serverPort,
- String namespace, String crType, String crName);
+ String namespace, String crName, String resourceId);
public abstract void run();
- public abstract void onAddEvent();
- public abstract void onModifyEvent();
- public abstract void onDeleteEvent();
public String getNamespace() {
return namespace;
@@ -75,7 +103,29 @@ public abstract class CustomResourceHandler {
public void setCrName(String crName) {
this.crName = crName;
}
-
-
+
+ /** Returns the submarine-server host configured for this handler. */
+ public String getServerHost() {
+ return serverHost;
+ }
+
+ /** Sets the submarine-server host configured for this handler. */
+ public void setServerHost(String serverHost) {
+ this.serverHost = serverHost;
+ }
+
+ /** Returns the submarine-server port configured for this handler. */
+ public Integer getServerPort() {
+ return serverPort;
+ }
+
+ /** Sets the submarine-server port configured for this handler. */
+ public void setServerPort(Integer serverPort) {
+ this.serverPort = serverPort;
+ }
+
+ /** Returns the REST client used to call submarine-server (may be null before init). */
+ public RestClient getRestClient() {
+ return restClient;
+ }
+
+ /** Replaces the REST client used to call submarine-server. */
+ public void setRestClient(RestClient restClient) {
+ this.restClient = restClient;
+ }
}
diff --git
a/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
new file mode 100644
index 0000000..c27850f
--- /dev/null
+++
b/submarine-server/server-submitter/submarine-k8s-agent/src/main/java/org/apache/submarine/server/k8s/agent/handler/NotebookHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.k8s.agent.handler;
+
+import java.io.IOException;
+
+import org.apache.submarine.server.api.common.CustomResourceType;
+import org.apache.submarine.server.api.notebook.Notebook;
+import org.apache.submarine.server.k8s.agent.util.RestClient;
+import org.apache.submarine.server.submitter.k8s.model.NotebookCR;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.reflect.TypeToken;
+
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.models.CoreV1Event;
+import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.util.Watch;
+import io.kubernetes.client.util.Watch.Response;
+import io.kubernetes.client.util.Watchable;
+import okhttp3.Call;
+
+public class NotebookHandler extends CustomResourceHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(NotebookHandler.class);
+ private Watchable<CoreV1Event> watcher;
+ private String podName;
+ public NotebookHandler() throws IOException {
+ super();
+ }
+
+ @Override
+ public void init(String serverHost, Integer serverPort, String namespace,
+ String crName, String resourceId) {
+ this.serverHost = serverHost;
+ this.serverPort = serverPort;
+ this.namespace = namespace;
+ this.crName = crName;
+ this.resourceId = resourceId;
+
+ try {
+ String podLabelSelector = String.format("%s=%s", NotebookCR.NOTEBOOK_ID,
+ this.resourceId);
+ V1PodList podList = this.coreV1Api.listNamespacedPod(namespace, null,
null, null, null,
+ podLabelSelector, null, null, null, null, null);
+ this.podName = podList.getItems().get(0).getMetadata().getName();
+ String fieldSelector = String.format("involvedObject.name=%s",
this.podName);
+
+ Call call = coreV1Api.listNamespacedEventCall(namespace, null, null,
null, fieldSelector,
+ null, null, null, null, null, true, null);
+
+ watcher = Watch.createWatch(client, call, new
TypeToken<Response<CoreV1Event>>(){}.getType());
+
+ } catch (ApiException e) {
+ e.printStackTrace();
+ }
+ restClient = new RestClient(serverHost, serverPort);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ for (Response<CoreV1Event> event: watcher) {
+ String reason = event.object.getReason();
+ switch (reason) {
+ case "Created":
+ case "Scheduled":
+ restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, Notebook.Status.STATUS_CREATING.getValue());
+ break;
+ case "Started":
+ restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, Notebook.Status.STATUS_RUNNING.getValue());
+ break;
+ case "Failed":
+ restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, Notebook.Status.STATUS_FAILED.getValue());
+ break;
+ case "Pulling":
+ restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, Notebook.Status.STATUS_PULLING.getValue());
+ break;
+ case "Killing":
+ restClient.callStatusUpdate(CustomResourceType.Notebook,
this.resourceId, Notebook.Status.STATUS_TERMINATING.getValue());
+ LOG.info("Receive terminating event, exit progress");
+ return;
+ default:
+ LOG.info(String.format("Unprocessed event type:%s", reason));
+ }
+ }
+ }
+ }
+}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
index 94f7863..75f31b3 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java
@@ -33,6 +33,7 @@ import java.util.function.Function;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
+
import okhttp3.OkHttpClient;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
@@ -185,7 +186,7 @@ public class K8sSubmitter implements Submitter {
MLJob mlJob = ExperimentSpecParser.parseJob(spec);
mlJob.getMetadata().setNamespace(getServerNamespace());
mlJob.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
-
+
Object object = api.createNamespacedCustomObject(mlJob.getGroup(),
mlJob.getVersion(),
mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob,
"true", null, null);
experiment = parseExperimentResponseObject(object,
ParseOp.PARSE_OP_RESULT);
@@ -415,8 +416,7 @@ public class K8sSubmitter implements Submitter {
// parse notebook custom resource
NotebookCR notebookCR;
try {
- notebookCR = NotebookSpecParser.parseNotebook(spec);
-
+ notebookCR = NotebookSpecParser.parseNotebook(spec, notebookId,
namespace);
notebookCR.getMetadata().setNamespace(namespace);
notebookCR.getMetadata().setOwnerReferences(OwnerReferenceUtils.getOwnerReference());
} catch (JsonSyntaxException e) {
@@ -484,7 +484,6 @@ public class K8sSubmitter implements Submitter {
throw new SubmarineRuntimeException(e.getCode(), "K8s submitter:
ingressroute for Notebook " +
"object failed by " + e.getMessage());
}
-
return notebook;
}
@@ -494,7 +493,7 @@ public class K8sSubmitter implements Submitter {
String namespace = getServerNamespace();
try {
- NotebookCR notebookCR = NotebookSpecParser.parseNotebook(spec);
+ NotebookCR notebookCR = NotebookSpecParser.parseNotebook(spec, null,
null);
Object object = api.getNamespacedCustomObject(notebookCR.getGroup(),
notebookCR.getVersion(),
namespace,
@@ -541,8 +540,7 @@ public class K8sSubmitter implements Submitter {
Notebook notebook = null;
final String name = spec.getMeta().getName();
String namespace = getServerNamespace();
- NotebookCR notebookCR = NotebookSpecParser.parseNotebook(spec);
-
+ NotebookCR notebookCR = NotebookSpecParser.parseNotebook(spec, null, null);
try {
Object object = api.deleteNamespacedCustomObject(notebookCR.getGroup(),
notebookCR.getVersion(),
namespace, notebookCR.getPlural(),
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCR.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCR.java
index 143d4a4..ae495df 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCR.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCR.java
@@ -20,9 +20,11 @@
package org.apache.submarine.server.submitter.k8s.model;
import com.google.gson.annotations.SerializedName;
+
+import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
-public class NotebookCR {
+public class NotebookCR implements KubernetesObject{
public static final String CRD_NOTEBOOK_VERSION_V1 = "v1";
public static final String CRD_NOTEBOOK_GROUP_V1 = "kubeflow.org";
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
index ccdca63..3c236e6 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/NotebookCRList.java
@@ -21,14 +21,60 @@ package org.apache.submarine.server.submitter.k8s.model;
import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.common.KubernetesListObject;
+import io.kubernetes.client.openapi.models.V1ListMeta;
+
import java.util.List;
-public class NotebookCRList {
+public class NotebookCRList implements KubernetesListObject{
+
+ public static final String CRD_NOTEBOOK_VERSION_V1 = "v1alpha1";
+ public static final String CRD_NOTEBOOK_GROUP_V1 = "kubeflow.org";
+ public static final String CRD_APIVERSION_V1 = CRD_NOTEBOOK_GROUP_V1 + "/" +
CRD_NOTEBOOK_VERSION_V1;
+ public static final String CRD_NOTEBOOK_LIST_KIND_V1 = "NotebookList";
+
+ public NotebookCRList() {
+ setApiVersion(CRD_APIVERSION_V1);
+ setKind(CRD_NOTEBOOK_LIST_KIND_V1);
+ }
+
+ @SerializedName("apiVersion")
+ private String apiVersion;
+ @SerializedName("kind")
+ private String kind;
+
+ @SerializedName("metadata")
+ private V1ListMeta metadata;
+
@SerializedName("items")
private List<NotebookCR> items;
+
+ public void setApiVersion(String apiVersion) {
+ this.apiVersion = apiVersion;
+ }
+
+ public void setKind(String kind) {
+ this.kind = kind;
+ }
public List<NotebookCR> getItems() {
return items;
}
+
+ @Override
+ public String getApiVersion() {
+ return apiVersion;
+ }
+
+ @Override
+ public String getKind() {
+
+ return kind;
+ }
+
+ @Override
+ public V1ListMeta getMetadata() {
+ return metadata;
+ }
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
index a0533f4..18300e7 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/NotebookSpecParser.java
@@ -34,6 +34,7 @@ import
io.kubernetes.client.openapi.models.V1PersistentVolumeClaimVolumeSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.submarine.commons.utils.SubmarineConfVars;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.api.common.CustomResourceType;
import org.apache.submarine.server.api.environment.Environment;
import org.apache.submarine.server.api.spec.EnvironmentSpec;
import org.apache.submarine.server.api.spec.KernelSpec;
@@ -63,10 +64,11 @@ public class NotebookSpecParser {
SubmarineConfiguration.getInstance();
- public static NotebookCR parseNotebook(NotebookSpec spec) {
+ public static NotebookCR parseNotebook(NotebookSpec spec, String notebookId,
String namespace) {
NotebookCR notebookCR = new NotebookCR();
notebookCR.setMetadata(parseMetadata(spec));
notebookCR.setSpec(parseNotebookCRSpec(spec));
+ appendSidecar(notebookCR, notebookId, namespace);
return notebookCR;
}
@@ -214,8 +216,9 @@ public class NotebookSpecParser {
containers.add(container);
podSpec.setContainers(containers);
podSpec.setVolumes(volumeList);
+ podSpec.setTerminationGracePeriodSeconds(120L);
podTemplateSpec.setSpec(podSpec);
-
+
return podTemplateSpec;
}
@@ -280,4 +283,48 @@ public class NotebookSpecParser {
+ "activation.\"; fi");
return condaVersionValidationCommand.toString();
}
+
+ private static void appendSidecar(NotebookCR notebookCR, String notebookId,
String namespace) {
+ NotebookCRSpec notebookCRSpec = notebookCR.getSpec();
+ List<V1Container> containers =
notebookCRSpec.getTemplate().getSpec().getContainers();
+ V1Container agentContainer = new V1Container();
+ agentContainer.setName("agent");
+ agentContainer.setImage("apache/submarine:sidecar-agent-0.7.0-SNAPSHOT");
+
+ List<V1EnvVar> envVarList = new ArrayList<>();
+ V1EnvVar crTypeVar = new V1EnvVar();
+ crTypeVar.setName("CUSTOM_RESOURCE_TYPE");
+ crTypeVar.setValue(CustomResourceType.Notebook.toString());
+
+ V1EnvVar crNameVar = new V1EnvVar();
+ crNameVar.setName("CUSTOM_RESOURCE_NAME");
+ crNameVar.setValue(notebookCR.getMetadata().getName());
+
+ V1EnvVar namespaceVar = new V1EnvVar();
+ namespaceVar.setName("NAMESPACE");
+ namespaceVar.setValue(namespace);
+
+ V1EnvVar serverHostVar = new V1EnvVar();
+ serverHostVar.setName("SERVER_HOST");
+ serverHostVar.setValue(conf.getServerServiceName());
+
+ V1EnvVar serverPortVar = new V1EnvVar();
+ serverPortVar.setName("SERVER_PORT");
+ serverPortVar.setValue(String.valueOf(conf.getServerPort()));
+
+ V1EnvVar customResourceIdVar = new V1EnvVar();
+ customResourceIdVar.setName("CUSTOM_RESOURCE_ID");
+ customResourceIdVar.setValue(notebookId);
+
+ envVarList.add(crTypeVar);
+ envVarList.add(crNameVar);
+ envVarList.add(namespaceVar);
+ envVarList.add(serverHostVar);
+ envVarList.add(serverPortVar);
+ envVarList.add(customResourceIdVar);
+
+ agentContainer.env(envVarList);
+
+ containers.add(0, agentContainer);
+ }
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
index 2bf0564..d493f23 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/NotebookSpecParserTest.java
@@ -41,7 +41,7 @@ public class NotebookSpecParserTest extends SpecBuilder {
@Test
public void testValidNotebook() throws IOException, URISyntaxException {
NotebookSpec notebookSpec = (NotebookSpec)
buildFromJsonFile(NotebookSpec.class, notebookReqFile);
- NotebookCR notebook = NotebookSpecParser.parseNotebook(notebookSpec);
+ NotebookCR notebook = NotebookSpecParser.parseNotebook(notebookSpec, null,
null);
validateMetadata(notebookSpec.getMeta(), notebook.getMetadata());
validateEnvironment(notebookSpec, notebook.getSpec());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]