This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 56b2a56 SUBMARINE-455. Support CRUD job in submarine-server REST
56b2a56 is described below
commit 56b2a566696409d83f0b27c66890e9923cfdfedd
Author: Wanqiang Ji <[email protected]>
AuthorDate: Thu Apr 9 20:23:01 2020 +0800
SUBMARINE-455. Support CRUD job in submarine-server REST
### What is this PR for?
Currently the submarine-server only supports creating jobs via REST. We
should implement the other operations.
- GET/PATCH/DELETE job by REST
- Fill in more info for the Job object, such as the job
status (accepted/created/running/succeeded) with timestamps, the spec, etc.
### What type of PR is it?
Feature
### Todos
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-455
### How should this be tested?
https://travis-ci.com/github/jiwq/submarine/builds/155090047
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Wanqiang Ji <[email protected]>
Closes #249 from jiwq/SUBMARINE-455 and squashes the following commits:
3049448 [Wanqiang Ji] SUBMARINE-455. Support CRUD job in submarine-server
REST
---
.gitignore | 1 +
conf/submarine-site.xml.template | 24 ++
dev-support/k8s/deploy-kubeflow-operators.sh | 161 +++++++++++
dev-support/k8s/pytorchjob/deployment.yaml | 2 +-
.../k8s/pytorchjob/deployment_v1.16plus.yaml | 2 +-
dev-support/k8s/pytorchjob/rbac.yaml | 4 +-
dev-support/k8s/pytorchjob/service.yaml | 2 +-
dev-support/k8s/tfjob/crd.yaml | 33 +--
dev-support/k8s/tfjob/operator/cluster-role.yaml | 109 ++++----
dev-support/k8s/tfjob/operator/deployment.yaml | 33 +--
dev-support/k8s/tfjob/operator/kustomization.yaml | 4 +-
docs/submarine-server/README.md | 306 +++++++++++++++++----
submarine-cloud/hack/integration-test.sh | 1 +
.../manifests/submarine-cluster/rbac.yaml | 5 +
.../submarine-cluster/submarine-server.yaml | 2 +-
.../utils/exception/SubmarineRuntimeException.java | 11 +
.../apache/submarine/server/api/JobHandler.java | 71 -----
.../api/exception/UnsupportedJobTypeException.java | 28 --
.../org/apache/submarine/server/api/job/Job.java | 136 +++++++--
.../org/apache/submarine/server/api/job/JobId.java | 5 +-
.../server/api/{ => job}/JobSubmitter.java | 38 ++-
.../org/apache/submarine/server/JobManager.java | 104 -------
.../apache/submarine/server/SubmarineServer.java | 3 +-
.../apache/submarine/server/SubmitterManager.java | 2 +-
.../apache/submarine/server/job/JobManager.java | 194 +++++++++++++
.../submarine/server/response/JsonResponse.java | 7 +-
.../submarine/server/rest/JobManagerRestApi.java | 92 ++++---
.../server/AbstractSubmarineServerTest.java | 11 +-
.../submarine/server/SubmarineServerTest.java | 4 -
.../server/rest/JobManagerRestApiTest.java | 181 ------------
.../server/submitter/k8s/K8sJobRequest.java | 89 ------
.../server/submitter/k8s/K8sJobSubmitter.java | 157 +++++------
.../server/submitter/k8s/model/MLJob.java | 12 +
.../server/submitter/k8s/util/MLJobConverter.java | 97 +++++++
.../server/submitter/k8s/JobSpecParserTest.java | 34 +--
.../server/submitter/k8s/K8SJobSubmitterTest.java | 152 +++-------
.../server/submitter/k8s/MLJobConverterTest.java | 107 +++++++
.../server/submitter/k8s/SpecBuilder.java | 52 ++++
.../src/test/resources/pytorch_job_mnist_gloo.json | 47 ----
.../src/test/resources/tf_job_mnist.json | 48 ----
.../src/test/resources/tf_job_mnist.yaml | 29 --
submarine-test/test-k8s/pom.xml | 16 ++
.../apache/submarine/rest/JobManagerRestApiIT.java | 279 +++++++++++++++++--
.../test/resources/pytorch/pt-mnist-patch-req.json | 28 ++
.../test/resources/pytorch/pt-mnist-patch-req.yaml | 21 ++
.../src/test/resources/pytorch/pt-mnist-req.json | 28 ++
.../src/test/resources/pytorch/pt-mnist-req.yaml | 21 ++
.../resources/tensorflow/tf-mnist-patch-req.json | 16 +-
.../resources/tensorflow/tf-mnist-patch-req.yaml | 10 +-
.../resources/{ => tensorflow}/tf-mnist-req.json | 10 +-
.../test/resources/tensorflow}/tf-mnist-req.yaml | 11 +-
51 files changed, 1739 insertions(+), 1101 deletions(-)
diff --git a/.gitignore b/.gitignore
index aa35d37..ebf34fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -83,6 +83,7 @@ spark-1.*-bin-hadoop*
submarine-cloud/vendor/*
submarine-cloud/output/*
submarine-cloud/hack/conf/*
+submarine-cloud/hack/output/*
submarine-cloud/bin/*
submarine-security/spark-security/dependency-reduced-pom.xml
diff --git a/conf/submarine-site.xml.template b/conf/submarine-site.xml.template
index 4ebb047..e4caa2d 100755
--- a/conf/submarine-site.xml.template
+++ b/conf/submarine-site.xml.template
@@ -155,4 +155,28 @@
<description>Rpc server port</description>
</property>
+ <!-- Submarine Submitters Configuration -->
+ <property>
+ <name>submarine.submitters</name>
+ <value>k8s</value>
+ <description>Submitter list for the server</description>
+ </property>
+ <property>
+ <name>submarine.submitters.k8s.class</name>
+ <value>org.apache.submarine.server.submitter.k8s.K8sJobSubmitter</value>
+ <description>The entry class for the specified submitter</description>
+ </property>
+ <property>
+ <name>submarine.submitters.k8s.classpath</name>
+ <value>../lib/submitter/k8s/</value>
+ <description>The libs for Kubernetes submitter</description>
+ </property>
+
+ <!-- K8s Configuration -->
+ <property>
+ <name>submarine.k8s.kube.config</name>
+ <value>../conf/k8s/config</value>
+ <description>Kube config for kubernetes, you should get the config from
cluster</description>
+ </property>
+
</configuration>
diff --git a/dev-support/k8s/deploy-kubeflow-operators.sh
b/dev-support/k8s/deploy-kubeflow-operators.sh
new file mode 100755
index 0000000..d35ef86
--- /dev/null
+++ b/dev-support/k8s/deploy-kubeflow-operators.sh
@@ -0,0 +1,161 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -e
+
+readonly
TF_OPERATOR_IMAGE="gcr.io/kubeflow-images-public/tf_operator:v1.0.0-g92389064"
+readonly
PYTORCH_OPERATOR_IMAGE="gcr.io/kubeflow-images-public/pytorch-operator:v1.0.0-g047cf0f"
+readonly TF_MNIST_IMAGE="gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0"
+readonly PT_MNIST_IMAGE="apache/submarine:pytorch-dist-mnist-1.0"
+
+if [ -L "${BASH_SOURCE-$0}" ]; then
+ PWD=$(dirname "$(readlink "${BASH_SOURCE-$0}")")
+else
+ PWD=$(dirname "${BASH_SOURCE-$0}")
+fi
+CURRENT_PATH=$(cd "${PWD}">/dev/null; pwd)
+export CURRENT_PATH
+export SUBMARINE_HOME=${CURRENT_PATH}/../..
+# lib.sh use the ROOT variable
+export ROOT="${SUBMARINE_HOME}/submarine-cloud/"
+export KUBECONFIG="${HOME}/.kube/kind-config-${clusterName:-kind}"
+
+# shellcheck source=./../../submarine-cloud/hack/lib.sh
+source "${SUBMARINE_HOME}/submarine-cloud/hack/lib.sh"
+
+###########################################
+# Load local docker image into registry
+# Globals:
+# KIND_BIN
+# Arguments:
+# image
+###########################################
+function load_image_to_registry() {
+ if [[ ! $(docker inspect "$1" > /dev/null) ]] ; then
+ docker pull "$1"
+ fi
+ ${KIND_BIN} load docker-image "$1"
+}
+
+###########################################
+# Deploy tf-operator on K8s
+# Globals:
+# KUBECTL_BIN
+# CURRENT_PATH
+# TF_OPERATOR_IMAGE
+# Arguments:
+# useSample
+###########################################
+function deploy_tf_operator() {
+ load_image_to_registry "${TF_OPERATOR_IMAGE}"
+
+ ${KUBECTL_BIN} apply -f "${CURRENT_PATH}/tfjob/crd.yaml"
+ ${KUBECTL_BIN} kustomize "${CURRENT_PATH}/tfjob/operator" \
+ | ${KUBECTL_BIN} apply -f -
+
+ if [[ $1 == "true" ]]; then
+ load_image_to_registry "${TF_MNIST_IMAGE}"
+ fi
+}
+
+###########################################
+# Deploy pytorch-operator on K8s
+# Globals:
+# KUBECTL_BIN
+# CURRENT_PATH
+# PYTORCH_OPERATOR_IMAGE
+# Arguments:
+# useSample
+###########################################
+function deploy_pytorch_operator() {
+ load_image_to_registry "${PYTORCH_OPERATOR_IMAGE}"
+ ${KUBECTL_BIN} apply -f "${CURRENT_PATH}/pytorchjob"
+
+ if [[ $1 == "true" ]]; then
+ load_image_to_registry "${PT_MNIST_IMAGE}"
+ fi
+}
+
+###########################################
+# Print the usage information
+###########################################
+function usage() {
+ cat <<END
+
+This script aims to deploy the machine learning operator to K8s.
+
+Usage:
+ $0 [options]
+
+Options:
+ -a, --all deploy the TensorFlow and PyTorch operator
+ -tf, --tensorflow deploy the TensorFlow operator
+ -pt, --pytorch deploy the PyTorch operator
+ -s, --sample pull the sample docker image and load into K8s registry
+ -h, --help prints this usage message
+END
+}
+
+function main() {
+ if [[ $# -eq 0 ]]; then
+ usage
+ exit 1
+ fi
+
+ while [[ $# -gt 0 ]]; do
+ case $1 in
+ -a|--all)
+ opt_all="true"
+ shift ;;
+ -tf|--tensorflow)
+ opt_tf="true"
+ shift ;;
+ -pt|--pytorch)
+ opt_pt="true"
+ shift ;;
+ -s|--sample)
+ opt_s="true"
+ shift ;;
+ -h|--help)
+ usage
+ exit 0
+ ;;
+ *)
+ echo "Unknown options: $*"
+ usage
+ exit 2
+ ;;
+ esac
+ done
+
+ hack::ensure_kubectl
+
+ if [[ "${opt_tf}" == "true" && "${opt_pt}" == "true" ]]; then
+ opt_all="true"
+ fi
+ if [[ "${opt_all}" == "true" ]]; then
+ deploy_tf_operator ${opt_s}
+ deploy_pytorch_operator ${opt_s}
+ elif [[ "${opt_tf}" == "true" ]]; then
+ deploy_tf_operator ${opt_s}
+ elif [[ "${opt_pt}" == "true" ]]; then
+ deploy_pytorch_operator ${opt_s}
+ fi
+}
+
+main "$@"
diff --git a/dev-support/k8s/pytorchjob/deployment.yaml
b/dev-support/k8s/pytorchjob/deployment.yaml
index 8ddecda..441a517 100644
--- a/dev-support/k8s/pytorchjob/deployment.yaml
+++ b/dev-support/k8s/pytorchjob/deployment.yaml
@@ -3,7 +3,7 @@ apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: pytorch-operator
- namespace: submarine
+ namespace: default
spec:
replicas: 1
selector:
diff --git a/dev-support/k8s/pytorchjob/deployment_v1.16plus.yaml
b/dev-support/k8s/pytorchjob/deployment_v1.16plus.yaml
index e80a9ab..4a2238f 100644
--- a/dev-support/k8s/pytorchjob/deployment_v1.16plus.yaml
+++ b/dev-support/k8s/pytorchjob/deployment_v1.16plus.yaml
@@ -5,7 +5,7 @@ metadata:
labels:
name: pytorch-operator
name: pytorch-operator
- namespace: submarine
+ namespace: default
spec:
progressDeadlineSeconds: 2147483647
replicas: 1
diff --git a/dev-support/k8s/pytorchjob/rbac.yaml
b/dev-support/k8s/pytorchjob/rbac.yaml
index 637b859..0261c85 100644
--- a/dev-support/k8s/pytorchjob/rbac.yaml
+++ b/dev-support/k8s/pytorchjob/rbac.yaml
@@ -4,7 +4,7 @@ metadata:
labels:
app: pytorch-operator
name: pytorch-operator
- namespace: submarine
+ namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
@@ -49,5 +49,5 @@ roleRef:
subjects:
- kind: ServiceAccount
name: pytorch-operator
- namespace: submarine
+ namespace: default
---
diff --git a/dev-support/k8s/pytorchjob/service.yaml
b/dev-support/k8s/pytorchjob/service.yaml
index e9c0a70..8e57a49 100644
--- a/dev-support/k8s/pytorchjob/service.yaml
+++ b/dev-support/k8s/pytorchjob/service.yaml
@@ -8,7 +8,7 @@ metadata:
labels:
app: pytorch-operator
name: pytorch-operator
- namespace: submarine
+ namespace: default
spec:
ports:
- name: monitoring-port
diff --git a/dev-support/k8s/tfjob/crd.yaml b/dev-support/k8s/tfjob/crd.yaml
index b693c40..38f850b 100644
--- a/dev-support/k8s/tfjob/crd.yaml
+++ b/dev-support/k8s/tfjob/crd.yaml
@@ -3,19 +3,16 @@ kind: CustomResourceDefinition
metadata:
name: tfjobs.kubeflow.org
spec:
- additionalPrinterColumns:
- - JSONPath: .status.conditions[-1:].type
- name: State
- type: string
- - JSONPath: .metadata.creationTimestamp
- name: Age
- type: date
group: kubeflow.org
+ scope: Namespaced
names:
kind: TFJob
- plural: tfjobs
singular: tfjob
- scope: Namespaced
+ plural: tfjobs
+ versions:
+ - name: v1
+ served: true
+ storage: true
subresources:
status: {}
validation:
@@ -25,23 +22,21 @@ spec:
properties:
tfReplicaSpecs:
properties:
- Chief:
+ # The validation works when the configuration contains
+ # `Worker`, `PS` or `Chief`. Otherwise it will not be
validated.
+ Worker:
properties:
replicas:
- maximum: 1
- minimum: 1
type: integer
+ minimum: 1
PS:
properties:
replicas:
- minimum: 1
type: integer
- Worker:
+ minimum: 1
+ Chief:
properties:
replicas:
- minimum: 1
type: integer
- versions:
- - name: v1
- served: true
- storage: true
+ minimum: 1
+ maximum: 1
diff --git a/dev-support/k8s/tfjob/operator/cluster-role.yaml
b/dev-support/k8s/tfjob/operator/cluster-role.yaml
index 72b2903..2740b98 100644
--- a/dev-support/k8s/tfjob/operator/cluster-role.yaml
+++ b/dev-support/k8s/tfjob/operator/cluster-role.yaml
@@ -6,35 +6,36 @@ metadata:
app: tf-job-operator
name: tf-job-operator
rules:
-- apiGroups:
- - kubeflow.org
- resources:
- - tfjobs
- - tfjobs/status
- verbs:
- - '*'
-- apiGroups:
- - apiextensions.k8s.io
- resources:
- - customresourcedefinitions
- verbs:
- - '*'
-- apiGroups:
- - ""
- resources:
- - pods
- - services
- - endpoints
- - events
- verbs:
- - '*'
-- apiGroups:
- - apps
- - extensions
- resources:
- - deployments
- verbs:
- - '*'
+ - apiGroups:
+ - kubeflow.org
+ resources:
+ - tfjobs
+ - tfjobs/status
+ - tfjobs/finalizers
+ verbs:
+ - '*'
+ - apiGroups:
+ - apiextensions.k8s.io
+ resources:
+ - customresourcedefinitions
+ verbs:
+ - '*'
+ - apiGroups:
+ - ""
+ resources:
+ - pods
+ - services
+ - endpoints
+ - events
+ verbs:
+ - '*'
+ - apiGroups:
+ - apps
+ - extensions
+ resources:
+ - deployments
+ verbs:
+ - '*'
---
@@ -46,8 +47,8 @@ metadata:
rbac.authorization.kubeflow.org/aggregate-to-kubeflow-admin: "true"
aggregationRule:
clusterRoleSelectors:
- - matchLabels:
- rbac.authorization.kubeflow.org/aggregate-to-kubeflow-tfjobs-admin:
"true"
+ - matchLabels:
+ rbac.authorization.kubeflow.org/aggregate-to-kubeflow-tfjobs-admin:
"true"
rules: []
---
@@ -60,20 +61,20 @@ metadata:
rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rbac.authorization.kubeflow.org/aggregate-to-kubeflow-tfjobs-admin: "true"
rules:
-- apiGroups:
- - kubeflow.org
- resources:
- - tfjobs
- - tfjobs/status
- verbs:
- - get
- - list
- - watch
- - create
- - delete
- - deletecollection
- - patch
- - update
+ - apiGroups:
+ - kubeflow.org
+ resources:
+ - tfjobs
+ - tfjobs/status
+ verbs:
+ - get
+ - list
+ - watch
+ - create
+ - delete
+ - deletecollection
+ - patch
+ - update
---
@@ -84,12 +85,12 @@ metadata:
labels:
rbac.authorization.kubeflow.org/aggregate-to-kubeflow-view: "true"
rules:
-- apiGroups:
- - kubeflow.org
- resources:
- - tfjobs
- - tfjobs/status
- verbs:
- - get
- - list
- - watch
+ - apiGroups:
+ - kubeflow.org
+ resources:
+ - tfjobs
+ - tfjobs/status
+ verbs:
+ - get
+ - list
+ - watch
diff --git a/dev-support/k8s/tfjob/operator/deployment.yaml
b/dev-support/k8s/tfjob/operator/deployment.yaml
index bacd44e..411c73d 100644
--- a/dev-support/k8s/tfjob/operator/deployment.yaml
+++ b/dev-support/k8s/tfjob/operator/deployment.yaml
@@ -9,22 +9,23 @@ spec:
metadata:
labels:
name: tf-job-operator
+ annotations:
+ sidecar.istio.io/inject: "false"
spec:
containers:
- - command:
- - /opt/kubeflow/tf-operator.v1
- - --alsologtostderr
- - -v=1
- - --monitoring-port=8443
- env:
- - name: MY_POD_NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
- - name: MY_POD_NAME
- valueFrom:
- fieldRef:
- fieldPath: metadata.name
- image:
gcr.io/kubeflow-images-public/tf_operator:kubeflow-tf-operator-postsubmit-v1-5adee6f-6109-a25c
- name: tf-job-operator
+ - args:
+ - --alsologtostderr
+ - -v=1
+ - --monitoring-port=8443
+ env:
+ - name: MY_POD_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: MY_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+ image: gcr.io/kubeflow-images-public/tf_operator:v1.0.0-g92389064
+ name: tf-job-operator
serviceAccountName: tf-job-operator
diff --git a/dev-support/k8s/tfjob/operator/kustomization.yaml
b/dev-support/k8s/tfjob/operator/kustomization.yaml
index 86826d7..5be190f 100644
--- a/dev-support/k8s/tfjob/operator/kustomization.yaml
+++ b/dev-support/k8s/tfjob/operator/kustomization.yaml
@@ -1,6 +1,6 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
-namespace: submarine
+namespace: default
resources:
- cluster-role-binding.yaml
- cluster-role.yaml
@@ -12,4 +12,4 @@ commonLabels:
images:
- name: gcr.io/kubeflow-images-public/tf_operator
newName: gcr.io/kubeflow-images-public/tf_operator
- newTag: v0.7.0
+ newTag: v1.0.0-g92389064
diff --git a/docs/submarine-server/README.md b/docs/submarine-server/README.md
index 9f41e29..c1fc875 100644
--- a/docs/submarine-server/README.md
+++ b/docs/submarine-server/README.md
@@ -125,13 +125,12 @@ or
For more info about the spec definition see
[here](../design/submarine-server/jobspec.md).
-### Submit Job
-> Before submit training job, you should make sure you had deployed the
[submarine server and tf-operator](./setup-kubernetes.md#setup-submarine).
+## Job Operation by REST API
+### Create Job
+`POST /api/v1/jobs`
-You can use the Postman to post the job to server or use `curl` with the
following command:
-
-For Tensorflow Job:
-```bash
+**Example Request:**
+```sh
curl -X POST -H "Content-Type: application/json" -d '
{
"name": "mnist",
@@ -149,11 +148,6 @@ curl -X POST -H "Content-Type: application/json" -d '
"namespace": "submarine"
},
"taskSpecs": {
- "Ps": {
- "name": "tensorflow",
- "replicas": 1,
- "resources": "cpu=1,memory=1024M"
- },
"Worker": {
"name": "tensorflow",
"replicas": 1,
@@ -161,19 +155,168 @@ curl -X POST -H "Content-Type: application/json" -d '
}
}
}
-' http://127.0.0.1:8080/api/v1/jobs
+' http://127.0.0.1/api/v1/jobs
```
-For PyTorch Job:
-```bash
-curl -X POST -H "Content-Type: application/json" -d '
+**Example Response:**
+```sh
{
- "name": "pytorch-dist-mnist-gloo",
+ "status": "OK",
+ "code": 200,
+ "result": {
+ "jobId": "job_1586156073228_0005",
+ "name": "mnist",
+ "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+ "status": "Accepted",
+ "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+ "spec": {
+ "name": "mnist",
+ "librarySpec": {
+ "name": "TensorFlow",
+ "version": "2.1.0",
+ "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+ "cmd": "python /var/tf_mnist/mnist_with_summaries.py
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+ "envVars": {
+ "ENV_1": "ENV1"
+ }
+ },
+ "submitterSpec": {
+ "type": "k8s",
+ "namespace": "submarine"
+ },
+ "taskSpecs": {
+ "Worker": {
+ "name": "tensorflow",
+ "resources": "cpu=1,memory=1024M",
+ "replicas": 1,
+ "resourceMap": {
+ "memory": "1024M",
+ "cpu": "1"
+ }
+ }
+ }
+ }
+ }
+}
+```
+
+### List Jobs
+`GET /api/v1/jobs`
+
+**Example Request:**
+```sh
+curl -X GET http://127.0.0.1/api/v1/jobs
+```
+
+**Example Response:**
+```sh
+{
+ "status": "OK",
+ "code": 200,
+ "result": [
+ {
+ "jobId": "job_1586156073228_0005",
+ "name": "mnist",
+ "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+ "status": "Created",
+ "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+ "createdTime": "2020-04-06T14:59:29.000+08:00",
+ "spec": {
+ "name": "mnist",
+ "librarySpec": {
+ "name": "TensorFlow",
+ "version": "2.1.0",
+ "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+ "cmd": "python /var/tf_mnist/mnist_with_summaries.py
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+ "envVars": {
+ "ENV_1": "ENV1"
+ }
+ },
+ "submitterSpec": {
+ "type": "k8s",
+ "namespace": "submarine"
+ },
+ "taskSpecs": {
+ "Worker": {
+ "name": "tensorflow",
+ "resources": "cpu=1,memory=1024M",
+ "replicas": 1,
+ "resourceMap": {
+ "memory": "1024M",
+ "cpu": "1"
+ }
+ }
+ }
+ }
+ }
+ ]
+}
+```
+
+### Get Job
+`GET /api/v1/jobs/{id}`
+
+**Example Request:**
+```sh
+curl -X GET http://127.0.0.1/api/v1/jobs/job_1586156073228_0005
+```
+
+**Example Response:**
+```sh
+{
+ "status": "OK",
+ "code": 200,
+ "result": {
+ "jobId": "job_1586156073228_0005",
+ "name": "mnist",
+ "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+ "status": "Created",
+ "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+ "createdTime": "2020-04-06T14:59:29.000+08:00",
+ "spec": {
+ "name": "mnist",
+ "librarySpec": {
+ "name": "TensorFlow",
+ "version": "2.1.0",
+ "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+ "cmd": "python /var/tf_mnist/mnist_with_summaries.py
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+ "envVars": {
+ "ENV_1": "ENV1"
+ }
+ },
+ "submitterSpec": {
+ "type": "k8s",
+ "namespace": "submarine"
+ },
+ "taskSpecs": {
+ "Worker": {
+ "name": "tensorflow",
+ "resources": "cpu=1,memory=1024M",
+ "replicas": 1,
+ "resourceMap": {
+ "memory": "1024M",
+ "cpu": "1"
+ }
+ }
+ }
+ }
+ }
+}
+```
+
+### Patch Job
+`PATCH /api/v1/jobs/{id}`
+
+**Example Request:**
+```sh
+curl -X PATCH -H "Content-Type: application/json" -d '
+{
+ "name": "mnist",
"librarySpec": {
- "name": "pytorch",
+ "name": "TensorFlow",
"version": "2.1.0",
- "image": "apache/submarine:pytorch-dist-mnist-1.0",
- "cmd": "python /var/mnist.py --backend gloo",
+ "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+ "cmd": "python /var/tf_mnist/mnist_with_summaries.py --log_dir=/train/log
--learning_rate=0.01 --batch_size=150",
"envVars": {
"ENV_1": "ENV1"
}
@@ -183,50 +326,107 @@ curl -X POST -H "Content-Type: application/json" -d '
"namespace": "submarine"
},
"taskSpecs": {
- "Master": {
- "name": "master",
- "replicas": 1,
- "resources": "cpu=1,memory=1024M"
- },
"Worker": {
- "name": "worker",
- "replicas": 1,
+ "name": "tensorflow",
+ "replicas": 2,
"resources": "cpu=1,memory=1024M"
}
}
}
-' http://127.0.0.1:8080/api/v1/jobs
+' http://127.0.0.1/api/v1/jobs/job_1586156073228_0005
```
-### Verify Jobs
-You can run following command to get the submitted job:
-```
-kubectl get -n submarine tfjob
-kubectl get -n submarine pytorchjob
-```
-
-**Output:**
-```
-NAME STATE AGE
-mnist Created 7m6s
+**Example Response:**
+```sh
+{
+ "status": "OK",
+ "code": 200,
+ "success": true,
+ "result": {
+ "jobId": "job_1586156073228_0005",
+ "name": "mnist",
+ "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+ "status": "Created",
+ "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+ "createdTime": "2020-04-06T14:59:29.000+08:00",
+ "spec": {
+ "name": "mnist",
+ "librarySpec": {
+ "name": "TensorFlow",
+ "version": "2.1.0",
+ "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+ "cmd": "python /var/tf_mnist/mnist_with_summaries.py
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+ "envVars": {
+ "ENV_1": "ENV1"
+ }
+ },
+ "submitterSpec": {
+ "type": "k8s",
+ "namespace": "submarine"
+ },
+ "taskSpecs": {
+ "Worker": {
+ "name": "tensorflow",
+ "resources": "cpu=1,memory=1024M",
+ "replicas": 2,
+ "resourceMap": {
+ "memory": "1024M",
+ "cpu": "1"
+ }
+ }
+ }
+ }
+ }
+}
```
-Also you can find pods which running the jobs, run following command:
-```
-kubectl get -n submarine pods
-```
+### Delete Job
+`DELETE /api/v1/jobs/{id}`
-**Output:**
+**Example Request:**
+```sh
+curl -X DELETE http://127.0.0.1/api/v1/jobs/job_1586156073228_0005
```
-NAME READY STATUS RESTARTS AGE
-mnist-ps-0 0/1 ContainerCreating 0 3m47s
-mnist-worker-0 0/1 Pending 0 3m47s
-tf-job-operator-74cc6bd6cb-fqd5s 1/1 Running 0 98m
-```
-
-### Delete Jobs
-```bash
-kubectl -n submarine delete tfjob/mnist
-kubectl -n submarine delete pytorchjob/pytorch-dist-mnist-gloo
+**Example Response:**
+```sh
+{
+ "status": "OK",
+ "code": 200,
+ "result": {
+ "jobId": "job_1586156073228_0005",
+ "name": "mnist",
+ "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+ "status": "Deleted",
+ "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+ "createdTime": "2020-04-06T14:59:29.000+08:00",
+ "spec": {
+ "name": "mnist",
+ "librarySpec": {
+ "name": "TensorFlow",
+ "version": "2.1.0",
+ "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+ "cmd": "python /var/tf_mnist/mnist_with_summaries.py
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+ "envVars": {
+ "ENV_1": "ENV1"
+ }
+ },
+ "submitterSpec": {
+ "type": "k8s",
+ "namespace": "submarine"
+ },
+ "taskSpecs": {
+ "Worker": {
+ "name": "tensorflow",
+ "resources": "cpu=1,memory=1024M",
+ "replicas": 1,
+ "resourceMap": {
+ "memory": "1024M",
+ "cpu": "1"
+ }
+ }
+ }
+ }
+ }
+}
```
diff --git a/submarine-cloud/hack/integration-test.sh
b/submarine-cloud/hack/integration-test.sh
index d4da608..db39469 100755
--- a/submarine-cloud/hack/integration-test.sh
+++ b/submarine-cloud/hack/integration-test.sh
@@ -30,6 +30,7 @@ export KUBECONFIG=~/.kube/kind-config-${clusterName:-kind}
function start() {
$ROOT/hack/kind-cluster-build.sh
+ $SUBMARINE_HOME/dev-support/k8s/deploy-kubeflow-operators.sh -a
$ROOT/hack/deploy-submarine.sh --test
for((i=1;i<=30;i++)); do
diff --git a/submarine-cloud/manifests/submarine-cluster/rbac.yaml
b/submarine-cloud/manifests/submarine-cluster/rbac.yaml
index eab1fa6..a8ecd37 100644
--- a/submarine-cloud/manifests/submarine-cluster/rbac.yaml
+++ b/submarine-cloud/manifests/submarine-cluster/rbac.yaml
@@ -29,6 +29,11 @@ items:
- endpoints
- pods
verbs: ["list", "get"]
+ - apiGroups: ["kubeflow.org"]
+ resources:
+ - tfjobs
+ - pytorchjobs
+ verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
diff --git a/submarine-cloud/manifests/submarine-cluster/submarine-server.yaml
b/submarine-cloud/manifests/submarine-cluster/submarine-server.yaml
index b6cf9a0..76f1d73 100644
--- a/submarine-cloud/manifests/submarine-cluster/submarine-server.yaml
+++ b/submarine-cloud/manifests/submarine-cluster/submarine-server.yaml
@@ -28,7 +28,7 @@ spec:
labels:
app: cluster-test
spec:
- ServiceAccountName: "submarine-node"
+ serviceAccountName: "submarine-node"
containers:
- name: submarine-node
image: "apache/submarine:server-0.4.0-SNAPSHOT"
diff --git
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/exception/SubmarineRuntimeException.java
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/exception/SubmarineRuntimeException.java
index 207270d..0daacd3 100644
---
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/exception/SubmarineRuntimeException.java
+++
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/exception/SubmarineRuntimeException.java
@@ -22,7 +22,18 @@ package org.apache.submarine.commons.utils.exception;
public class SubmarineRuntimeException extends RuntimeException {
private static final long serialVersionUID = 7159777541471705348L;
+ private int code;
+
public SubmarineRuntimeException(String message) {
super(message);
}
+
+ public SubmarineRuntimeException(int code, String message) {
+ super(message);
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
deleted file mode 100644
index 36e741e..0000000
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.api;
-
-import org.apache.submarine.server.api.exception.InvalidSpecException;
-import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
-import org.apache.submarine.server.api.job.Job;
-import org.apache.submarine.server.api.spec.JobSpec;
-
-/**
- * Handle the job's operate (CRUD)
- */
-public interface JobHandler {
- /**
- * Submit job
- * @param jobSpec job spec
- * @return job object
- * @throws UnsupportedJobTypeException caused by the unsupported job type
- */
- Job submitJob(JobSpec jobSpec) throws UnsupportedJobTypeException,
InvalidSpecException;
-
- /**
- * Get job info
- * @param jobSpec job spec
- * @return job object
- * @throws UnsupportedJobTypeException caused by the unsupported job type
- */
- default Job getJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
- // TODO(submarine) should implementing later
- return null;
- }
-
- /**
- * Update job info
- * @param jobSpec job spec
- * @return job object
- * @throws UnsupportedJobTypeException caused by the unsupported job type
- */
- default Job updateJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
- // TODO(submarine) should implementing later
- return null;
- }
-
- /**
- * Delete the specified job
- * @param jobSpec job spec
- * @return job object
- * @throws UnsupportedJobTypeException caused by the unsupported job type
- */
- default Job deleteJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
- // TODO(submarine) should implementing later
- return null;
- }
-}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
deleted file mode 100644
index 866fd74..0000000
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.api.exception;
-
-public class UnsupportedJobTypeException extends Exception {
- private static final long serialVersionUID = 4752254162145918312L;
-
- public UnsupportedJobTypeException() {
- super("Unsupported Job Type Exception");
- }
-}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
index 03068ae..443773a 100644
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
@@ -19,28 +19,21 @@
package org.apache.submarine.server.api.job;
+import org.apache.submarine.server.api.spec.JobSpec;
+
/**
* The Generic Machine Learning Job in Submarine.
*/
public class Job {
private JobId jobId;
private String name;
- private String identifier;
-
- /**
- * Get the job instance
- * @param jobId job id
- * @param name job name
- * @param identifier identifier
- * @return object
- */
- public static Job newInstance(JobId jobId, String name, String identifier) {
- Job job = new Job();
- job.setJobId(jobId);
- job.setName(name);
- job.setIdentifier(identifier);
- return job;
- }
+ private String uid;
+ private String status;
+ private String acceptedTime;
+ private String createdTime;
+ private String runningTime;
+ private String finishedTime;
+ private JobSpec spec;
/**
* Get the job id which is unique in submarine
@@ -81,16 +74,115 @@ public class Job {
* In YARN cluster it best to set the ApplicationId, and in K8s cluster it
maybe the job name.
* @return the unique identifier
*/
- public String getIdentifier() {
- return identifier;
+ public String getUid() {
+ return uid;
}
/**
* Set the job identifier, in YARN cluster it best to set the application
id, and in K8s cluster
- * it maybe the job name.
- * @param identifier application id (YARN) or Job Name (K8s)
+ * it may be the uid.
+ * @param uid application id (YARN) or uid (K8s)
*/
- public void setIdentifier(String identifier) {
- this.identifier = identifier;
+ public void setUid(String uid) {
+ this.uid = uid;
+ }
+
+  /**
+   * Get the job status.
+   * @return the status, or null if it has not been set
+   */
+  public String getStatus() {
+    return status;
+  }
+
+  /**
+   * Set the job status.
+   * @param status the status
+   */
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  /**
+   * Get the accepted time of the job.
+   * @return the accepted time, or null if it has not been set
+   */
+  public String getAcceptedTime() {
+    return acceptedTime;
+  }
+
+  /**
+   * Set the accepted time of the job.
+   * @param acceptedTime the accepted time
+   */
+  public void setAcceptedTime(String acceptedTime) {
+    this.acceptedTime = acceptedTime;
+  }
+
+  /**
+   * Get the created time of the job.
+   * @return the created time, or null if it has not been set
+   */
+  public String getCreatedTime() {
+    return createdTime;
+  }
+
+  /**
+   * Set the created time of the job.
+   * @param createdTime the created time
+   */
+  public void setCreatedTime(String createdTime) {
+    this.createdTime = createdTime;
+  }
+
+  /**
+   * Get the running time of the job.
+   * @return the running time, or null if it has not been set
+   */
+  public String getRunningTime() {
+    return runningTime;
+  }
+
+  /**
+   * Set the running time of the job.
+   * @param runningTime the running time
+   */
+  public void setRunningTime(String runningTime) {
+    this.runningTime = runningTime;
+  }
+
+  /**
+   * Get the finished time of the job.
+   * @return the finished time, or null if it has not been set
+   */
+  public String getFinishedTime() {
+    return finishedTime;
+  }
+
+  /**
+   * Set the finished time of the job.
+   * @param finishedTime the finished time
+   */
+  public void setFinishedTime(String finishedTime) {
+    this.finishedTime = finishedTime;
+  }
+
+  /**
+   * Get the spec attached to this job.
+   * @return the job spec, or null if it has not been set
+   */
+  public JobSpec getSpec() {
+    return spec;
+  }
+
+  /**
+   * Attach a spec to this job.
+   * @param spec the job spec
+   */
+  public void setSpec(JobSpec spec) {
+    this.spec = spec;
+  }
+
+  /**
+   * The job statuses, each carrying the string value used to represent it.
+   */
+  public enum Status {
+    STATUS_ACCEPTED("Accepted"),
+    STATUS_CREATED("Created"),
+    STATUS_RUNNING("Running"),
+    STATUS_SUCCEEDED("Succeeded"),
+    STATUS_DELETED("Deleted");
+
+    // Enum constants are shared singletons, so their state must not be reassignable.
+    private final String value;
+
+    Status(String value) {
+      this.value = value;
+    }
+
+    /**
+     * Get the string value of the status.
+     * @return the value, e.g. "Accepted"
+     */
+    public String getValue() {
+      return value;
+    }
+
+    /**
+     * Same as {@link #getValue()}, so the constant prints as its string value.
+     * @return the value
+     */
+    @Override
+    public String toString() {
+      return value;
+    }
+  }
+
+  /**
+   * Merge the non-null fields of another job into this one.
+   * <p>
+   * The name, uid, spec, status and the accepted/created/running/finished times are
+   * copied when the given job carries a value for them; a field that is null on the
+   * given job keeps its current value here. The job id is not copied.
+   * @param job the job to copy from, ignored when null
+   */
+  public void rebuild(Job job) {
+    if (job == null) {
+      return;
+    }
+
+    String newName = job.getName();
+    if (newName != null) {
+      this.setName(newName);
+    }
+
+    String newUid = job.getUid();
+    if (newUid != null) {
+      this.setUid(newUid);
+    }
+
+    JobSpec newSpec = job.getSpec();
+    if (newSpec != null) {
+      this.setSpec(newSpec);
+    }
+
+    String newStatus = job.getStatus();
+    if (newStatus != null) {
+      this.setStatus(newStatus);
+    }
+
+    String newAcceptedTime = job.getAcceptedTime();
+    if (newAcceptedTime != null) {
+      this.setAcceptedTime(newAcceptedTime);
+    }
+
+    String newCreatedTime = job.getCreatedTime();
+    if (newCreatedTime != null) {
+      this.setCreatedTime(newCreatedTime);
+    }
+
+    String newRunningTime = job.getRunningTime();
+    if (newRunningTime != null) {
+      this.setRunningTime(newRunningTime);
+    }
+
+    String newFinishedTime = job.getFinishedTime();
+    if (newFinishedTime != null) {
+      this.setFinishedTime(newFinishedTime);
+    }
+  }
}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
index 7026ed0..2a48329 100644
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
@@ -116,7 +116,10 @@ public class JobId implements Comparable<JobId> {
return true;
}
JobId other = (JobId) obj;
- return this.getId() != other.getId();
+ if (this.getServerTimestamp() != other.getServerTimestamp()) {
+ return false;
+ }
+ return this.getId() == other.getId();
}
@Override
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
similarity index 53%
rename from
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
rename to
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
index 8e455bb..46bef31 100644
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
@@ -17,15 +17,16 @@
* under the License.
*/
-package org.apache.submarine.server.api;
+package org.apache.submarine.server.api.job;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.spec.JobSpec;
/**
* The submitter should implement this interface.
*/
-public interface JobSubmitter extends JobHandler {
-
+public interface JobSubmitter {
/**
* Initialize the submitter related code
*/
@@ -38,4 +39,35 @@ public interface JobSubmitter extends JobHandler {
*/
String getSubmitterType();
+ /**
+ * Create the job described by the given job spec.
+ * @param jobSpec job spec
+ * @return the created job
+ * @throws SubmarineRuntimeException running error
+ */
+ Job createJob(JobSpec jobSpec) throws SubmarineRuntimeException;
+
+ /**
+ * Find the job described by the given job spec.
+ * @param jobSpec job spec
+ * @return the found job
+ * @throws SubmarineRuntimeException running error
+ */
+ Job findJob(JobSpec jobSpec) throws SubmarineRuntimeException;
+
+ /**
+ * Patch the job with the given job spec.
+ * @param jobSpec job spec
+ * @return the patched job
+ * @throws SubmarineRuntimeException running error
+ */
+ Job patchJob(JobSpec jobSpec) throws SubmarineRuntimeException;
+
+ /**
+ * Delete the job described by the given job spec.
+ * @param jobSpec job spec
+ * @return the deleted job
+ * @throws SubmarineRuntimeException running error
+ */
+ Job deleteJob(JobSpec jobSpec) throws SubmarineRuntimeException;
}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
deleted file mode 100644
index f475ed4..0000000
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server;
-
-import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.server.api.JobHandler;
-import org.apache.submarine.server.api.JobSubmitter;
-import org.apache.submarine.server.api.exception.InvalidSpecException;
-import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
-import org.apache.submarine.server.api.job.Job;
-import org.apache.submarine.server.api.job.JobId;
-import org.apache.submarine.server.api.spec.JobSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * It responsible for manage the job (CRUD) and cached the job.
- */
-public class JobManager implements JobHandler {
- private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
- private static volatile JobManager manager;
-
- private final AtomicInteger jobCounter = new AtomicInteger(0);
-
- private final ConcurrentMap<JobId, Job> jobs = new ConcurrentHashMap<>();
-
- private SubmitterManager submitterManager;
- private ExecutorService executorService;
-
- /**
- * Get the singleton instance
- * @return object
- */
- public static JobManager getInstance() {
- if (manager == null) {
- synchronized (JobManager.class) {
- if (manager == null) {
- SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
- SubmitterManager submitterManager = new SubmitterManager(conf);
- manager = new JobManager(submitterManager);
- }
- }
- }
- return manager;
- }
-
- private JobManager(SubmitterManager submitterManager) {
- this.submitterManager = submitterManager;
- this.executorService = Executors.newFixedThreadPool(50);
- }
-
- @Override
- public Job submitJob(JobSpec spec) throws UnsupportedJobTypeException {
- if (!spec.validate()) {
- return null;
- }
-
- JobSubmitter submitter = submitterManager.getSubmitterByType(
- spec.getSubmitterSpec().getType());
- if (submitter == null) {
- throw new UnsupportedJobTypeException();
- }
-
- Job job = new Job();
- job.setJobId(generateJobId());
- executorService.submit(() -> {
- try {
- jobs.putIfAbsent(job.getJobId(), submitter.submitJob(spec));
- } catch (UnsupportedJobTypeException e) {
- LOG.error(e.getMessage(), e);
- } catch (InvalidSpecException e) {
- LOG.error("Invalid job spec: " + spec + ", " + e.getMessage());
- }
- });
- return job;
- }
-
- private JobId generateJobId() {
- return JobId.newInstance(SubmarineServer.getServerTimeStamp(),
jobCounter.incrementAndGet());
- }
-}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
index b6a0a0e..f955eeb 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
@@ -19,6 +19,7 @@
package org.apache.submarine.server;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.submarine.server.rest.provider.YamlEntityProvider;
import org.apache.submarine.server.rpc.SubmarineRpcServer;
import org.apache.submarine.server.workbench.websocket.NotebookServer;
import org.apache.submarine.commons.cluster.ClusterServer;
@@ -126,9 +127,9 @@ public class SubmarineServer extends ResourceConfig {
@Inject
public SubmarineServer() {
packages("org.apache.submarine.server.workbench.rest",
- "org.apache.submarine.server.jobserver.rest",
"org.apache.submarine.server.rest"
);
+ register(YamlEntityProvider.class);
}
private static void startServer() throws InterruptedException {
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
index 36e3d6c..77cc524 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
@@ -20,7 +20,7 @@
package org.apache.submarine.server;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.server.api.JobSubmitter;
+import org.apache.submarine.server.api.job.JobSubmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
new file mode 100644
index 0000000..df346a0
--- /dev/null
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.job;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.SubmarineServer;
+import org.apache.submarine.server.SubmitterManager;
+import org.apache.submarine.server.api.job.JobSubmitter;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobId;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the jobs (create, get, list, patch, delete) and caches them in memory.
+ */
+public class JobManager {
+ private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
+
+ private static volatile JobManager manager;
+
+ private final AtomicInteger jobCounter = new AtomicInteger(0);
+
+ /**
+ * Used to cache the jobs by the job id.
+ * key: the string of job id
+ * value: Job object
+ */
+ private final ConcurrentMap<String, Job> cachedJobMap = new
ConcurrentHashMap<>();
+
+ private SubmitterManager submitterManager;
+
+  /**
+   * Get the shared JobManager, creating it on first use.
+   * <p>
+   * Creation happens at most once, under the class lock; the instance is built from the
+   * {@link SubmarineConfiguration} singleton through a new {@link SubmitterManager}.
+   * @return the shared instance
+   */
+  public static JobManager getInstance() {
+    // Fast path: already created, no locking needed.
+    if (manager != null) {
+      return manager;
+    }
+    synchronized (JobManager.class) {
+      // Re-check under the lock, another thread may have created it meanwhile.
+      if (manager == null) {
+        SubmarineConfiguration configuration = SubmarineConfiguration.getInstance();
+        manager = new JobManager(new SubmitterManager(configuration));
+      }
+      return manager;
+    }
+  }
+
+ /**
+ * Create the manager on top of the given submitter manager.
+ * Private: instances are obtained through getInstance().
+ * @param submitterManager used to look up the job submitter by type
+ */
+ private JobManager(SubmitterManager submitterManager) {
+ this.submitterManager = submitterManager;
+ }
+
+  /**
+   * Create a job from the spec and remember it in the cache.
+   * <p>
+   * The job is created by the submitter selected by the spec's submitter type, then it
+   * receives a freshly generated job id and the spec before being cached under that id.
+   * @param spec job spec
+   * @return the created job
+   * @throws SubmarineRuntimeException the service error
+   */
+  public Job createJob(JobSpec spec) throws SubmarineRuntimeException {
+    checkSpec(spec);
+    final String submitterType = spec.getSubmitterSpec().getType();
+    final Job created = getSubmitter(submitterType).createJob(spec);
+    created.setJobId(generateJobId());
+    created.setSpec(spec);
+    final String cacheKey = created.getJobId().toString();
+    cachedJobMap.putIfAbsent(cacheKey, created);
+    return created;
+  }
+
+ /**
+ * Build the next job id from the server timestamp and the incremented job counter.
+ * @return the new job id
+ */
+ private JobId generateJobId() {
+ return JobId.newInstance(SubmarineServer.getServerTimeStamp(),
jobCounter.incrementAndGet());
+ }
+
+  /**
+   * Get a cached job, refreshed with what its submitter currently reports.
+   * @param id job id
+   * @return the cached job after merging the submitter's view into it
+   * @throws SubmarineRuntimeException the service error, e.g. 404 when the id is unknown
+   */
+  public Job getJob(String id) throws SubmarineRuntimeException {
+    checkJobId(id);
+    final Job cached = cachedJobMap.get(id);
+    final JobSpec cachedSpec = cached.getSpec();
+    final JobSubmitter submitter = getSubmitter(cachedSpec.getSubmitterSpec().getType());
+    cached.rebuild(submitter.findJob(cachedSpec));
+    return cached;
+  }
+
+  /**
+   * List the cached jobs, optionally filtered by status.
+   * <p>
+   * Every cached job is looked up through its submitter; the jobs whose reported status
+   * matches are refreshed with the submitter's view and returned.
+   * @param status job status to match (case-insensitive), if null will return all jobs
+   * @return job list
+   * @throws SubmarineRuntimeException the service error
+   */
+  public List<Job> listJobsByStatus(String status) throws SubmarineRuntimeException {
+    List<Job> jobList = new ArrayList<>();
+    for (Job job : cachedJobMap.values()) {
+      JobSpec spec = job.getSpec();
+      JobSubmitter submitter = getSubmitter(spec.getSubmitterSpec().getType());
+      Job patchJob = submitter.findJob(spec);
+      // Nothing here guarantees that the submitter returns a job, or one with a status;
+      // treat both as "status unknown" instead of failing the whole listing with an NPE.
+      String currentStatus = patchJob == null ? null : patchJob.getStatus();
+      LOG.info("Found job: {}", currentStatus);
+      // equalsIgnoreCase accepts a null argument and does not depend on the default locale.
+      if (status == null || status.equalsIgnoreCase(currentStatus)) {
+        job.rebuild(patchJob);
+        jobList.add(job);
+      }
+    }
+    LOG.info("List job: {}", jobList.size());
+    return jobList;
+  }
+
+  /**
+   * Patch a cached job with a new spec.
+   * <p>
+   * The patch is applied by the submitter selected by the new spec's submitter type; the
+   * cached job then takes the new spec and is refreshed with the submitter's result.
+   * @param id job id
+   * @param spec job spec
+   * @return the cached job after the patch
+   * @throws SubmarineRuntimeException the service error
+   */
+  public Job patchJob(String id, JobSpec spec) throws SubmarineRuntimeException {
+    checkJobId(id);
+    checkSpec(spec);
+    final Job cached = cachedJobMap.get(id);
+    final String submitterType = spec.getSubmitterSpec().getType();
+    final Job patched = getSubmitter(submitterType).patchJob(spec);
+    cached.setSpec(spec);
+    cached.rebuild(patched);
+    return cached;
+  }
+
+  /**
+   * Delete a job.
+   * <p>
+   * The job is first removed from the cache, then deleted through its submitter; the
+   * removed job is refreshed with the submitter's result before being returned.
+   * @param id job id
+   * @return the job that was removed from the cache
+   * @throws SubmarineRuntimeException the service error
+   */
+  public Job deleteJob(String id) throws SubmarineRuntimeException {
+    checkJobId(id);
+    final Job removed = cachedJobMap.remove(id);
+    final JobSpec removedSpec = removed.getSpec();
+    final JobSubmitter submitter = getSubmitter(removedSpec.getSubmitterSpec().getType());
+    removed.rebuild(submitter.deleteJob(removedSpec));
+    return removed;
+  }
+
+  /**
+   * Look up the submitter registered for the given type.
+   * @param type submitter type
+   * @return the submitter, never null
+   * @throws SubmarineRuntimeException with code 500 when no submitter matches the type
+   */
+  private JobSubmitter getSubmitter(String type) throws SubmarineRuntimeException {
+    final JobSubmitter found = submitterManager.getSubmitterByType(type);
+    if (found != null) {
+      return found;
+    }
+    throw new SubmarineRuntimeException(Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+        "Invalid submitter.");
+  }
+
+  /**
+   * Reject a missing job spec.
+   * @param spec job spec
+   * @throws SubmarineRuntimeException with code 400 (Bad Request) when the spec is null
+   */
+  private void checkSpec(JobSpec spec) throws SubmarineRuntimeException {
+    if (spec == null) {
+      // A missing spec is a client error. Reporting it with 200 (OK) would make the
+      // failure look like a success to whoever inspects the response code.
+      throw new SubmarineRuntimeException(Status.BAD_REQUEST.getStatusCode(),
+          "Invalid job spec.");
+    }
+  }
+
+  /**
+   * Ensure the id parses as a job id and refers to a cached job.
+   * @param id job id
+   * @throws SubmarineRuntimeException with code 404 when the id does not parse or is not cached
+   */
+  private void checkJobId(String id) throws SubmarineRuntimeException {
+    // The cache is only consulted when the id parses, as before.
+    final boolean known = JobId.fromString(id) != null && cachedJobMap.containsKey(id);
+    if (!known) {
+      throw new SubmarineRuntimeException(Status.NOT_FOUND.getStatusCode(), "Not found job.");
+    }
+  }
+}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/response/JsonResponse.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/response/JsonResponse.java
index 72d4c2b..f389599 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/response/JsonResponse.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/response/JsonResponse.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.NewCookie;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import java.util.ArrayList;
import java.util.Date;
@@ -59,13 +60,17 @@ public class JsonResponse<T> {
private static final String CGLIB_PROPERTY_PREFIX = "\\$cglib_prop_";
private JsonResponse(Builder<T> builder) {
- this.status = builder.status;
this.code = builder.code;
this.success = builder.success;
this.message = builder.message;
this.attributes = builder.attributes;
this.result = (T) builder.result;
this.cookies = builder.cookies;
+ if (builder.status != null) {
+ this.status = builder.status;
+ } else {
+ status = Response.Status.fromStatusCode(this.code);
+ }
}
public T getResult() {
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
index 551cfc8..e3511ec 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
@@ -19,32 +19,32 @@
package org.apache.submarine.server.rest;
-import org.apache.submarine.server.JobManager;
-import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
-import org.apache.submarine.server.api.job.Job;
-import org.apache.submarine.server.api.job.JobId;
-import org.apache.submarine.server.api.spec.JobSpec;
-import org.apache.submarine.server.response.JsonResponse;
-
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
+import javax.ws.rs.PATCH;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import java.util.ArrayList;
import java.util.List;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.job.JobManager;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.apache.submarine.server.response.JsonResponse;
+
/**
- * {@link JobManager}'s REST API v1. It can accept {@link JobSpec} to create a
job.
+ * Job Service REST API v1. It can accept {@link JobSpec} to create a job.
*/
@Path(RestConstants.V1 + "/" + RestConstants.JOBS)
@Produces({MediaType.APPLICATION_JSON + "; " + RestConstants.CHARSET_UTF8})
public class JobManagerRestApi {
-
+ private final JobManager jobManager = JobManager.getInstance();
/**
* Return the Pong message for test the connectivity
* @return Pong message
@@ -59,24 +59,17 @@ public class JobManagerRestApi {
/**
* Returns the contents of {@link Job} that submitted by user.
- * @param jobSpec job spec
+ * @param spec job spec
* @return the contents of job
*/
@POST
@Consumes({RestConstants.MEDIA_TYPE_YAML, MediaType.APPLICATION_JSON})
- public Response submitJob(JobSpec jobSpec) {
- if (!jobSpec.validate()) {
- return new JsonResponse.Builder<String>(Response.Status.ACCEPTED)
- .success(false).result("Invalid params.").build();
- }
-
+ public Response createJob(JobSpec spec) {
try {
- Job job = JobManager.getInstance().submitJob(jobSpec);
- return new JsonResponse.Builder<Job>(Response.Status.OK)
- .success(true).result(job).build();
- } catch (UnsupportedJobTypeException e) {
- return new JsonResponse.Builder<String>(Response.Status.ACCEPTED)
- .success(false).result(e.getMessage()).build();
+ Job job = jobManager.createJob(spec);
+ return new
JsonResponse.Builder<Job>(Response.Status.OK).result(job).build();
+ } catch (SubmarineRuntimeException e) {
+ return parseJobServiceException(e);
}
}
@@ -85,10 +78,13 @@ public class JobManagerRestApi {
* @return job list
*/
@GET
- public Response listJob() {
- // TODO(jiwq): Hook JobManager when 0.4.0 released
- return new JsonResponse.Builder<List<Job>>(Response.Status.OK)
- .success(true).result(new ArrayList<>()).build();
+ public Response listJob(@QueryParam("status") String status) {
+ try {
+ List<Job> jobList = jobManager.listJobsByStatus(status);
+ return new
JsonResponse.Builder<List<Job>>(Response.Status.OK).result(jobList).build();
+ } catch (SubmarineRuntimeException e) {
+ return parseJobServiceException(e);
+ }
}
/**
@@ -97,13 +93,27 @@ public class JobManagerRestApi {
* @return the detailed info of job
*/
@GET
- @Path("{" + RestConstants.JOB_ID + "}")
+ @Path("/{id}")
public Response getJob(@PathParam(RestConstants.JOB_ID) String id) {
- // TODO(jiwq): Hook JobManager when 0.4.0 released
- Job job = new Job();
- job.setJobId(JobId.fromString(id));
- return new JsonResponse.Builder<Job>(Response.Status.OK)
- .success(true).result(job).build();
+ try {
+ Job job = jobManager.getJob(id);
+ return new
JsonResponse.Builder<Job>(Response.Status.OK).result(job).build();
+ } catch (SubmarineRuntimeException e) {
+ return parseJobServiceException(e);
+ }
+ }
+
+ /**
+ * Patch the job identified by the id in the path with the given spec.
+ * @param id job id
+ * @param spec job spec
+ * @return the patched job, or the code and message of the service error
+ */
+ @PATCH
+ @Path("/{id}")
+ @Consumes({RestConstants.MEDIA_TYPE_YAML, MediaType.APPLICATION_JSON})
+ public Response patchJob(@PathParam(RestConstants.JOB_ID) String id, JobSpec
spec) {
+ try {
+ Job job = jobManager.patchJob(id, spec);
+ return new JsonResponse.Builder<Job>(Response.Status.OK).success(true)
+ .result(job).build();
+ } catch (SubmarineRuntimeException e) {
+ return parseJobServiceException(e);
+ }
+ }
/**
@@ -112,10 +122,18 @@ public class JobManagerRestApi {
* @return the detailed info about deleted job
*/
@DELETE
- @Path("{" + RestConstants.JOB_ID + "}")
+ @Path("/{id}")
public Response deleteJob(@PathParam(RestConstants.JOB_ID) String id) {
- // TODO(jiwq): Hook JobManager when 0.4.0 released
- return new JsonResponse.Builder<Job>(Response.Status.OK)
- .success(true).result(new Job()).build();
+ try {
+ Job job = jobManager.deleteJob(id);
+ return new JsonResponse.Builder<Job>(Response.Status.OK)
+ .result(job).build();
+ } catch (SubmarineRuntimeException e) {
+ return parseJobServiceException(e);
+ }
+ }
+
+ /**
+ * Turn a service error into a response carrying the error's code and message.
+ * @param e the service error
+ * @return the response built from the error
+ */
+ private Response parseJobServiceException(SubmarineRuntimeException e) {
+ return new
JsonResponse.Builder<String>(e.getCode()).message(e.getMessage()).build();
+ }
}
diff --git
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
index 33ad4bb..ead3c4a 100644
---
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
+++
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
@@ -176,14 +176,23 @@ public abstract class AbstractSubmarineServerTest {
return httpPost(path, body, StringUtils.EMPTY, StringUtils.EMPTY);
}
+ /**
+ * POST the body to the given path with the given media type, passing an empty
+ * user and password to the five-argument overload.
+ * @param path request path, appended to the server URL
+ * @param body request body
+ * @param mediaType value sent as the Content-type header
+ * @return the executed POST method
+ * @throws IOException if the request fails
+ */
+ protected static PostMethod httpPost(String path, String body, String
mediaType)
+ throws IOException {
+ return httpPost(path, body, mediaType, StringUtils.EMPTY,
StringUtils.EMPTY);
+ }
+
protected static PostMethod httpPost(String path, String request, String
user, String pwd)
throws IOException {
+ return httpPost(path, request, MediaType.APPLICATION_JSON, user, pwd);
+ }
+ protected static PostMethod httpPost(String path, String request, String
mediaType, String user,
+ String pwd) throws IOException {
LOG.info("Connecting to {}", URL + path);
HttpClient httpClient = new HttpClient();
PostMethod postMethod = new PostMethod(URL + path);
postMethod.setRequestBody(request);
- postMethod.setRequestHeader("Content-type", MediaType.APPLICATION_JSON);
+ postMethod.setRequestHeader("Content-type", mediaType);
postMethod.getParams().setCookiePolicy(CookiePolicy.IGNORE_COOKIES);
if (userAndPasswordAreNotBlank(user, pwd)) {
diff --git
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerTest.java
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerTest.java
index fa7f5cf..466e225 100644
---
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerTest.java
+++
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerTest.java
@@ -19,12 +19,9 @@
package org.apache.submarine.server;
import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.submarine.server.rest.JobManagerRestApiTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -32,7 +29,6 @@ import java.util.ArrayList;
import static org.junit.Assert.assertFalse;
public class SubmarineServerTest extends AbstractSubmarineServerTest {
- private static final Logger LOG =
LoggerFactory.getLogger(JobManagerRestApiTest.class);
@BeforeClass
public static void init() throws Exception {
diff --git
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/JobManagerRestApiTest.java
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/JobManagerRestApiTest.java
deleted file mode 100644
index 52d52b7..0000000
---
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/JobManagerRestApiTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.rest;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-import org.apache.commons.httpclient.methods.DeleteMethod;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.io.FileUtils;
-import org.apache.submarine.server.AbstractSubmarineServerTest;
-import org.apache.submarine.server.api.job.Job;
-import org.apache.submarine.server.api.job.JobId;
-import org.apache.submarine.server.json.JobIdDeserializer;
-import org.apache.submarine.server.json.JobIdSerializer;
-import org.apache.submarine.server.response.JsonResponse;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertEquals;
-
-public class JobManagerRestApiTest extends AbstractSubmarineServerTest {
- private static final Logger LOG =
LoggerFactory.getLogger(JobManagerRestApiTest.class);
-
- @BeforeClass
- public static void init() throws Exception {
-
AbstractSubmarineServerTest.startUp(JobManagerRestApiTest.class.getSimpleName());
- }
-
- @AfterClass
- public static void destroy() throws Exception {
- AbstractSubmarineServerTest.shutDown();
- }
-
- @Test
- public void testJobServerPing() throws IOException {
- GetMethod response = httpGet("/api/" + RestConstants.V1 + "/"
- + RestConstants.JOBS + "/" + RestConstants.PING);
- LOG.info(response.toString());
-
- String requestBody = response.getResponseBodyAsString();
- LOG.info(requestBody);
-
- Gson gson = new Gson();
- JsonResponse jsonResponse = gson.fromJson(requestBody, JsonResponse.class);
- assertEquals("Response code should be 200 ",
- Response.Status.OK.getStatusCode(), jsonResponse.getCode());
- assertEquals("Response result should be Pong", "Pong",
- jsonResponse.getResult().toString());
- }
-
- // Test job created with correct JSON input
- @Test
- public void testCreateJobWhenJsonInputIsCorrectThenResponseCodeAccepted()
throws Exception {
- URL fileUrl = this.getClass().getResource("/tf-mnist-req.json");
- String jobSpec = FileUtils.readFileToString(new File(fileUrl.toURI()),
StandardCharsets.UTF_8);
-
- PostMethod response = httpPost("/api/" + RestConstants.V1 + "/" +
RestConstants.JOBS, jobSpec);
- LOG.info(response.toString());
-
- String responseBodyAsString = response.getResponseBodyAsString();
- LOG.info(responseBodyAsString);
-
- Gson gson = new Gson();
- JsonResponse jsonResponse = gson.fromJson(responseBodyAsString,
JsonResponse.class);
- assertEquals("Response code should be 202 ",
- Response.Status.ACCEPTED.getStatusCode(), jsonResponse.getCode());
- }
-
- // Test job created with incorrect JSON input
- @Test
- public void testCreateJobWhenJsonInputIsWrongThenResponseCodeBadRequest()
throws IOException {
- String jobSpec = "{\"ttype\": \"tensorflow\", \"version\":\"v1.13\"}";
-
- PostMethod response = httpPost("/api/" + RestConstants.V1 + "/" +
RestConstants.JOBS, jobSpec);
-
- assertEquals("Http Response should be 400 ",
- Response.Status.BAD_REQUEST.getStatusCode(), response.getStatusCode());
- }
-
- // Test get job list
- @Test
- public void testGetJobList() throws IOException {
- GetMethod response = httpGet("/api/" + RestConstants.V1 + "/" +
RestConstants.JOBS);
- LOG.info(response.toString());
-
- String requestBody = response.getResponseBodyAsString();
- LOG.info(requestBody);
-
- Gson gson = new Gson();
- JsonResponse jsonResponse = gson.fromJson(requestBody, JsonResponse.class);
- assertEquals("Response code should be 200 ",
- Response.Status.OK.getStatusCode(), jsonResponse.getCode());
- }
-
- // Test get job by id
- @Test
- public void testGetJobById() throws IOException {
- String jobId = "job_1577810970_0001";
- GetMethod response = httpGet("/api/" + RestConstants.V1 + "/"
- + RestConstants.JOBS + "/" + jobId);
- LOG.info(response.toString());
-
- String responseBodyAsString = response.getResponseBodyAsString();
- LOG.info(responseBodyAsString);
-
- Gson gson = new GsonBuilder()
- .registerTypeAdapter(JobId.class, new JobIdSerializer())
- .registerTypeAdapter(JobId.class, new JobIdDeserializer())
- .create();
- JsonResponse<Job> jsonResponse = gson.fromJson(responseBodyAsString,
- new TypeToken<JsonResponse<Job>>(){}.getType());
- assertEquals("Response code should be 200 ",
- Response.Status.OK.getStatusCode(), jsonResponse.getCode());
- assertEquals("Job id should be " + jobId, jobId,
- jsonResponse.getResult().getJobId().toString());
- }
-
- // Test delete job by id
- @Test
- public void testDeleteJobById() throws IOException {
- String jobId = "job1";
- DeleteMethod response = httpDelete("/api/" + RestConstants.V1 + "/"
- + RestConstants.JOBS + "/" + jobId);
- LOG.info(response.toString());
-
- String requestBody = response.getResponseBodyAsString();
- LOG.info(requestBody);
-
- Gson gson = new Gson();
- JsonResponse jsonResponse = gson.fromJson(requestBody, JsonResponse.class);
- assertEquals("Response code should be 200 ",
- Response.Status.OK.getStatusCode(), jsonResponse.getCode());
- }
-
- /**
- * FiXME. The manual YAML test with postman works but failed here.
- * We need to figure out why the YAML entity provider not work in this test.
- * */
- @Test
- public void testCreateJobWhenYamlInputIsCorrectThenResponseCodeAccepted()
throws IOException {
-// Client client = ClientBuilder.newBuilder()
-// .register(new YamlEntityProvider<>()).build();
-// this.setClient(client);
-// String jobSpec = "type: tf";
-// Response response = target(RestConstants.V1 + "/"
-// + RestConstants.JOBS + "/" + "test")
-// .request()
-// .put(Entity.entity(jobSpec, "application/yaml"));
-//
-// assertEquals("Http Response should be 202 ",
-// Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
- }
-}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
deleted file mode 100644
index 4e61010..0000000
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.submitter.k8s;
-
-/**
- * Job request for Kubernetes Submitter.
- */
-public class K8sJobRequest {
- private Path path;
- private Object body;
- private String jobName;
-
- public K8sJobRequest(Path path, Object body) {
- this(path, body, null);
- }
-
- public K8sJobRequest(Path path, Object body, String jobName) {
- this.path = path;
- this.body = body;
- this.jobName = jobName;
- }
-
- public void setPath(Path path) {
- this.path = path;
- }
-
- public Path getPath() {
- return path;
- }
-
- public void setBody(Object body) {
- this.body = body;
- }
-
- public Object getBody() {
- return body;
- }
-
- public String getJobName() {
- return jobName;
- }
-
- static class Path {
- private String group;
- private String apiVersion;
- private String namespace;
- private String plural;
-
- Path(String group, String apiVersion, String namespace, String plural) {
- this.group = group;
- this.apiVersion = apiVersion;
- this.namespace = namespace;
- this.plural = plural;
- }
-
- public String getGroup() {
- return group;
- }
-
- public String getApiVersion() {
- return apiVersion;
- }
-
- public String getNamespace() {
- return namespace;
- }
-
- public String getPlural() {
- return plural;
- }
- }
-}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
index 43387c9..37e0e86 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
@@ -19,33 +19,33 @@
package org.apache.submarine.server.submitter.k8s;
+import java.io.FileReader;
+import java.io.IOException;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
+import io.kubernetes.client.JSON;
import io.kubernetes.client.apis.CustomObjectsApi;
-import io.kubernetes.client.models.V1DeleteOptions;
-import io.kubernetes.client.models.V1DeleteOptionsBuilder;
+import io.kubernetes.client.models.V1Status;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
import org.apache.submarine.commons.utils.SubmarineConfVars;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
-import org.apache.submarine.server.api.JobSubmitter;
import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.job.JobSubmitter;
import org.apache.submarine.server.api.job.Job;
import org.apache.submarine.server.api.spec.JobSpec;
-import org.apache.submarine.server.submitter.k8s.model.CustomResourceJob;
-import org.apache.submarine.server.submitter.k8s.model.CustomResourceJobList;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
import org.apache.submarine.server.submitter.k8s.model.MLJob;
import org.apache.submarine.server.submitter.k8s.parser.JobSpecParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileReader;
-import java.io.IOException;
-
/**
* JobSubmitter for Kubernetes Cluster.
*/
@@ -59,6 +59,7 @@ public class K8sJobSubmitter implements JobSubmitter {
public K8sJobSubmitter() {}
+ @VisibleForTesting
public K8sJobSubmitter(String confPath) {
this.confPath = confPath;
}
@@ -90,7 +91,8 @@ public class K8sJobSubmitter implements JobSubmitter {
ApiClient client = ClientBuilder.cluster().build();
Configuration.setDefaultApiClient(client);
} catch (IOException e1) {
- throw new SubmarineRuntimeException("Failed to initialize k8s client");
+ LOG.error("Initialize K8s submitter failed. " + e.getMessage(), e1);
+ throw new SubmarineRuntimeException(500, "Initialize K8s submitter
failed.");
}
}
}
@@ -101,98 +103,91 @@ public class K8sJobSubmitter implements JobSubmitter {
}
@Override
- public Job submitJob(JobSpec jobSpec)
- throws InvalidSpecException {
- Job job = null;
-
- boolean success = createJob(JobSpecParser.parseJob(jobSpec));
- if (success) {
- job = new Job();
- job.setName(jobSpec.getName());
- } else {
- LOG.error("Failed to create job." + jobSpec.toString());
+ public Job createJob(JobSpec jobSpec) throws SubmarineRuntimeException {
+ // Parses the spec into an MLJob custom resource, creates it in the
+ // namespace named by its metadata and converts the reply into a Job.
+ Job job;
+ try {
+ MLJob mlJob = JobSpecParser.parseJob(jobSpec);
+ Object object = api.createNamespacedCustomObject(mlJob.getGroup(),
+ mlJob.getVersion(), mlJob.getMetadata().getNamespace(),
+ mlJob.getPlural(), mlJob, "true");
+ job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
+ } catch (InvalidSpecException e) {
+ // A spec that cannot be parsed is a client error; code 200 would
+ // report the failure with a success code.
+ throw new SubmarineRuntimeException(400, e.getMessage());
+ } catch (ApiException e) {
+ // Forward the code carried by the Kubernetes client exception.
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
return job;
}
- @VisibleForTesting
- boolean createJob(MLJob job) {
+ /**
+ * Looks up the custom resource named by the spec and converts it into a Job.
+ *
+ * @param jobSpec spec from which group, version, namespace, plural and name are derived
+ * @return the job built from the API response
+ * @throws SubmarineRuntimeException with code 400 when the spec cannot be parsed,
+ * or with the ApiException code when the API call fails
+ */
+ @Override
+ public Job findJob(JobSpec jobSpec) throws SubmarineRuntimeException {
+ Job job;
try {
- api.createNamespacedCustomObject(job.getGroup(), job.getVersion(),
- job.getMetadata().getNamespace(), job.getPlural(),
- job, "true");
+ MLJob mlJob = JobSpecParser.parseJob(jobSpec);
+ Object object = api.getNamespacedCustomObject(mlJob.getGroup(),
+ mlJob.getVersion(), mlJob.getMetadata().getNamespace(),
+ mlJob.getPlural(), mlJob.getMetadata().getName());
+ job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
+ } catch (InvalidSpecException e) {
+ // Invalid spec is a client error; 200 would signal success.
+ throw new SubmarineRuntimeException(400, e.getMessage());
} catch (ApiException e) {
- LOG.error("Failed to create job. " + e.getMessage(), e);
- return false;
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
- return true;
+ return job;
}
- @VisibleForTesting
- CustomResourceJob createCustomJob(K8sJobRequest request) {
+ /**
+ * Patches the custom resource named by the spec with the MLJob parsed from
+ * that spec and converts the reply into a Job.
+ *
+ * @param jobSpec spec describing the desired state of the job
+ * @return the job built from the API response
+ * @throws SubmarineRuntimeException with code 400 when the spec cannot be parsed,
+ * or with the ApiException code when the API call fails
+ */
+ @Override
+ public Job patchJob(JobSpec jobSpec) throws SubmarineRuntimeException {
+ Job job;
try {
- K8sJobRequest.Path path = request.getPath();
- Object o = api.createNamespacedCustomObject(path.getGroup(),
- path.getApiVersion(), path.getNamespace(), path.getPlural(),
- request.getBody(), "true");
- Gson gson = new Gson();
- return gson.fromJson(gson.toJson(o), CustomResourceJob.class);
- } catch (ApiException ae) {
- LOG.error("Exceptions when creating CRD job: " + ae.getMessage(), ae);
+ MLJob mlJob = JobSpecParser.parseJob(jobSpec);
+ Object object = api.patchNamespacedCustomObject(mlJob.getGroup(),
+ mlJob.getVersion(), mlJob.getMetadata().getNamespace(),
+ mlJob.getPlural(), mlJob.getMetadata().getName(),
+ mlJob);
+ job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
+ } catch (InvalidSpecException e) {
+ // Invalid spec is a client error; 200 would signal success.
+ throw new SubmarineRuntimeException(400, e.getMessage());
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
- return null;
+ return job;
}
- @VisibleForTesting
- CustomResourceJob getCustomResourceJob(K8sJobRequest request) {
+ /**
+ * Deletes the custom resource named by the spec. The reply is read as a
+ * deletion status rather than as a job resource.
+ *
+ * @param jobSpec spec from which group, version, namespace, plural and name are derived
+ * @return the job built from the deletion status
+ * @throws SubmarineRuntimeException with code 400 when the spec cannot be parsed,
+ * or with the ApiException code when the API call fails
+ */
+ @Override
+ public Job deleteJob(JobSpec jobSpec) throws SubmarineRuntimeException {
+ Job job;
try {
- K8sJobRequest.Path path = request.getPath();
- Object o = api.getNamespacedCustomObject(path.getGroup(),
- path.getApiVersion(),
- path.getNamespace(), path.getPlural(), request.getJobName());
- Gson gson = new Gson();
- return gson.fromJson(gson.toJson(o), CustomResourceJob.class);
- } catch (ApiException ae) {
- // The API getNamespacedCustomObject throws exception when cannot found
resource
- // So the ApiException seems not a big issue
- LOG.warn("Exceptions when getting CRD job: " + ae.getMessage());
+ MLJob mlJob = JobSpecParser.parseJob(jobSpec);
+ Object object = api.deleteNamespacedCustomObject(mlJob.getGroup(),
+ mlJob.getVersion(), mlJob.getMetadata().getNamespace(),
+ mlJob.getPlural(), mlJob.getMetadata().getName(),
+ MLJobConverter.toDeleteOptionsFromMLJob(mlJob), null, null, null);
+ job = parseResponseObject(object, ParseOp.PARSE_OP_DELETE);
+ } catch (InvalidSpecException e) {
+ // Invalid spec is a client error; 200 would signal success.
+ throw new SubmarineRuntimeException(400, e.getMessage());
+ } catch (ApiException e) {
+ throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
- return null;
+ return job;
}
- @VisibleForTesting
- CustomResourceJob deleteCustomResourceJob(K8sJobRequest request) {
+ /**
+ * Converts the raw object returned by the custom objects API into a Job.
+ * The object is serialized to JSON and read back as the model chosen by op.
+ *
+ * @param object response object of the API call
+ * @param op PARSE_OP_RESULT reads an MLJob, PARSE_OP_DELETE reads a V1Status
+ * @return the converted job
+ * @throws SubmarineRuntimeException with code 500 when the JSON cannot be read
+ * as the chosen model or op matches neither constant
+ */
+ private Job parseResponseObject(Object object, ParseOp op)
+ throws SubmarineRuntimeException {
+ Gson parser = new JSON().getGson();
+ String payload = parser.toJson(object);
+ LOG.info("Upstream response JSON: {}", payload);
try {
- K8sJobRequest.Path path = request.getPath();
- V1DeleteOptions body =
- new V1DeleteOptionsBuilder().withApiVersion(
- path.getApiVersion()).build();
- Object o = api.deleteNamespacedCustomObject(path.getGroup(),
- path.getApiVersion(), path.getNamespace(), path.getPlural(),
- request.getJobName(), body, null,
- null, null);
- Gson gson = new Gson();
- return gson.fromJson(gson.toJson(o), CustomResourceJob.class);
- } catch (ApiException ae) {
- LOG.error("Exceptions when deleting CRD job: " + ae.getMessage(), ae);
+ if (op == ParseOp.PARSE_OP_DELETE) {
+ return MLJobConverter.toJobFromStatus(
+ parser.fromJson(payload, V1Status.class));
+ }
+ if (op == ParseOp.PARSE_OP_RESULT) {
+ return MLJobConverter.toJobFromMLJob(
+ parser.fromJson(payload, MLJob.class));
+ }
+ } catch (JsonSyntaxException e) {
+ LOG.warn("K8s submitter: parse response object failed by " + e.getMessage(), e);
}
- return null;
+ // Reached on a syntax error above or on an op that matched no branch.
+ throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream response failed.");
}
- @VisibleForTesting
- CustomResourceJobList listCustomResourceJobs(K8sJobRequest request) {
- try {
- K8sJobRequest.Path path = request.getPath();
- Object o = api.listNamespacedCustomObject(path.getGroup(),
- path.getApiVersion(),
- path.getNamespace(), path.getPlural(), "true",
- null, null, null,
- null, null);
- Gson gson = new Gson();
- return gson.fromJson(gson.toJson(o), CustomResourceJobList.class);
- } catch (ApiException ae) {
- LOG.error("Exceptions when listing CRD jobs: " + ae.getMessage(), ae);
- }
- return null;
+ /** Selects the model that parseResponseObject reads the API response as. */
+ private enum ParseOp {
+ // The response is a custom resource and is read as an MLJob.
+ PARSE_OP_RESULT,
+ // The response is a deletion status and is read as a V1Status.
+ PARSE_OP_DELETE
}
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
index 81efc1f..af77365 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
@@ -20,6 +20,7 @@
package org.apache.submarine.server.submitter.k8s.model;
import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.models.V1JobStatus;
import io.kubernetes.client.models.V1ObjectMeta;
/**
@@ -43,6 +44,9 @@ public class MLJob {
private transient String plural;
+ @SerializedName("status")
+ private V1JobStatus status;
+
/**
* Set the api with version
*
@@ -135,4 +139,12 @@ public class MLJob {
public void setPlural(String plural) {
this.plural = plural;
}
+
+ /**
+ * @return the status serialized under the "status" key, or null when unset
+ */
+ public V1JobStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * @param status the status to store; serialized under the "status" key
+ */
+ public void setStatus(V1JobStatus status) {
+ this.status = status;
+ }
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
new file mode 100644
index 0000000..ec59a67
--- /dev/null
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.util;
+
+import java.util.List;
+
+import io.kubernetes.client.models.V1DeleteOptions;
+import io.kubernetes.client.models.V1DeleteOptionsBuilder;
+import io.kubernetes.client.models.V1JobCondition;
+import io.kubernetes.client.models.V1JobStatus;
+import io.kubernetes.client.models.V1Status;
+import io.kubernetes.client.models.V1StatusDetails;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.submitter.k8s.model.MLJob;
+import org.joda.time.DateTime;
+
+/**
+ * Converter for different types.
+ * Such as MLJob to Job, V1Status to Job and others.
+ */
+public class MLJobConverter {
+ /**
+ * Builds a Job from a custom resource.
+ * <p>
+ * The status is overwritten each time a later marker is found, in this
+ * order: creation timestamp (accepted), start time (created), more than one
+ * condition (running), completion time (succeeded).
+ *
+ * @param mlJob the custom resource; its metadata is dereferenced without a null check
+ * @return the converted job; its status stays unset when no marker is present
+ */
+ public static Job toJobFromMLJob(MLJob mlJob) {
+ Job job = new Job();
+ job.setUid(mlJob.getMetadata().getUid());
+ job.setName(mlJob.getMetadata().getName());
+
+ DateTime dateTime = mlJob.getMetadata().getCreationTimestamp();
+ if (dateTime != null) {
+ job.setAcceptedTime(dateTime.toString());
+ job.setStatus(Job.Status.STATUS_ACCEPTED.getValue());
+ }
+
+ V1JobStatus status = mlJob.getStatus();
+ if (status != null) {
+ dateTime = status.getStartTime();
+ if (dateTime != null) {
+ job.setCreatedTime(dateTime.toString());
+ job.setStatus(Job.Status.STATUS_CREATED.getValue());
+ }
+
+ List<V1JobCondition> conditions = status.getConditions();
+ // NOTE(review): running is inferred from the number of conditions, not
+ // from a true "Running" condition - confirm this is intended.
+ if (conditions != null && conditions.size() > 1) {
+ job.setStatus(Job.Status.STATUS_RUNNING.getValue());
+ for (V1JobCondition condition : conditions) {
+ // Constant-first equalsIgnoreCase tolerates a null type and does not
+ // depend on the default locale.
+ if (Boolean.parseBoolean(condition.getStatus())
+ && "running".equalsIgnoreCase(condition.getType())) {
+ dateTime = condition.getLastTransitionTime();
+ // Guard against a condition that carries no transition time.
+ if (dateTime != null) {
+ job.setRunningTime(dateTime.toString());
+ }
+ break;
+ }
+ }
+ }
+
+ dateTime = status.getCompletionTime();
+ if (dateTime != null) {
+ job.setFinishedTime(dateTime.toString());
+ job.setStatus(Job.Status.STATUS_SUCCEEDED.getValue());
+ }
+ }
+ return job;
+ }
+
+ /**
+ * Builds a Job from the status object returned by a delete call.
+ *
+ * @param status the deletion status; uid and name are copied from its details when present
+ * @return a job marked deleted when the status text equals "success" ignoring case
+ */
+ public static Job toJobFromStatus(V1Status status) {
+ Job job = new Job();
+ V1StatusDetails details = status.getDetails();
+ if (details != null) {
+ job.setUid(details.getUid());
+ job.setName(details.getName());
+ }
+ // Constant-first comparison avoids an NPE when the status text is absent.
+ if ("success".equalsIgnoreCase(status.getStatus())) {
+ job.setStatus(Job.Status.STATUS_DELETED.getValue());
+ }
+ return job;
+ }
+
+ /**
+ * Builds the delete options for a custom resource.
+ *
+ * @param job the resource whose apiVersion is copied into the options
+ * @return delete options carrying only the apiVersion
+ */
+ public static V1DeleteOptions toDeleteOptionsFromMLJob(MLJob job) {
+ return new V1DeleteOptionsBuilder()
+ .withApiVersion(job.getApiVersion()).build();
+ }
+}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/JobSpecParserTest.java
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/JobSpecParserTest.java
index 8bf7a35..36bb28a 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/JobSpecParserTest.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/JobSpecParserTest.java
@@ -19,8 +19,9 @@
package org.apache.submarine.server.submitter.k8s;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
import org.apache.submarine.server.api.exception.InvalidSpecException;
import org.apache.submarine.server.api.spec.JobLibrarySpec;
import org.apache.submarine.server.api.spec.JobSpec;
@@ -36,19 +37,7 @@ import
org.apache.submarine.server.submitter.k8s.parser.JobSpecParser;
import org.junit.Assert;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.io.Reader;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-
-public class JobSpecParserTest {
-
- private final String pytorchJobReqFile = "/pytorch_job_req.json";
- private final String tfJobReqFile = "/tf_mnist_req.json";
-
+public class JobSpecParserTest extends SpecBuilder {
@Test
public void testValidTensorflowJobSpec() throws IOException,
URISyntaxException, InvalidSpecException {
@@ -182,19 +171,4 @@ public class JobSpecParserTest {
Assert.assertEquals(expectedMasterContainerCpu,
actualMasterContainerCpu);
}
-
- private JobSpec buildFromJsonFile(String filePath) throws IOException,
- URISyntaxException {
- Gson gson = new GsonBuilder().create();
- try (Reader reader = Files.newBufferedReader(
- getCustomJobSpecFile(filePath).toPath(),
- StandardCharsets.UTF_8)) {
- return gson.fromJson(reader, JobSpec.class);
- }
- }
-
- private File getCustomJobSpecFile(String path) throws URISyntaxException {
- URL fileUrl = this.getClass().getResource(path);
- return new File(fileUrl.toURI());
- }
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
index 4d91a60..3191f4e 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
@@ -19,32 +19,21 @@
package org.apache.submarine.server.submitter.k8s;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1Namespace;
import io.kubernetes.client.models.V1NamespaceList;
-import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.server.api.job.Job;
import org.apache.submarine.server.api.spec.JobSpec;
-import org.apache.submarine.server.submitter.k8s.model.CustomResourceJob;
-import org.apache.submarine.server.submitter.k8s.model.CustomResourceJobList;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.io.Reader;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-
/**
* We have two ways to test submitter for K8s cluster, local and travis CI.
* <p>
@@ -62,21 +51,10 @@ import java.nio.file.Files;
* Local: docker run -it --privileged -p 8443:8443 -p 10080:10080
bsycorp/kind:latest-1.15
* Travis: See '.travis.yml'
*/
-public class K8SJobSubmitterTest {
- private final String tfJobName = "mnist";
- private final String pytorchJobName = "pytorch-dist-mnist-gloo";
-
- // The spec files in test/resources
- private final String tfJobSpecFile = "/tf_job_mnist.json";
- private final String tfJobReqFile = "/tf_mnist_req.json";
- private final String pytorchJobSpecFile = "/pytorch_job_mnist_gloo.json";
- private final String pytorchJobReqFile = "/pytorch_job_req.json";
+public class K8SJobSubmitterTest extends SpecBuilder {
private K8sJobSubmitter submitter;
- private K8sJobRequest.Path tfPath;
- private K8sJobRequest.Path pyTorchPath;
-
@Before
public void before() throws IOException, ApiException {
String confPath = System.getProperty("user.home") + "/.kube/config";
@@ -87,12 +65,8 @@ public class K8SJobSubmitterTest {
submitter.initialize(null);
String ns = "submarine";
if (!isEnvReady()) {
- throw new ApiException(" Please create a namespace 'submarine'.");
+ throw new ApiException("Please create a namespace 'submarine'.");
}
- tfPath = new K8sJobRequest.Path(TFJob.CRD_TF_GROUP_V1,
- TFJob.CRD_TF_VERSION_V1, ns, TFJob.CRD_TF_PLURAL_V1);
- pyTorchPath = new K8sJobRequest.Path(PyTorchJob.CRD_PYTORCH_GROUP_V1,
- PyTorchJob.CRD_PYTORCH_VERSION_V1, ns,
PyTorchJob.CRD_PYTORCH_PLURAL_V1);
}
private boolean isEnvReady() throws ApiException {
@@ -106,101 +80,37 @@ public class K8SJobSubmitterTest {
return false;
}
- // Delete the job might take time
- @After
- public void after() {
- try {
- tryDeleteCustomJob(tfPath, tfJobName);
- tryDeleteCustomJob(pyTorchPath, pytorchJobName);
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testRawTFJobSpec() throws URISyntaxException {
- tryCreateCustomJob(tfPath, tfJobName, tfJobSpecFile);
- CustomResourceJob job = getCustomJob(tfPath, tfJobName);
- Assert.assertNotNull(job);
- Assert.assertEquals(tfJobName, job.getMetadata().getName());
- CustomResourceJobList jobList = listCustomJobs(tfPath, tfJobSpecFile);
- Assert.assertEquals(1, jobList.getItems().size());
- }
-
- @Test
- public void testRunRawPyTorchJobSpec() throws URISyntaxException {
- tryCreateCustomJob(pyTorchPath, pytorchJobName, pytorchJobSpecFile);
- CustomResourceJob job = getCustomJob(pyTorchPath, pytorchJobName);
- Assert.assertNotNull(job);
- Assert.assertEquals(pytorchJobName, job.getMetadata().getName());
- }
-
@Test
public void testRunPyTorchJobPerRequest() throws URISyntaxException,
- IOException, InvalidSpecException {
- JobSpec jobSpec = buildFromJsonFile(pytorchJobReqFile);
- Job job = submitter.submitJob(jobSpec);
- Assert.assertNotNull(job);
- CustomResourceJob cjob = getCustomJob(pyTorchPath, pytorchJobName);
- Assert.assertEquals(pytorchJobName, cjob.getMetadata().getName());
- Assert.assertNotNull(cjob);
+ IOException, SubmarineRuntimeException {
+ JobSpec spec = buildFromJsonFile(pytorchJobReqFile);
+ run(spec);
}
@Test
public void testRunTFJobPerRequest() throws URISyntaxException,
- IOException, InvalidSpecException {
- JobSpec jobSpec = buildFromJsonFile(tfJobReqFile);
- Job job = submitter.submitJob(jobSpec);
- Assert.assertNotNull(job);
- CustomResourceJob cjob = getCustomJob(tfPath, tfJobName);
- Assert.assertEquals(tfJobName, cjob.getMetadata().getName());
- Assert.assertNotNull(cjob);
- }
-
- private JobSpec buildFromJsonFile(String filePath) throws IOException,
- URISyntaxException {
- Gson gson = new GsonBuilder().create();
- try (Reader reader = Files.newBufferedReader(
- getCustomJobSpecFile(filePath).toPath(),
- StandardCharsets.UTF_8)) {
- return gson.fromJson(reader, JobSpec.class);
- }
- }
-
- public CustomResourceJob tryCreateCustomJob(K8sJobRequest.Path requestPath,
- String jobName, String jobSpecFile) throws URISyntaxException {
- CustomResourceJob job = getCustomJob(requestPath, jobName);
- if (job != null) {
- return job;
- }
- return submitter.createCustomJob(
- new K8sJobRequest(requestPath, getCustomJobSpecFile(jobSpecFile)));
- }
-
- public CustomResourceJobList listCustomJobs(K8sJobRequest.Path requestPath,
- String jobSpecFile) throws URISyntaxException {
- return submitter.listCustomResourceJobs(
- new K8sJobRequest(requestPath, getCustomJobSpecFile(jobSpecFile)));
- }
-
- public CustomResourceJob tryDeleteCustomJob(K8sJobRequest.Path requestPath,
- String jobName) {
- if (getCustomJob(requestPath, jobName) != null) {
- K8sJobRequest request = new K8sJobRequest(requestPath, null, jobName);
- return submitter.deleteCustomResourceJob(request);
- }
- return null;
- }
-
- private CustomResourceJob getCustomJob(K8sJobRequest.Path requestPath,
- String jobName) {
- K8sJobRequest request = new K8sJobRequest(requestPath, null, jobName);
- return submitter.getCustomResourceJob(request);
+ IOException, SubmarineRuntimeException {
+ JobSpec spec = buildFromJsonFile(tfJobReqFile);
+ run(spec);
}
- private File getCustomJobSpecFile(String path) throws URISyntaxException {
- URL fileUrl = this.getClass().getResource(path);
- return new File(fileUrl.toURI());
+ private void run(JobSpec spec) throws SubmarineRuntimeException {
+ // Drives one create / find / delete round trip against the cluster.
+ // create
+ Job jobCreated = submitter.createJob(spec);
+ Assert.assertNotNull(jobCreated);
+
+ Job jobDeleted;
+ try {
+ // find
+ Job jobFound = submitter.findJob(spec);
+ Assert.assertNotNull(jobFound);
+ Assert.assertEquals(jobCreated.getUid(), jobFound.getUid());
+ Assert.assertEquals(jobCreated.getName(), jobFound.getName());
+ Assert.assertEquals(jobCreated.getAcceptedTime(),
+ jobFound.getAcceptedTime());
+ } finally {
+ // delete: attempted even when an assertion above fails, so the custom
+ // resource is not left behind in the cluster.
+ jobDeleted = submitter.deleteJob(spec);
+ }
+ Assert.assertNotNull(jobDeleted);
+ Assert.assertEquals(Job.Status.STATUS_DELETED.getValue(),
+ jobDeleted.getStatus());
+ // The found job was asserted equal to the created one above.
+ Assert.assertEquals(jobCreated.getUid(), jobDeleted.getUid());
+ Assert.assertEquals(jobCreated.getName(), jobDeleted.getName());
}
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
new file mode 100644
index 0000000..99feee4
--- /dev/null
+++
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import io.kubernetes.client.models.V1DeleteOptions;
+import io.kubernetes.client.models.V1JobCondition;
+import io.kubernetes.client.models.V1JobConditionBuilder;
+import io.kubernetes.client.models.V1JobStatus;
+import io.kubernetes.client.models.V1JobStatusBuilder;
+import io.kubernetes.client.models.V1Status;
+import io.kubernetes.client.models.V1StatusBuilder;
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.apache.submarine.server.submitter.k8s.model.MLJob;
+import org.apache.submarine.server.submitter.k8s.parser.JobSpecParser;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MLJobConverterTest extends SpecBuilder {
+ /**
+ * Walks one MLJob through the markers read by the converter and checks the
+ * status and timestamp reported after each step.
+ */
+ @Test
+ public void testMLJob2Job() throws IOException, URISyntaxException,
+ InvalidSpecException {
+ // No marker set yet: neither a status nor an accepted time is reported
+ JobSpec spec = buildFromJsonFile(tfJobReqFile);
+ MLJob mlJob = JobSpecParser.parseJob(spec);
+ V1JobStatus status = new V1JobStatusBuilder().build();
+ mlJob.setStatus(status);
+ Job job = MLJobConverter.toJobFromMLJob(mlJob);
+ Assert.assertNull(job.getStatus());
+ Assert.assertNull(job.getAcceptedTime());
+
+ // Created Status
+ DateTime startTime = new DateTime();
+ mlJob.getStatus().setStartTime(startTime);
+
+ List<V1JobCondition> conditions = new ArrayList<>();
+ DateTime createdTime = new DateTime();
+ V1JobCondition condition = new V1JobConditionBuilder().withStatus("True")
+ .withType("Created").withLastTransitionTime(createdTime).build();
+ conditions.add(condition);
+ mlJob.getStatus().setConditions(conditions);
+
+ job = MLJobConverter.toJobFromMLJob(mlJob);
+ Assert.assertEquals(Job.Status.STATUS_CREATED.getValue(), job.getStatus());
+ Assert.assertEquals(startTime.toString(), job.getCreatedTime());
+
+ // Running Status
+ DateTime runningTime = new DateTime();
+ condition = new V1JobConditionBuilder().withStatus("True")
+ .withType("Running").withLastTransitionTime(runningTime).build();
+ conditions.add(condition);
+
+ mlJob.getStatus().setConditions(conditions);
+ job = MLJobConverter.toJobFromMLJob(mlJob);
+ // Compare against getValue(), which is what the converter stores
+ Assert.assertEquals(Job.Status.STATUS_RUNNING.getValue(), job.getStatus());
+ Assert.assertEquals(runningTime.toString(), job.getRunningTime());
+
+ // Succeeded Status
+ DateTime finishedTime = new DateTime();
+ mlJob.getStatus().setCompletionTime(finishedTime);
+ job = MLJobConverter.toJobFromMLJob(mlJob);
+ Assert.assertEquals(Job.Status.STATUS_SUCCEEDED.getValue(),
+ job.getStatus());
+ Assert.assertEquals(finishedTime.toString(), job.getFinishedTime());
+ }
+
+ /** A "Success" status object is converted into a job marked deleted. */
+ @Test
+ public void testStatus2Job() {
+ V1Status status = new V1StatusBuilder().withStatus("Success").build();
+ Job job = MLJobConverter.toJobFromStatus(status);
+ Assert.assertNotNull(job);
+ Assert.assertEquals(Job.Status.STATUS_DELETED.getValue(), job.getStatus());
+ }
+
+ /** The delete options carry the apiVersion of the MLJob they were built from. */
+ @Test
+ public void testMLJob2DeleteOptions() throws IOException, URISyntaxException,
+ InvalidSpecException {
+ JobSpec spec = buildFromJsonFile(tfJobReqFile);
+ MLJob mlJob = JobSpecParser.parseJob(spec);
+ V1DeleteOptions options = MLJobConverter.toDeleteOptionsFromMLJob(mlJob);
+ Assert.assertNotNull(options);
+ Assert.assertEquals(mlJob.getApiVersion(), options.getApiVersion());
+ }
+}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
new file mode 100644
index 0000000..c8ac752
--- /dev/null
+++
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.submarine.server.api.spec.JobSpec;
+
+/**
+ * Base class for tests that need a {@link JobSpec} loaded from a JSON file on the
+ * test classpath.
+ */
+public abstract class SpecBuilder {
+  // The spec files in test/resources
+  protected final String tfJobReqFile = "/tf_mnist_req.json";
+  protected final String pytorchJobReqFile = "/pytorch_job_req.json";
+
+  /**
+   * Deserializes the classpath resource at {@code filePath} into a {@link JobSpec}.
+   *
+   * @param filePath resource path handed to {@link Class#getResource(String)}
+   * @return the spec parsed by Gson from the file content, read as UTF-8
+   * @throws IOException if the file cannot be opened or read
+   * @throws URISyntaxException if the resource URL cannot be converted to a URI
+   * @throws IllegalArgumentException if no resource exists at {@code filePath}
+   */
+  protected JobSpec buildFromJsonFile(String filePath) throws IOException,
+      URISyntaxException {
+    Gson gson = new GsonBuilder().create();
+    try (Reader reader = Files.newBufferedReader(
+        getCustomJobSpecFile(filePath).toPath(), StandardCharsets.UTF_8)) {
+      return gson.fromJson(reader, JobSpec.class);
+    }
+  }
+
+  /** Resolves a classpath resource to a {@link File}, failing clearly when it is absent. */
+  private File getCustomJobSpecFile(String path) throws URISyntaxException {
+    URL fileUrl = this.getClass().getResource(path);
+    if (fileUrl == null) {
+      // getResource returns null for a missing resource; without this check the
+      // caller only sees a bare NullPointerException from fileUrl.toURI().
+      throw new IllegalArgumentException("Test resource not found on classpath: " + path);
+    }
+    return new File(fileUrl.toURI());
+  }
+}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/pytorch_job_mnist_gloo.json
b/submarine-server/server-submitter/submitter-k8s/src/test/resources/pytorch_job_mnist_gloo.json
deleted file mode 100644
index a19a33d..0000000
---
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/pytorch_job_mnist_gloo.json
+++ /dev/null
@@ -1,47 +0,0 @@
-{
- "apiVersion": "kubeflow.org/v1",
- "kind": "PyTorchJob",
- "metadata": {
- "name": "pytorch-dist-mnist-gloo"
- },
- "spec": {
- "pytorchReplicaSpecs": {
- "Master": {
- "replicas": 1,
- "restartPolicy": "Never",
- "template": {
- "spec": {
- "containers": [
- {
- "name": "pytorch",
- "image": "apache/submarine:pytorch-dist-mnist-1.0",
- "args": [
- "--backend",
- "gloo"
- ]
- }
- ]
- }
- }
- },
- "Worker": {
- "replicas": 1,
- "restartPolicy": "Never",
- "template": {
- "spec": {
- "containers": [
- {
- "name": "pytorch",
- "image": "apache/submarine:pytorch-dist-mnist-1.0",
- "args": [
- "--backend",
- "gloo"
- ]
- }
- ]
- }
- }
- }
- }
- }
-}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.json
b/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.json
deleted file mode 100644
index 6d36d1d..0000000
---
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.json
+++ /dev/null
@@ -1,48 +0,0 @@
-{
- "apiVersion": "kubeflow.org/v1",
- "kind": "TFJob",
- "metadata": {
- "name": "mnist",
- "namespace": "submarine"
- },
- "spec": {
- "cleanPodPolicy": "None",
- "tfReplicaSpecs": {
- "Worker": {
- "replicas": 1,
- "restartPolicy": "Never",
- "template": {
- "spec": {
- "containers": [
- {
- "name": "tensorflow",
- "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
- "command": [
- "python",
- "/var/tf_mnist/mnist_with_summaries.py",
- "--log_dir=/train/logs",
- "--learning_rate=0.01",
- "--batch_size=150"
- ],
- "volumeMounts": [
- {
- "mountPath": "/train",
- "name": "training"
- }
- ]
- }
- ],
- "volumes": [
- {
- "name": "training",
- "persistentVolumeClaim": {
- "claimName": "tfevent-volume"
- }
- }
- ]
- }
- }
- }
- }
- }
-}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.yaml
b/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.yaml
deleted file mode 100644
index 370f4cc..0000000
---
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-apiVersion: "kubeflow.org/v1"
-kind: "TFJob"
-metadata:
- name: "mnist"
- namespace: kubeflow
-spec:
- cleanPodPolicy: None
- tfReplicaSpecs:
- Worker:
- replicas: 1
- restartPolicy: Never
- template:
- spec:
- containers:
- - name: tensorflow
- image: gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0
- command:
- - "python"
- - "/var/tf_mnist/mnist_with_summaries.py"
- - "--log_dir=/train/logs"
- - "--learning_rate=0.01"
- - "--batch_size=150"
- volumeMounts:
- - mountPath: "/train"
- name: "training"
- volumes:
- - name: "training"
- persistentVolumeClaim:
- claimName: "tfevent-volume"
diff --git a/submarine-test/test-k8s/pom.xml b/submarine-test/test-k8s/pom.xml
index b6b83c5..1232180 100644
--- a/submarine-test/test-k8s/pom.xml
+++ b/submarine-test/test-k8s/pom.xml
@@ -41,6 +41,12 @@
<groupId>org.apache.submarine</groupId>
<artifactId>submarine-server-core</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -72,6 +78,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -94,6 +104,12 @@
<artifactId>snakeyaml</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.kubernetes</groupId>
+ <artifactId>client-java</artifactId>
+ <version>${k8s.client-java.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
b/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
index ec63518..f9a0fb0 100644
---
a/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
+++
b/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
@@ -19,48 +19,285 @@
package org.apache.submarine.rest;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.Configuration;
+import io.kubernetes.client.JSON;
+import io.kubernetes.client.apis.CustomObjectsApi;
+import io.kubernetes.client.util.ClientBuilder;
+import io.kubernetes.client.util.KubeConfig;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.io.FileUtils;
import org.apache.submarine.server.AbstractSubmarineServerTest;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobId;
+import org.apache.submarine.server.json.JobIdDeserializer;
+import org.apache.submarine.server.json.JobIdSerializer;
import org.apache.submarine.server.response.JsonResponse;
import org.apache.submarine.server.rest.RestConstants;
+import org.joda.time.DateTime;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.junit.BeforeClass;
-
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertEquals;
+@SuppressWarnings("rawtypes")
public class JobManagerRestApiIT extends AbstractSubmarineServerTest {
private static final Logger LOG =
LoggerFactory.getLogger(JobManagerRestApiIT.class);
+ private static CustomObjectsApi k8sApi;
+ /** Key is the ml framework name, the value is the operator */
+ private static Map<String, KfOperator> kfOperatorMap;
+ private static String JOB_PATH = "/api/" + RestConstants.V1 + "/" +
RestConstants.JOBS;
+
+ private Gson gson = new GsonBuilder()
+ .registerTypeAdapter(JobId.class, new JobIdSerializer())
+ .registerTypeAdapter(JobId.class, new JobIdDeserializer())
+ .create();
+
+  /**
+   * Verifies the server is reachable, builds the K8s client from the kind kube
+   * config, and registers the operators used to look jobs up through the K8s API.
+   *
+   * @throws IOException if the kube config file cannot be opened or closed
+   */
@BeforeClass
- public static void startUp(){
+  public static void startUp() throws IOException {
Assert.assertTrue(checkIfServerIsRunning());
+
+    // The kube config path defined by kind-cluster-build.sh
+    String confPath = System.getProperty("user.home") + "/.kube/kind-config-kind";
+    KubeConfig config;
+    // Close the reader once the config has been loaded; the original left the
+    // file handle open for the lifetime of the test JVM.
+    try (FileReader reader = new FileReader(confPath)) {
+      config = KubeConfig.loadKubeConfig(reader);
+    }
+    ApiClient client = ClientBuilder.kubeconfig(config).build();
+    Configuration.setDefaultApiClient(client);
+    k8sApi = new CustomObjectsApi();
+
+    kfOperatorMap = new HashMap<>();
+    kfOperatorMap.put("tensorflow", new KfOperator("v1", "tfjobs"));
+    kfOperatorMap.put("pytorch", new KfOperator("v1", "pytorchjobs"));
+  }
+
+  /** The ping endpoint of the job REST API must answer code 200 with the result "Pong". */
+  @Test
+  public void testJobServerPing() throws IOException {
+    GetMethod pingMethod = httpGet(JOB_PATH + "/" + RestConstants.PING);
+    String responseBody = pingMethod.getResponseBodyAsString();
+    JsonResponse pingResponse = new Gson().fromJson(responseBody, JsonResponse.class);
+
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), pingResponse.getCode());
+    Assert.assertEquals("Pong", pingResponse.getResult().toString());
}
- // Test job created with correct JSON input
@Test
- public void testCreateJob() throws Exception {
- URL fileUrl = this.getClass().getResource("/tf-mnist-req.json");
- String jobSpec = FileUtils.readFileToString(new File(fileUrl.toURI()),
StandardCharsets.UTF_8);
+  public void testTensorFlowWithJsonSpec() throws Exception {
+    // Full create/get/delete cycle for a TensorFlow job described in JSON.
+    String createSpec = loadContent("tensorflow/tf-mnist-req.json");
+    String patchSpec = loadContent("tensorflow/tf-mnist-patch-req.json");
+    run(createSpec, patchSpec, MediaType.APPLICATION_JSON);
+  }
- PostMethod response = httpPost("/api/" + RestConstants.V1 + "/" +
RestConstants.JOBS, jobSpec);
- LOG.debug(response.toString());
+  /** Full create/get/delete cycle for a TensorFlow job described in YAML. */
+  @Test
+  public void testTensorFlowWithYamlSpec() throws Exception {
+    String createSpec = loadContent("tensorflow/tf-mnist-req.yaml");
+    String patchSpec = loadContent("tensorflow/tf-mnist-patch-req.yaml");
+    run(createSpec, patchSpec, "application/yaml");
+  }
- String responseBodyAsString = response.getResponseBodyAsString();
- LOG.debug(responseBodyAsString);
+  /** Full create/get/delete cycle for a PyTorch job described in JSON. */
+  @Test
+  public void testPyTorchWithJsonSpec() throws Exception {
+    String createSpec = loadContent("pytorch/pt-mnist-req.json");
+    String patchSpec = loadContent("pytorch/pt-mnist-patch-req.json");
+    run(createSpec, patchSpec, MediaType.APPLICATION_JSON);
+  }
- Gson gson = new Gson();
- JsonResponse jsonResponse = gson.fromJson(responseBodyAsString,
JsonResponse.class);
- assertEquals("Response code should be 202 ",
- Response.Status.ACCEPTED.getStatusCode(), jsonResponse.getCode());
+  /** Full create/get/delete cycle for a PyTorch job described in YAML. */
+  @Test
+  public void testPyTorchWithYamlSpec() throws Exception {
+    String createSpec = loadContent("pytorch/pt-mnist-req.yaml");
+    String patchSpec = loadContent("pytorch/pt-mnist-patch-req.yaml");
+    run(createSpec, patchSpec, "application/yaml");
+  }
+
+ private void run(String body, String patchBody, String contentType) throws
Exception {
+ // create
+ LOG.info("Create training job by Job REST API");
+ PostMethod postMethod = httpPost(JOB_PATH, body, contentType);
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
postMethod.getStatusCode());
+
+ String json = postMethod.getResponseBodyAsString();
+ JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
jsonResponse.getCode());
+
+ Job createdJob = gson.fromJson(gson.toJson(jsonResponse.getResult()),
Job.class);
+ verifyCreateJobApiResult(createdJob);
+
+ // find
+ GetMethod getMethod = httpGet(JOB_PATH + "/" +
createdJob.getJobId().toString());
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
getMethod.getStatusCode());
+
+ json = getMethod.getResponseBodyAsString();
+ jsonResponse = gson.fromJson(json, JsonResponse.class);
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
jsonResponse.getCode());
+
+ Job foundJob = gson.fromJson(gson.toJson(jsonResponse.getResult()),
Job.class);
+ verifyGetJobApiResult(createdJob, foundJob);
+
+ // patch
+ // TODO(jiwq): the commons-httpclient not support patch method
+ // https://tools.ietf.org/html/rfc5789
+
+ // delete
+ DeleteMethod deleteMethod = httpDelete(JOB_PATH + "/" +
createdJob.getJobId().toString());
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
deleteMethod.getStatusCode());
+
+ json = deleteMethod.getResponseBodyAsString();
+ jsonResponse = gson.fromJson(json, JsonResponse.class);
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
jsonResponse.getCode());
+
+ Job deletedJob = gson.fromJson(gson.toJson(jsonResponse.getResult()),
Job.class);
+ verifyDeleteJobApiResult(createdJob, deletedJob);
+ }
+
+  /**
+   * Checks a freshly created job: uid and accepted time are set, the status is
+   * "accepted", and the job matches what the K8s API reports.
+   */
+  private void verifyCreateJobApiResult(Job createdJob) throws Exception {
+    Assert.assertNotNull(createdJob.getUid());
+    Assert.assertNotNull(createdJob.getAcceptedTime());
+
+    String acceptedStatus = Job.Status.STATUS_ACCEPTED.getValue();
+    Assert.assertEquals(acceptedStatus, createdJob.getStatus());
+
+    assertK8sResultEquals(createdJob);
+  }
+
+  /**
+   * Checks that the fetched job has the same id, uid, name and accepted time as the
+   * created job, and that it matches what the K8s API reports.
+   */
+  private void verifyGetJobApiResult(Job createdJob, Job foundJob) throws Exception {
+    Assert.assertEquals(createdJob.getJobId(), foundJob.getJobId());
+    Assert.assertEquals(createdJob.getUid(), foundJob.getUid());
+    Assert.assertEquals(createdJob.getName(), foundJob.getName());
+    Assert.assertEquals(createdJob.getAcceptedTime(), foundJob.getAcceptedTime());
+
+    assertK8sResultEquals(foundJob);
+  }
+
+  /**
+   * Checks that the uid and creation time reported by the job REST API equal the
+   * metadata of the custom object that K8s returns for the job's name.
+   */
+  private void assertK8sResultEquals(Job job) throws Exception {
+    // The operator is looked up by the lower-cased library (framework) name.
+    String framework = job.getSpec().getLibrarySpec().getName().toLowerCase();
+    KfOperator kfOperator = kfOperatorMap.get(framework);
+    JsonObject k8sJob = getJobByK8sApi(kfOperator.getGroup(), kfOperator.getVersion(),
+        kfOperator.getNamespace(), kfOperator.getPlural(), job.getName());
+    JsonObject k8sMetadata = k8sJob.getAsJsonObject("metadata");
+
+    String k8sUid = k8sMetadata.getAsJsonPrimitive("uid").getAsString();
+    LOG.info("Uid from Job REST is {}", job.getUid());
+    LOG.info("Uid from K8s REST is {}", k8sUid);
+    Assert.assertEquals(job.getUid(), k8sUid);
+
+    // Compare parsed Date values rather than the raw timestamp strings.
+    String k8sTimestamp = k8sMetadata.getAsJsonPrimitive("creationTimestamp").getAsString();
+    Date restCreated = new DateTime(job.getAcceptedTime()).toDate();
+    Date k8sCreated = new DateTime(k8sTimestamp).toDate();
+    LOG.info("CreationTimestamp from Job REST is {}", restCreated);
+    LOG.info("CreationTimestamp from K8s REST is {}", k8sCreated);
+    Assert.assertEquals(restCreated, k8sCreated);
+  }
+
+  /**
+   * Checks the delete response (same name, "deleted" status) and confirms through
+   * the K8s API that the custom object is gone: the lookup must fail with 404.
+   */
+  private void verifyDeleteJobApiResult(Job createdJob, Job deletedJob) {
+    Assert.assertEquals(createdJob.getName(), deletedJob.getName());
+    Assert.assertEquals(Job.Status.STATUS_DELETED.getValue(), deletedJob.getStatus());
+
+    // verify the result by K8s api
+    String framework = createdJob.getSpec().getLibrarySpec().getName().toLowerCase();
+    KfOperator kfOperator = kfOperatorMap.get(framework);
+    JsonObject leftover;
+    try {
+      leftover = getJobByK8sApi(kfOperator.getGroup(), kfOperator.getVersion(),
+          kfOperator.getNamespace(), kfOperator.getPlural(), createdJob.getName());
+    } catch (ApiException e) {
+      Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), e.getCode());
+      leftover = null;
+    }
+    // A non-null object here means K8s still returned the job after the delete.
+    Assert.assertNull(leftover);
+  }
+
+  /**
+   * Fetches a namespaced custom object through the K8s API and returns it as a
+   * Gson {@link JsonObject} tree.
+   *
+   * @throws ApiException if the K8s API call fails
+   */
+  private JsonObject getJobByK8sApi(String group, String version, String namespace,
+      String plural, String name) throws ApiException {
+    Object customObject =
+        k8sApi.getNamespacedCustomObject(group, version, namespace, plural, name);
+    JsonObject jobTree = new JSON().getGson().toJsonTree(customObject).getAsJsonObject();
+    Assert.assertNotNull("Parse the K8s API Server response failed.", jobTree);
+    return jobTree;
+  }
+
+ @Test
+ public void testCreateJobWithInvalidSpec() throws Exception {
+ PostMethod postMethod = httpPost(JOB_PATH, "", MediaType.APPLICATION_JSON);
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
postMethod.getStatusCode());
+
+ String json = postMethod.getResponseBodyAsString();
+ JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
jsonResponse.getCode());
+ }
+
+ @Test
+ public void testCreateJobWithInvalidSubmitter() throws Exception {
+ String body = loadContent("tensorflow/tf-mnist-req.json");
+ body = body.replace("k8s", "InvalidSubmitter");
+ PostMethod postMethod = httpPost(JOB_PATH, body,
MediaType.APPLICATION_JSON);
+ Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+ postMethod.getStatusCode());
+
+ String json = postMethod.getResponseBodyAsString();
+ JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+ Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+ jsonResponse.getCode());
+ }
+
+ @Test
+ public void testNotFoundJob() throws Exception {
+ GetMethod getMethod = httpGet(JOB_PATH + "/" + "job_123456789_0001");
+ Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
getMethod.getStatusCode());
+
+ String json = getMethod.getResponseBodyAsString();
+ JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+ Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
jsonResponse.getCode());
+ }
+
+  /**
+   * Reads a test resource, addressed relative to the classpath root, as a UTF-8 string.
+   *
+   * @param resourceName resource path without the leading "/"
+   * @return the whole file content
+   * @throws Exception if the resource URL is not a valid URI or the file cannot be read
+   */
+  String loadContent(String resourceName) throws Exception {
+    URL fileUrl = this.getClass().getResource("/" + resourceName);
+    // getResource returns null for a missing file; fail with the resource name
+    // instead of a bare NullPointerException from fileUrl.toURI().
+    Assert.assertNotNull("Test resource not found: /" + resourceName, fileUrl);
+    LOG.info("Resource file: {}", fileUrl);
+    return FileUtils.readFileToString(new File(fileUrl.toURI()), StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Used directly with the K8s API. It stores the operator's base info: the API
+   * version and the plural resource name. Group and namespace are fixed values.
+   */
+  private static class KfOperator {
+    // Both fields are assigned once in the constructor, so they are final.
+    private final String version;
+    private final String plural;
+
+    KfOperator(String version, String plural) {
+      this.version = version;
+      this.plural = plural;
+    }
+
+    /** @return the fixed API group "kubeflow.org" */
+    public String getGroup() {
+      return "kubeflow.org";
+    }
+
+    /** @return the fixed namespace "default" */
+    public String getNamespace() {
+      return "default";
+    }
+
+    /** @return the API version given at construction */
+    public String getVersion() {
+      return version;
+    }
+
+    /** @return the plural resource name given at construction */
+    public String getPlural() {
+      return plural;
+    }
}
}
diff --git
a/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.json
b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.json
new file mode 100644
index 0000000..dd4467c
--- /dev/null
+++ b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.json
@@ -0,0 +1,28 @@
+{
+ "name": "mnist",
+ "librarySpec": {
+ "name": "pytorch",
+ "version": "2.1.0",
+ "image": "apache/submarine:pytorch-dist-mnist-1.0",
+ "cmd": "python /var/mnist.py --backend gloo",
+ "envVars": {
+ "ENV_1": "ENV1"
+ }
+ },
+ "submitterSpec": {
+ "type": "k8s",
+ "namespace": "default"
+ },
+ "taskSpecs": {
+ "Master": {
+ "name": "master",
+ "replicas": 1,
+ "resources": "cpu=1,memory=1024M"
+ },
+ "Worker": {
+ "name": "worker",
+ "replicas": 2,
+ "resources": "cpu=1,memory=1024M"
+ }
+ }
+}
diff --git
a/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.yaml
b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.yaml
new file mode 100644
index 0000000..8b47741
--- /dev/null
+++ b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.yaml
@@ -0,0 +1,21 @@
+---
+name: mnist
+librarySpec:
+  name: pytorch
+  version: 2.1.0
+  image: apache/submarine:pytorch-dist-mnist-1.0
+  cmd: python /var/mnist.py --backend gloo
+  envVars:
+    ENV_1: ENV1
+submitterSpec:
+  type: k8s
+  namespace: default
+taskSpecs:
+  Master:
+    name: master
+    replicas: 1
+    resources: cpu=1,memory=1024M
+  Worker:
+    name: worker
+    replicas: 2  # was "s"; 2 matches pt-mnist-patch-req.json
+    resources: cpu=1,memory=1024M
diff --git
a/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.json
b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.json
new file mode 100644
index 0000000..cb884bf
--- /dev/null
+++ b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.json
@@ -0,0 +1,28 @@
+{
+ "name": "mnist",
+ "librarySpec": {
+ "name": "pytorch",
+ "version": "2.1.0",
+ "image": "apache/submarine:pytorch-dist-mnist-1.0",
+ "cmd": "python /var/mnist.py --backend gloo",
+ "envVars": {
+ "ENV_1": "ENV1"
+ }
+ },
+ "submitterSpec": {
+ "type": "k8s",
+ "namespace": "default"
+ },
+ "taskSpecs": {
+ "Master": {
+ "name": "master",
+ "replicas": 1,
+ "resources": "cpu=1,memory=1024M"
+ },
+ "Worker": {
+ "name": "worker",
+ "replicas": 1,
+ "resources": "cpu=1,memory=1024M"
+ }
+ }
+}
diff --git
a/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.yaml
b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.yaml
new file mode 100644
index 0000000..414d6ef
--- /dev/null
+++ b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.yaml
@@ -0,0 +1,21 @@
+---
+name: mnist
+librarySpec:
+ name: pytorch
+ version: 2.1.0
+ image: apache/submarine:pytorch-dist-mnist-1.0
+ cmd: python /var/mnist.py --backend gloo
+ envVars:
+ ENV_1: ENV1
+submitterSpec:
+ type: k8s
+ namespace: default
+taskSpecs:
+ Master:
+ name: master
+ replicas: 1
+ resources: cpu=1,memory=1024M
+ Worker:
+ name: worker
+ replicas: 1
+ resources: cpu=1,memory=1024M
diff --git a/submarine-server/server-core/src/test/resources/tf-mnist-req.json
b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.json
similarity index 58%
rename from submarine-server/server-core/src/test/resources/tf-mnist-req.json
rename to
submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.json
index 0850e54..d8df243 100644
--- a/submarine-server/server-core/src/test/resources/tf-mnist-req.json
+++
b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.json
@@ -11,21 +11,13 @@
},
"submitterSpec": {
"type": "k8s",
- "configPath": null,
- "namespace": "submarine",
- "kind": "TFJob",
- "apiVersion": "kubeflow.org/v1"
+ "namespace": "default"
},
"taskSpecs": {
- "Ps": {
- "name": "tensorflow",
- "replicas": 2,
- "resources": "cpu=4,memory=2048M,nvidia.com/gpu=1"
- },
"Worker": {
"name": "tensorflow",
- "replicas": 2,
- "resources": "cpu=4,memory=2048M,nvidia.com/gpu=1"
+ "replicas": 1,
+ "resources": "cpu=1,memory=1024M"
}
}
-}
\ No newline at end of file
+}
diff --git a/submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.yaml
similarity index 67%
copy from submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
copy to
submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.yaml
index 02e2125..edae4cb 100644
--- a/submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
+++
b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.yaml
@@ -9,15 +9,11 @@ librarySpec:
submitterSpec:
type: "k8s"
configPath:
- namespace: "submarine"
+ namespace: "default"
kind: "TFJob"
apiVersion: "kubeflow.org/v1"
taskSpecs:
- Ps:
- name: tensorflow
- replicas: 2
- resources: "cpu=4,memory=2048M,nvidia.com/gpu=1"
Worker:
name: tensorflow
- replicas: 2
- resources: "cpu=4,memory=2048M,nvidia.com/gpu=1"
+ replicas: 1
+ resources: "cpu=4,memory=2048M"
diff --git a/submarine-test/test-k8s/src/test/resources/tf-mnist-req.json
b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.json
similarity index 72%
rename from submarine-test/test-k8s/src/test/resources/tf-mnist-req.json
rename to
submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.json
index 13fc934..d7f7473 100644
--- a/submarine-test/test-k8s/src/test/resources/tf-mnist-req.json
+++ b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.json
@@ -11,18 +11,18 @@
},
"submitterSpec": {
"type": "k8s",
- "namespace": "submarine"
+ "namespace": "default"
},
"taskSpecs": {
"Ps": {
"name": "tensorflow",
- "replicas": 2,
- "resources": "cpu=4,memory=2048M,nvidia.com/gpu=1"
+ "replicas": 1,
+ "resources": "cpu=1,memory=512M"
},
"Worker": {
"name": "tensorflow",
- "replicas": 2,
- "resources": "cpu=4,memory=2048M,nvidia.com/gpu=1"
+ "replicas": 1,
+ "resources": "cpu=1,memory=512M"
}
}
}
diff --git a/submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.yaml
similarity index 70%
rename from submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
rename to
submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.yaml
index 02e2125..bf729a6 100644
--- a/submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
+++ b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.yaml
@@ -8,16 +8,15 @@ librarySpec:
ENV_1: "ENV1"
submitterSpec:
type: "k8s"
- configPath:
- namespace: "submarine"
+ namespace: "default"
kind: "TFJob"
apiVersion: "kubeflow.org/v1"
taskSpecs:
Ps:
name: tensorflow
- replicas: 2
- resources: "cpu=4,memory=2048M,nvidia.com/gpu=1"
+ replicas: 1
+ resources: "cpu=1,memory=512M"
Worker:
name: tensorflow
- replicas: 2
- resources: "cpu=4,memory=2048M,nvidia.com/gpu=1"
+ replicas: 1
+ resources: "cpu=1,memory=512M"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]