This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 56b2a56  SUBMARINE-455. Support CRUD job in submarine-server REST
56b2a56 is described below

commit 56b2a566696409d83f0b27c66890e9923cfdfedd
Author: Wanqiang Ji <[email protected]>
AuthorDate: Thu Apr 9 20:23:01 2020 +0800

    SUBMARINE-455. Support CRUD job in submarine-server REST
    
    ### What is this PR for?
    Currently the submarine-server only supports creating a job via REST. We should implement the other operations.
    
    - GET/PATCH/DELETE job by REST
    - Fill in more info for the Job object, such as the job status (accepted/created/running/succeeded) with timestamps, the spec, etc.
    
    ### What type of PR is it?
    Feature
    
    ### Todos
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-455
    
    ### How should this be tested?
    https://travis-ci.com/github/jiwq/submarine/builds/155090047
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Wanqiang Ji <[email protected]>
    
    Closes #249 from jiwq/SUBMARINE-455 and squashes the following commits:
    
    3049448 [Wanqiang Ji] SUBMARINE-455. Support CRUD job in submarine-server 
REST
---
 .gitignore                                         |   1 +
 conf/submarine-site.xml.template                   |  24 ++
 dev-support/k8s/deploy-kubeflow-operators.sh       | 161 +++++++++++
 dev-support/k8s/pytorchjob/deployment.yaml         |   2 +-
 .../k8s/pytorchjob/deployment_v1.16plus.yaml       |   2 +-
 dev-support/k8s/pytorchjob/rbac.yaml               |   4 +-
 dev-support/k8s/pytorchjob/service.yaml            |   2 +-
 dev-support/k8s/tfjob/crd.yaml                     |  33 +--
 dev-support/k8s/tfjob/operator/cluster-role.yaml   | 109 ++++----
 dev-support/k8s/tfjob/operator/deployment.yaml     |  33 +--
 dev-support/k8s/tfjob/operator/kustomization.yaml  |   4 +-
 docs/submarine-server/README.md                    | 306 +++++++++++++++++----
 submarine-cloud/hack/integration-test.sh           |   1 +
 .../manifests/submarine-cluster/rbac.yaml          |   5 +
 .../submarine-cluster/submarine-server.yaml        |   2 +-
 .../utils/exception/SubmarineRuntimeException.java |  11 +
 .../apache/submarine/server/api/JobHandler.java    |  71 -----
 .../api/exception/UnsupportedJobTypeException.java |  28 --
 .../org/apache/submarine/server/api/job/Job.java   | 136 +++++++--
 .../org/apache/submarine/server/api/job/JobId.java |   5 +-
 .../server/api/{ => job}/JobSubmitter.java         |  38 ++-
 .../org/apache/submarine/server/JobManager.java    | 104 -------
 .../apache/submarine/server/SubmarineServer.java   |   3 +-
 .../apache/submarine/server/SubmitterManager.java  |   2 +-
 .../apache/submarine/server/job/JobManager.java    | 194 +++++++++++++
 .../submarine/server/response/JsonResponse.java    |   7 +-
 .../submarine/server/rest/JobManagerRestApi.java   |  92 ++++---
 .../server/AbstractSubmarineServerTest.java        |  11 +-
 .../submarine/server/SubmarineServerTest.java      |   4 -
 .../server/rest/JobManagerRestApiTest.java         | 181 ------------
 .../server/submitter/k8s/K8sJobRequest.java        |  89 ------
 .../server/submitter/k8s/K8sJobSubmitter.java      | 157 +++++------
 .../server/submitter/k8s/model/MLJob.java          |  12 +
 .../server/submitter/k8s/util/MLJobConverter.java  |  97 +++++++
 .../server/submitter/k8s/JobSpecParserTest.java    |  34 +--
 .../server/submitter/k8s/K8SJobSubmitterTest.java  | 152 +++-------
 .../server/submitter/k8s/MLJobConverterTest.java   | 107 +++++++
 .../server/submitter/k8s/SpecBuilder.java          |  52 ++++
 .../src/test/resources/pytorch_job_mnist_gloo.json |  47 ----
 .../src/test/resources/tf_job_mnist.json           |  48 ----
 .../src/test/resources/tf_job_mnist.yaml           |  29 --
 submarine-test/test-k8s/pom.xml                    |  16 ++
 .../apache/submarine/rest/JobManagerRestApiIT.java | 279 +++++++++++++++++--
 .../test/resources/pytorch/pt-mnist-patch-req.json |  28 ++
 .../test/resources/pytorch/pt-mnist-patch-req.yaml |  21 ++
 .../src/test/resources/pytorch/pt-mnist-req.json   |  28 ++
 .../src/test/resources/pytorch/pt-mnist-req.yaml   |  21 ++
 .../resources/tensorflow/tf-mnist-patch-req.json   |  16 +-
 .../resources/tensorflow/tf-mnist-patch-req.yaml   |  10 +-
 .../resources/{ => tensorflow}/tf-mnist-req.json   |  10 +-
 .../test/resources/tensorflow}/tf-mnist-req.yaml   |  11 +-
 51 files changed, 1739 insertions(+), 1101 deletions(-)

diff --git a/.gitignore b/.gitignore
index aa35d37..ebf34fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -83,6 +83,7 @@ spark-1.*-bin-hadoop*
 submarine-cloud/vendor/*
 submarine-cloud/output/*
 submarine-cloud/hack/conf/*
+submarine-cloud/hack/output/*
 submarine-cloud/bin/*
 
 submarine-security/spark-security/dependency-reduced-pom.xml
diff --git a/conf/submarine-site.xml.template b/conf/submarine-site.xml.template
index 4ebb047..e4caa2d 100755
--- a/conf/submarine-site.xml.template
+++ b/conf/submarine-site.xml.template
@@ -155,4 +155,28 @@
     <description>Rpc server port</description>
   </property>
 
+  <!-- Submarine Submitters Configuration  -->
+    <property>
+      <name>submarine.submitters</name>
+      <value>k8s</value>
+      <description>Submitter list for the server</description>
+    </property>
+    <property>
+      <name>submarine.submitters.k8s.class</name>
+      <value>org.apache.submarine.server.submitter.k8s.K8sJobSubmitter</value>
+      <description>The entry class for the specified submitter</description>
+    </property>
+    <property>
+      <name>submarine.submitters.k8s.classpath</name>
+      <value>../lib/submitter/k8s/</value>
+      <description>The libs for Kubernetes submitter</description>
+    </property>
+
+    <!-- K8s Configuration -->
+    <property>
+      <name>submarine.k8s.kube.config</name>
+      <value>../conf/k8s/config</value>
+      <description>Kube config for kubernetes, you should get the config from cluster</description>
+    </property>
+
 </configuration>
diff --git a/dev-support/k8s/deploy-kubeflow-operators.sh 
b/dev-support/k8s/deploy-kubeflow-operators.sh
new file mode 100755
index 0000000..d35ef86
--- /dev/null
+++ b/dev-support/k8s/deploy-kubeflow-operators.sh
@@ -0,0 +1,161 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -e
+
+# Images for the operators and for the optional MNIST samples.
+readonly TF_OPERATOR_IMAGE="gcr.io/kubeflow-images-public/tf_operator:v1.0.0-g92389064"
+readonly PYTORCH_OPERATOR_IMAGE="gcr.io/kubeflow-images-public/pytorch-operator:v1.0.0-g047cf0f"
+readonly TF_MNIST_IMAGE="gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0"
+readonly PT_MNIST_IMAGE="apache/submarine:pytorch-dist-mnist-1.0"
+
+# Resolve the directory containing this script, following a symlink if needed.
+if [ -L "${BASH_SOURCE-$0}" ]; then
+  PWD=$(dirname "$(readlink "${BASH_SOURCE-$0}")")
+else
+  PWD=$(dirname "${BASH_SOURCE-$0}")
+fi
+CURRENT_PATH=$(cd "${PWD}">/dev/null; pwd)
+export CURRENT_PATH
+export SUBMARINE_HOME=${CURRENT_PATH}/../..
+# lib.sh uses the ROOT variable
+export ROOT="${SUBMARINE_HOME}/submarine-cloud/"
+export KUBECONFIG="${HOME}/.kube/kind-config-${clusterName:-kind}"
+
+# shellcheck source=./../../submarine-cloud/hack/lib.sh
+source "${SUBMARINE_HOME}/submarine-cloud/hack/lib.sh"
+
+###########################################
+# Load a docker image into the cluster via ${KIND_BIN},
+# pulling it first if it is not available locally
+# Globals:
+#   KIND_BIN
+# Arguments:
+#   image
+###########################################
+function load_image_to_registry() {
+  # Decide on the exit status of `docker inspect`: its output is discarded, so
+  # testing the captured output would always report the image as missing and
+  # trigger a pull on every call.
+  if ! docker inspect "$1" > /dev/null 2>&1; then
+    docker pull "$1"
+  fi
+  ${KIND_BIN} load docker-image "$1"
+}
+
+###########################################
+# Deploy tf-operator on K8s: load the operator image, apply the TFJob CRD,
+# then apply the kustomized operator manifests
+# Globals:
+#   KUBECTL_BIN
+#   CURRENT_PATH
+#   TF_OPERATOR_IMAGE
+#   TF_MNIST_IMAGE
+# Arguments:
+#   useSample  when "true", also load the TensorFlow MNIST sample image
+###########################################
+function deploy_tf_operator() {
+  load_image_to_registry "${TF_OPERATOR_IMAGE}"
+
+  ${KUBECTL_BIN} apply -f "${CURRENT_PATH}/tfjob/crd.yaml"
+  ${KUBECTL_BIN} kustomize "${CURRENT_PATH}/tfjob/operator" \
+    | ${KUBECTL_BIN} apply -f -
+
+  if [[ $1 == "true" ]]; then
+    load_image_to_registry "${TF_MNIST_IMAGE}"
+  fi
+}
+
+###########################################
+# Deploy pytorch-operator on K8s: load the operator image, then apply
+# every manifest in the pytorchjob directory
+# Globals:
+#   KUBECTL_BIN
+#   CURRENT_PATH
+#   PYTORCH_OPERATOR_IMAGE
+#   PT_MNIST_IMAGE
+# Arguments:
+#   useSample  when "true", also load the PyTorch MNIST sample image
+###########################################
+function deploy_pytorch_operator() {
+  load_image_to_registry "${PYTORCH_OPERATOR_IMAGE}"
+  ${KUBECTL_BIN} apply -f "${CURRENT_PATH}/pytorchjob"
+
+  if [[ $1 == "true" ]]; then
+    load_image_to_registry "${PT_MNIST_IMAGE}"
+  fi
+}
+
+###########################################
+# Print the usage information to stdout
+# ($0 in the text expands to the name the script was invoked with)
+###########################################
+function usage() {
+  cat <<END
+
+This script aims to deploy the machine learning operator to K8s.
+
+Usage:
+    $0 [options]
+
+Options:
+  -a,  --all         deploy the TensorFlow and PyTorch operator
+  -tf, --tensorflow  deploy the TensorFlow operator
+  -pt, --pytorch     deploy the PyTorch operator
+  -s,  --sample      pull the sample docker image and load into K8s registry
+  -h,  --help        prints this usage message
+END
+}
+
+###########################################
+# Parse the command line options and deploy the selected operators
+# Arguments:
+#   the command line arguments of the script
+###########################################
+function main() {
+  # Without any option there is nothing to deploy: show the usage and fail
+  if [[ $# -eq 0 ]]; then
+    usage
+    exit 1
+  fi
+
+  # Record each recognised flag; an unknown option aborts with exit code 2
+  while [[ $# -gt 0 ]]; do
+    case $1 in
+      -a|--all)
+        opt_all="true"
+        shift ;;
+      -tf|--tensorflow)
+        opt_tf="true"
+        shift ;;
+      -pt|--pytorch)
+        opt_pt="true"
+        shift ;;
+      -s|--sample)
+        opt_s="true"
+        shift ;;
+      -h|--help)
+        usage
+        exit 0
+        ;;
+      *)
+        echo "Unknown options: $*"
+        usage
+        exit 2
+        ;;
+    esac
+  done
+
+  hack::ensure_kubectl
+
+  # Selecting both operators individually is the same as --all
+  if [[ "${opt_tf}" == "true" && "${opt_pt}" == "true" ]]; then
+    opt_all="true"
+  fi
+  # opt_s is passed through as the useSample argument of the deploy functions
+  if [[ "${opt_all}" == "true" ]]; then
+    deploy_tf_operator ${opt_s}
+    deploy_pytorch_operator ${opt_s}
+  elif [[ "${opt_tf}" == "true" ]]; then
+    deploy_tf_operator ${opt_s}
+  elif [[ "${opt_pt}" == "true" ]]; then
+    deploy_pytorch_operator ${opt_s}
+  fi
+}
+
+main "$@"
diff --git a/dev-support/k8s/pytorchjob/deployment.yaml 
b/dev-support/k8s/pytorchjob/deployment.yaml
index 8ddecda..441a517 100644
--- a/dev-support/k8s/pytorchjob/deployment.yaml
+++ b/dev-support/k8s/pytorchjob/deployment.yaml
@@ -3,7 +3,7 @@ apiVersion: extensions/v1beta1
 kind: Deployment
 metadata:
   name: pytorch-operator
-  namespace: submarine
+  namespace: default
 spec:
   replicas: 1
   selector:
diff --git a/dev-support/k8s/pytorchjob/deployment_v1.16plus.yaml 
b/dev-support/k8s/pytorchjob/deployment_v1.16plus.yaml
index e80a9ab..4a2238f 100644
--- a/dev-support/k8s/pytorchjob/deployment_v1.16plus.yaml
+++ b/dev-support/k8s/pytorchjob/deployment_v1.16plus.yaml
@@ -5,7 +5,7 @@ metadata:
   labels:
     name: pytorch-operator
   name: pytorch-operator
-  namespace: submarine
+  namespace: default
 spec:
   progressDeadlineSeconds: 2147483647
   replicas: 1
diff --git a/dev-support/k8s/pytorchjob/rbac.yaml 
b/dev-support/k8s/pytorchjob/rbac.yaml
index 637b859..0261c85 100644
--- a/dev-support/k8s/pytorchjob/rbac.yaml
+++ b/dev-support/k8s/pytorchjob/rbac.yaml
@@ -4,7 +4,7 @@ metadata:
   labels:
     app: pytorch-operator
   name: pytorch-operator
-  namespace: submarine
+  namespace: default
 ---
 apiVersion: rbac.authorization.k8s.io/v1beta1
 kind: ClusterRole
@@ -49,5 +49,5 @@ roleRef:
 subjects:
 - kind: ServiceAccount
   name: pytorch-operator
-  namespace: submarine
+  namespace: default
 ---
diff --git a/dev-support/k8s/pytorchjob/service.yaml 
b/dev-support/k8s/pytorchjob/service.yaml
index e9c0a70..8e57a49 100644
--- a/dev-support/k8s/pytorchjob/service.yaml
+++ b/dev-support/k8s/pytorchjob/service.yaml
@@ -8,7 +8,7 @@ metadata:
   labels:
     app: pytorch-operator
   name: pytorch-operator
-  namespace: submarine
+  namespace: default
 spec:
   ports:
   - name: monitoring-port
diff --git a/dev-support/k8s/tfjob/crd.yaml b/dev-support/k8s/tfjob/crd.yaml
index b693c40..38f850b 100644
--- a/dev-support/k8s/tfjob/crd.yaml
+++ b/dev-support/k8s/tfjob/crd.yaml
@@ -3,19 +3,16 @@ kind: CustomResourceDefinition
 metadata:
   name: tfjobs.kubeflow.org
 spec:
-  additionalPrinterColumns:
-  - JSONPath: .status.conditions[-1:].type
-    name: State
-    type: string
-  - JSONPath: .metadata.creationTimestamp
-    name: Age
-    type: date
   group: kubeflow.org
+  scope: Namespaced
   names:
     kind: TFJob
-    plural: tfjobs
     singular: tfjob
-  scope: Namespaced
+    plural: tfjobs
+  versions:
+    - name: v1
+      served: true
+      storage: true
   subresources:
     status: {}
   validation:
@@ -25,23 +22,21 @@ spec:
           properties:
             tfReplicaSpecs:
               properties:
-                Chief:
+                # The validation works when the configuration contains
+                # `Worker`, `PS` or `Chief`. Otherwise it will not be validated.
+                Worker:
                   properties:
                     replicas:
-                      maximum: 1
-                      minimum: 1
                       type: integer
+                      minimum: 1
                 PS:
                   properties:
                     replicas:
-                      minimum: 1
                       type: integer
-                Worker:
+                      minimum: 1
+                Chief:
                   properties:
                     replicas:
-                      minimum: 1
                       type: integer
-  versions:
-  - name: v1
-    served: true
-    storage: true
+                      minimum: 1
+                      maximum: 1
diff --git a/dev-support/k8s/tfjob/operator/cluster-role.yaml 
b/dev-support/k8s/tfjob/operator/cluster-role.yaml
index 72b2903..2740b98 100644
--- a/dev-support/k8s/tfjob/operator/cluster-role.yaml
+++ b/dev-support/k8s/tfjob/operator/cluster-role.yaml
@@ -6,35 +6,36 @@ metadata:
     app: tf-job-operator
   name: tf-job-operator
 rules:
-- apiGroups:
-  - kubeflow.org
-  resources:
-  - tfjobs
-  - tfjobs/status
-  verbs:
-  - '*'
-- apiGroups:
-  - apiextensions.k8s.io
-  resources:
-  - customresourcedefinitions
-  verbs:
-  - '*'
-- apiGroups:
-  - ""
-  resources:
-  - pods
-  - services
-  - endpoints
-  - events
-  verbs:
-  - '*'
-- apiGroups:
-  - apps
-  - extensions
-  resources:
-  - deployments
-  verbs:
-  - '*'
+  - apiGroups:
+      - kubeflow.org
+    resources:
+      - tfjobs
+      - tfjobs/status
+      - tfjobs/finalizers
+    verbs:
+      - '*'
+  - apiGroups:
+      - apiextensions.k8s.io
+    resources:
+      - customresourcedefinitions
+    verbs:
+      - '*'
+  - apiGroups:
+      - ""
+    resources:
+      - pods
+      - services
+      - endpoints
+      - events
+    verbs:
+      - '*'
+  - apiGroups:
+      - apps
+      - extensions
+    resources:
+      - deployments
+    verbs:
+      - '*'
 
 ---
 
@@ -46,8 +47,8 @@ metadata:
     rbac.authorization.kubeflow.org/aggregate-to-kubeflow-admin: "true"
 aggregationRule:
   clusterRoleSelectors:
-  - matchLabels:
-      rbac.authorization.kubeflow.org/aggregate-to-kubeflow-tfjobs-admin: "true"
+    - matchLabels:
+        rbac.authorization.kubeflow.org/aggregate-to-kubeflow-tfjobs-admin: "true"
 rules: []
 
 ---
@@ -60,20 +61,20 @@ metadata:
     rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
     rbac.authorization.kubeflow.org/aggregate-to-kubeflow-tfjobs-admin: "true"
 rules:
-- apiGroups:
-  - kubeflow.org
-  resources:
-  - tfjobs
-  - tfjobs/status
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - delete
-  - deletecollection
-  - patch
-  - update
+  - apiGroups:
+      - kubeflow.org
+    resources:
+      - tfjobs
+      - tfjobs/status
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - delete
+      - deletecollection
+      - patch
+      - update
 
 ---
 
@@ -84,12 +85,12 @@ metadata:
   labels:
     rbac.authorization.kubeflow.org/aggregate-to-kubeflow-view: "true"
 rules:
-- apiGroups:
-  - kubeflow.org
-  resources:
-  - tfjobs
-  - tfjobs/status
-  verbs:
-  - get
-  - list
-  - watch
+  - apiGroups:
+      - kubeflow.org
+    resources:
+      - tfjobs
+      - tfjobs/status
+    verbs:
+      - get
+      - list
+      - watch
diff --git a/dev-support/k8s/tfjob/operator/deployment.yaml 
b/dev-support/k8s/tfjob/operator/deployment.yaml
index bacd44e..411c73d 100644
--- a/dev-support/k8s/tfjob/operator/deployment.yaml
+++ b/dev-support/k8s/tfjob/operator/deployment.yaml
@@ -9,22 +9,23 @@ spec:
     metadata:
       labels:
         name: tf-job-operator
+      annotations:
+        sidecar.istio.io/inject: "false"
     spec:
       containers:
-      - command:
-        - /opt/kubeflow/tf-operator.v1
-        - --alsologtostderr
-        - -v=1
-        - --monitoring-port=8443
-        env:
-        - name: MY_POD_NAMESPACE
-          valueFrom:
-            fieldRef:
-              fieldPath: metadata.namespace
-        - name: MY_POD_NAME
-          valueFrom:
-            fieldRef:
-              fieldPath: metadata.name
-        image: 
gcr.io/kubeflow-images-public/tf_operator:kubeflow-tf-operator-postsubmit-v1-5adee6f-6109-a25c
-        name: tf-job-operator
+        - args:
+            - --alsologtostderr
+            - -v=1
+            - --monitoring-port=8443
+          env:
+            - name: MY_POD_NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: MY_POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
+          image: gcr.io/kubeflow-images-public/tf_operator:v1.0.0-g92389064
+          name: tf-job-operator
       serviceAccountName: tf-job-operator
diff --git a/dev-support/k8s/tfjob/operator/kustomization.yaml 
b/dev-support/k8s/tfjob/operator/kustomization.yaml
index 86826d7..5be190f 100644
--- a/dev-support/k8s/tfjob/operator/kustomization.yaml
+++ b/dev-support/k8s/tfjob/operator/kustomization.yaml
@@ -1,6 +1,6 @@
 apiVersion: kustomize.config.k8s.io/v1beta1
 kind: Kustomization
-namespace: submarine
+namespace: default
 resources:
 - cluster-role-binding.yaml
 - cluster-role.yaml
@@ -12,4 +12,4 @@ commonLabels:
 images:
 - name: gcr.io/kubeflow-images-public/tf_operator
   newName: gcr.io/kubeflow-images-public/tf_operator
-  newTag: v0.7.0
+  newTag: v1.0.0-g92389064
diff --git a/docs/submarine-server/README.md b/docs/submarine-server/README.md
index 9f41e29..c1fc875 100644
--- a/docs/submarine-server/README.md
+++ b/docs/submarine-server/README.md
@@ -125,13 +125,12 @@ or
 
 For more info about the spec definition see 
[here](../design/submarine-server/jobspec.md).
 
-### Submit Job
-> Before submit training job, you should make sure you had deployed the 
[submarine server and tf-operator](./setup-kubernetes.md#setup-submarine).
+## Job Operation by REST API
+### Create Job
+`POST /api/v1/jobs`
 
-You can use the Postman to post the job to server or use `curl` with the 
following command:
-
-For Tensorflow Job:
-```bash
+**Example Request:**
+```sh
 curl -X POST -H "Content-Type: application/json" -d '
 {
   "name": "mnist",
@@ -149,11 +148,6 @@ curl -X POST -H "Content-Type: application/json" -d '
     "namespace": "submarine"
   },
   "taskSpecs": {
-    "Ps": {
-      "name": "tensorflow",
-      "replicas": 1,
-      "resources": "cpu=1,memory=1024M"
-    },
     "Worker": {
       "name": "tensorflow",
       "replicas": 1,
@@ -161,19 +155,168 @@ curl -X POST -H "Content-Type: application/json" -d '
     }
   }
 }
-' http://127.0.0.1:8080/api/v1/jobs
+' http://127.0.0.1/api/v1/jobs
 ```
 
-For PyTorch Job:
-```bash
-curl -X POST -H "Content-Type: application/json" -d '
+**Example Response:**
+```sh
 {
-  "name": "pytorch-dist-mnist-gloo",
+    "status": "OK",
+    "code": 200,
+    "result": {
+        "jobId": "job_1586156073228_0005",
+        "name": "mnist",
+        "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+        "status": "Accepted",
+        "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+        "spec": {
+            "name": "mnist",
+            "librarySpec": {
+                "name": "TensorFlow",
+                "version": "2.1.0",
+                "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+                "cmd": "python /var/tf_mnist/mnist_with_summaries.py 
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+                "envVars": {
+                    "ENV_1": "ENV1"
+                }
+            },
+            "submitterSpec": {
+                "type": "k8s",
+                "namespace": "submarine"
+            },
+            "taskSpecs": {
+                "Worker": {
+                    "name": "tensorflow",
+                    "resources": "cpu=1,memory=1024M",
+                    "replicas": 1,
+                    "resourceMap": {
+                        "memory": "1024M",
+                        "cpu": "1"
+                    }
+                }
+            }
+        }
+    }
+}
+```
+
+### List Jobs
+`GET /api/v1/jobs`
+
+**Example Request:**
+```sh
+curl -X GET http://127.0.0.1/api/v1/jobs
+```
+
+**Example Response:**
+```sh
+{
+    "status": "OK",
+    "code": 200,
+    "result": [
+        {
+            "jobId": "job_1586156073228_0005",
+            "name": "mnist",
+            "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+            "status": "Created",
+            "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+            "createdTime": "2020-04-06T14:59:29.000+08:00",
+            "spec": {
+                "name": "mnist",
+                "librarySpec": {
+                    "name": "TensorFlow",
+                    "version": "2.1.0",
+                    "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+                    "cmd": "python /var/tf_mnist/mnist_with_summaries.py 
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+                    "envVars": {
+                        "ENV_1": "ENV1"
+                    }
+                },
+                "submitterSpec": {
+                    "type": "k8s",
+                    "namespace": "submarine"
+                },
+                "taskSpecs": {
+                    "Worker": {
+                        "name": "tensorflow",
+                        "resources": "cpu=1,memory=1024M",
+                        "replicas": 1,
+                        "resourceMap": {
+                            "memory": "1024M",
+                            "cpu": "1"
+                        }
+                    }
+                }
+            }
+        }
+    ]
+}
+```
+
+### Get Job
+`GET /api/v1/jobs/{id}`
+
+**Example Request:**
+```sh
+curl -X GET http://127.0.0.1/api/v1/jobs/job_1586156073228_0005
+```
+
+**Example Response:**
+```sh
+{
+    "status": "OK",
+    "code": 200,
+    "result": {
+        "jobId": "job_1586156073228_0005",
+        "name": "mnist",
+        "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+        "status": "Created",
+        "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+        "createdTime": "2020-04-06T14:59:29.000+08:00",
+        "spec": {
+            "name": "mnist",
+            "librarySpec": {
+                "name": "TensorFlow",
+                "version": "2.1.0",
+                "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+                "cmd": "python /var/tf_mnist/mnist_with_summaries.py 
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+                "envVars": {
+                    "ENV_1": "ENV1"
+                }
+            },
+            "submitterSpec": {
+                "type": "k8s",
+                "namespace": "submarine"
+            },
+            "taskSpecs": {
+                "Worker": {
+                    "name": "tensorflow",
+                    "resources": "cpu=1,memory=1024M",
+                    "replicas": 1,
+                    "resourceMap": {
+                        "memory": "1024M",
+                        "cpu": "1"
+                    }
+                }
+            }
+        }
+    }
+}
+```
+
+### Patch Job
+`PATCH /api/v1/jobs/{id}`
+
+**Example Request:**
+```sh
+curl -X PATCH -H "Content-Type: application/json" -d '
+{
+  "name": "mnist",
   "librarySpec": {
-    "name": "pytorch",
+    "name": "TensorFlow",
     "version": "2.1.0",
-    "image": "apache/submarine:pytorch-dist-mnist-1.0",
-    "cmd": "python /var/mnist.py --backend gloo",
+    "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+    "cmd": "python /var/tf_mnist/mnist_with_summaries.py --log_dir=/train/log 
--learning_rate=0.01 --batch_size=150",
     "envVars": {
       "ENV_1": "ENV1"
     }
@@ -183,50 +326,107 @@ curl -X POST -H "Content-Type: application/json" -d '
     "namespace": "submarine"
   },
   "taskSpecs": {
-    "Master": {
-      "name": "master",
-      "replicas": 1,
-      "resources": "cpu=1,memory=1024M"
-    },
     "Worker": {
-      "name": "worker",
-      "replicas": 1,
+      "name": "tensorflow",
+      "replicas": 2,
       "resources": "cpu=1,memory=1024M"
     }
   }
 }
-' http://127.0.0.1:8080/api/v1/jobs      
+' http://127.0.0.1/api/v1/jobs/job_1586156073228_0005
 ```
 
-### Verify Jobs
-You can run following command to get the submitted job:
-```
-kubectl get -n submarine tfjob
-kubectl get -n submarine pytorchjob
-```
-
-**Output:**
-```
-NAME    STATE     AGE
-mnist   Created   7m6s
+**Example Response:**
+```sh
+{
+    "status": "OK",
+    "code": 200,
+    "success": true,
+    "result": {
+        "jobId": "job_1586156073228_0005",
+        "name": "mnist",
+        "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+        "status": "Created",
+        "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+        "createdTime": "2020-04-06T14:59:29.000+08:00",
+        "spec": {
+            "name": "mnist",
+            "librarySpec": {
+                "name": "TensorFlow",
+                "version": "2.1.0",
+                "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+                "cmd": "python /var/tf_mnist/mnist_with_summaries.py 
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+                "envVars": {
+                    "ENV_1": "ENV1"
+                }
+            },
+            "submitterSpec": {
+                "type": "k8s",
+                "namespace": "submarine"
+            },
+            "taskSpecs": {
+                "Worker": {
+                    "name": "tensorflow",
+                    "resources": "cpu=1,memory=1024M",
+                    "replicas": 2,
+                    "resourceMap": {
+                        "memory": "1024M",
+                        "cpu": "1"
+                    }
+                }
+            }
+        }
+    }
+}
 ```
 
-Also you can find pods which running the jobs, run following command:
-```
-kubectl get -n submarine pods
-```
+### Delete Job
+`DELETE /api/v1/jobs/{id}`
 
-**Output:**
+**Example Request:**
+```sh
+curl -X DELETE http://127.0.0.1/api/v1/jobs/job_1586156073228_0005
 ```
-NAME                               READY   STATUS              RESTARTS   AGE
-mnist-ps-0                         0/1     ContainerCreating   0          3m47s
-mnist-worker-0                     0/1     Pending             0          3m47s
-tf-job-operator-74cc6bd6cb-fqd5s   1/1     Running             0          98m
-```
-
 
-### Delete Jobs
-```bash
-kubectl -n submarine delete tfjob/mnist
-kubectl -n submarine delete pytorchjob/pytorch-dist-mnist-gloo
+**Example Response:**
+```sh
+{
+    "status": "OK",
+    "code": 200,
+    "result": {
+        "jobId": "job_1586156073228_0005",
+        "name": "mnist",
+        "uid": "28e39dcd-77d4-11ea-8dbb-0242ac110003",
+        "status": "Deleted",
+        "acceptedTime": "2020-04-06T14:59:29.000+08:00",
+        "createdTime": "2020-04-06T14:59:29.000+08:00",
+        "spec": {
+            "name": "mnist",
+            "librarySpec": {
+                "name": "TensorFlow",
+                "version": "2.1.0",
+                "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+                "cmd": "python /var/tf_mnist/mnist_with_summaries.py 
--log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+                "envVars": {
+                    "ENV_1": "ENV1"
+                }
+            },
+            "submitterSpec": {
+                "type": "k8s",
+                "namespace": "submarine"
+            },
+            "taskSpecs": {
+                "Worker": {
+                    "name": "tensorflow",
+                    "resources": "cpu=1,memory=1024M",
+                    "replicas": 1,
+                    "resourceMap": {
+                        "memory": "1024M",
+                        "cpu": "1"
+                    }
+                }
+            }
+        }
+    }
+}
 ```
diff --git a/submarine-cloud/hack/integration-test.sh 
b/submarine-cloud/hack/integration-test.sh
index d4da608..db39469 100755
--- a/submarine-cloud/hack/integration-test.sh
+++ b/submarine-cloud/hack/integration-test.sh
@@ -30,6 +30,7 @@ export KUBECONFIG=~/.kube/kind-config-${clusterName:-kind}
 
 function start() {
   $ROOT/hack/kind-cluster-build.sh
+  $SUBMARINE_HOME/dev-support/k8s/deploy-kubeflow-operators.sh -a
   $ROOT/hack/deploy-submarine.sh --test
 
   for((i=1;i<=30;i++)); do
diff --git a/submarine-cloud/manifests/submarine-cluster/rbac.yaml 
b/submarine-cloud/manifests/submarine-cluster/rbac.yaml
index eab1fa6..a8ecd37 100644
--- a/submarine-cloud/manifests/submarine-cluster/rbac.yaml
+++ b/submarine-cloud/manifests/submarine-cluster/rbac.yaml
@@ -29,6 +29,11 @@ items:
           - endpoints
           - pods
         verbs: ["list", "get"]
+      - apiGroups: ["kubeflow.org"]
+        resources:
+          - tfjobs
+          - pytorchjobs
+        verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
   - apiVersion: rbac.authorization.k8s.io/v1beta1
     kind: ClusterRoleBinding
     metadata:
diff --git a/submarine-cloud/manifests/submarine-cluster/submarine-server.yaml 
b/submarine-cloud/manifests/submarine-cluster/submarine-server.yaml
index b6cf9a0..76f1d73 100644
--- a/submarine-cloud/manifests/submarine-cluster/submarine-server.yaml
+++ b/submarine-cloud/manifests/submarine-cluster/submarine-server.yaml
@@ -28,7 +28,7 @@ spec:
       labels:
         app: cluster-test
     spec:
-      ServiceAccountName: "submarine-node"
+      serviceAccountName: "submarine-node"
       containers:
         - name: submarine-node
           image: "apache/submarine:server-0.4.0-SNAPSHOT"
diff --git 
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/exception/SubmarineRuntimeException.java
 
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/exception/SubmarineRuntimeException.java
index 207270d..0daacd3 100644
--- 
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/exception/SubmarineRuntimeException.java
+++ 
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/exception/SubmarineRuntimeException.java
@@ -22,7 +22,18 @@ package org.apache.submarine.commons.utils.exception;
 public class SubmarineRuntimeException extends RuntimeException {
   private static final long serialVersionUID = 7159777541471705348L;
 
+  private int code;
+
   public SubmarineRuntimeException(String message) {
     super(message);
   }
+
+  public SubmarineRuntimeException(int code, String message) {
+    super(message);
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
 }
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
deleted file mode 100644
index 36e741e..0000000
--- 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.api;
-
-import org.apache.submarine.server.api.exception.InvalidSpecException;
-import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
-import org.apache.submarine.server.api.job.Job;
-import org.apache.submarine.server.api.spec.JobSpec;
-
-/**
- * Handle the job's operate (CRUD)
- */
-public interface JobHandler {
-  /**
-   * Submit job
-   * @param jobSpec job spec
-   * @return job object
-   * @throws UnsupportedJobTypeException caused by the unsupported job type
-   */
-  Job submitJob(JobSpec jobSpec) throws UnsupportedJobTypeException, 
InvalidSpecException;
-
-  /**
-   * Get job info
-   * @param jobSpec job spec
-   * @return job object
-   * @throws UnsupportedJobTypeException caused by the unsupported job type
-   */
-  default Job getJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
-    // TODO(submarine) should implementing later
-    return null;
-  }
-
-  /**
-   * Update job info
-   * @param jobSpec job spec
-   * @return job object
-   * @throws UnsupportedJobTypeException caused by the unsupported job type
-   */
-  default Job updateJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
-    // TODO(submarine) should implementing later
-    return null;
-  }
-
-  /**
-   * Delete the specified job
-   * @param jobSpec job spec
-   * @return job object
-   * @throws UnsupportedJobTypeException caused by the unsupported job type
-   */
-  default Job deleteJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
-    // TODO(submarine) should implementing later
-    return null;
-  }
-}
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
deleted file mode 100644
index 866fd74..0000000
--- 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.api.exception;
-
-public class UnsupportedJobTypeException extends Exception {
-  private static final long serialVersionUID = 4752254162145918312L;
-
-  public UnsupportedJobTypeException() {
-    super("Unsupported Job Type Exception");
-  }
-}
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
index 03068ae..443773a 100644
--- 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
@@ -19,28 +19,21 @@
 
 package org.apache.submarine.server.api.job;
 
+import org.apache.submarine.server.api.spec.JobSpec;
+
 /**
  * The Generic Machine Learning Job in Submarine.
  */
 public class Job {
   private JobId jobId;
   private String name;
-  private String identifier;
-
-  /**
-   * Get the job instance
-   * @param jobId job id
-   * @param name job name
-   * @param identifier identifier
-   * @return object
-   */
-  public static Job newInstance(JobId jobId, String name, String identifier) {
-    Job job = new Job();
-    job.setJobId(jobId);
-    job.setName(name);
-    job.setIdentifier(identifier);
-    return job;
-  }
+  private String uid;
+  private String status;
+  private String acceptedTime;
+  private String createdTime;
+  private String runningTime;
+  private String finishedTime;
+  private JobSpec spec;
 
   /**
    * Get the job id which is unique in submarine
@@ -81,16 +74,115 @@ public class Job {
    * In YARN cluster it best to set the ApplicationId, and in K8s cluster it 
maybe the job name.
    * @return the unique identifier
    */
-  public String getIdentifier() {
-    return identifier;
+  public String getUid() {
+    return uid;
   }
 
   /**
    * Set the job identifier, in YARN cluster it best to set the application 
id, and in K8s cluster
-   * it maybe the job name.
-   * @param identifier application id (YARN) or Job Name (K8s)
+   * it may be the uid.
+   * @param uid application id (YARN) or uid (K8s)
    */
-  public void setIdentifier(String identifier) {
-    this.identifier = identifier;
+  public void setUid(String uid) {
+    this.uid = uid;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  public String getAcceptedTime() {
+    return acceptedTime;
+  }
+
+  public void setAcceptedTime(String acceptedTime) {
+    this.acceptedTime = acceptedTime;
+  }
+
+  public String getCreatedTime() {
+    return createdTime;
+  }
+
+  /**
+   * Set the created time of the job.
+   * @param createdTime created time, stored as given
+   */
+  public void setCreatedTime(String createdTime) {
+    this.createdTime = createdTime;
+  }
+
+  public String getRunningTime() {
+    return runningTime;
+  }
+
+  public void setRunningTime(String runningTime) {
+    this.runningTime = runningTime;
+  }
+
+  public String getFinishedTime() {
+    return finishedTime;
+  }
+
+  public void setFinishedTime(String finishedTime) {
+    this.finishedTime = finishedTime;
+  }
+
+  public JobSpec getSpec() {
+    return spec;
+  }
+
+  public void setSpec(JobSpec spec) {
+    this.spec = spec;
+  }
+
+  public enum Status {
+    STATUS_ACCEPTED("Accepted"),
+    STATUS_CREATED("Created"),
+    STATUS_RUNNING("Running"),
+    STATUS_SUCCEEDED("Succeeded"),
+    STATUS_DELETED("Deleted");
+
+    private String value;
+    Status(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return value;
+    }
+  }
+
+  public void rebuild(Job job) {
+    if (job != null) {
+      if (job.getName() != null) {
+        this.setName(job.getName());
+      }
+      if (job.getUid() != null) {
+        this.setUid(job.getUid());
+      }
+      if (job.getSpec() != null) {
+        this.setSpec(job.getSpec());
+      }
+      if (job.getStatus() != null) {
+        this.setStatus(job.getStatus());
+      }
+      if (job.getAcceptedTime() != null) {
+        this.setAcceptedTime(job.getAcceptedTime());
+      }
+      if (job.getCreatedTime() != null) {
+        this.setCreatedTime(job.getCreatedTime());
+      }
+      if (job.getRunningTime() != null) {
+        this.setRunningTime(job.getRunningTime());
+      }
+      if (job.getFinishedTime() != null) {
+        this.setFinishedTime(job.getFinishedTime());
+      }
+    }
   }
 }
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
index 7026ed0..2a48329 100644
--- 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
@@ -116,7 +116,10 @@ public class JobId implements Comparable<JobId> {
       return true;
     }
     JobId other = (JobId) obj;
-    return this.getId() != other.getId();
+    if (this.getServerTimestamp() != other.getServerTimestamp()) {
+      return false;
+    }
+    return this.getId() == other.getId();
   }
 
   @Override
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
similarity index 53%
rename from 
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
rename to 
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
index 8e455bb..46bef31 100644
--- 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
@@ -17,15 +17,16 @@
  * under the License.
  */
 
-package org.apache.submarine.server.api;
+package org.apache.submarine.server.api.job;
 
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.api.spec.JobSpec;
 
 /**
  * The submitter should implement this interface.
  */
-public interface JobSubmitter extends JobHandler {
-
+public interface JobSubmitter {
   /**
    * Initialize the submitter related code
    */
@@ -38,4 +39,35 @@ public interface JobSubmitter extends JobHandler {
    */
   String getSubmitterType();
 
+  /**
+   * Create job with job spec
+   * @param jobSpec job spec
+   * @return object
+   * @throws SubmarineRuntimeException running error
+   */
+  Job createJob(JobSpec jobSpec) throws SubmarineRuntimeException;
+
+  /**
+   * Find job by job spec
+   * @param jobSpec job spec
+   * @return object
+   * @throws SubmarineRuntimeException running error
+   */
+  Job findJob(JobSpec jobSpec) throws SubmarineRuntimeException;
+
+  /**
+   * Patch job with job spec
+   * @param jobSpec job spec
+   * @return object
+   * @throws SubmarineRuntimeException running error
+   */
+  Job patchJob(JobSpec jobSpec) throws SubmarineRuntimeException;
+
+  /**
+   * Delete job by job spec
+   * @param jobSpec job spec
+   * @return object
+   * @throws SubmarineRuntimeException running error
+   */
+  Job deleteJob(JobSpec jobSpec) throws SubmarineRuntimeException;
 }
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
deleted file mode 100644
index f475ed4..0000000
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server;
-
-import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.server.api.JobHandler;
-import org.apache.submarine.server.api.JobSubmitter;
-import org.apache.submarine.server.api.exception.InvalidSpecException;
-import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
-import org.apache.submarine.server.api.job.Job;
-import org.apache.submarine.server.api.job.JobId;
-import org.apache.submarine.server.api.spec.JobSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * It responsible for manage the job (CRUD) and cached the job.
- */
-public class JobManager implements JobHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
-  private static volatile JobManager manager;
-
-  private final AtomicInteger jobCounter = new AtomicInteger(0);
-
-  private final ConcurrentMap<JobId, Job> jobs = new ConcurrentHashMap<>();
-
-  private SubmitterManager submitterManager;
-  private ExecutorService executorService;
-
-  /**
-   * Get the singleton instance
-   * @return object
-   */
-  public static JobManager getInstance() {
-    if (manager == null) {
-      synchronized (JobManager.class) {
-        if (manager == null) {
-          SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
-          SubmitterManager submitterManager = new SubmitterManager(conf);
-          manager = new JobManager(submitterManager);
-        }
-      }
-    }
-    return manager;
-  }
-
-  private JobManager(SubmitterManager submitterManager) {
-    this.submitterManager = submitterManager;
-    this.executorService = Executors.newFixedThreadPool(50);
-  }
-
-  @Override
-  public Job submitJob(JobSpec spec) throws UnsupportedJobTypeException {
-    if (!spec.validate()) {
-      return null;
-    }
-
-    JobSubmitter submitter = submitterManager.getSubmitterByType(
-        spec.getSubmitterSpec().getType());
-    if (submitter == null) {
-      throw new UnsupportedJobTypeException();
-    }
-
-    Job job = new Job();
-    job.setJobId(generateJobId());
-    executorService.submit(() -> {
-      try {
-        jobs.putIfAbsent(job.getJobId(), submitter.submitJob(spec));
-      } catch (UnsupportedJobTypeException e) {
-        LOG.error(e.getMessage(), e);
-      } catch (InvalidSpecException e) {
-        LOG.error("Invalid job spec: " + spec + ", " + e.getMessage());
-      }
-    });
-    return job;
-  }
-
-  private JobId generateJobId() {
-    return JobId.newInstance(SubmarineServer.getServerTimeStamp(), 
jobCounter.incrementAndGet());
-  }
-}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
index b6a0a0e..f955eeb 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
@@ -19,6 +19,7 @@
 package org.apache.submarine.server;
 
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.submarine.server.rest.provider.YamlEntityProvider;
 import org.apache.submarine.server.rpc.SubmarineRpcServer;
 import org.apache.submarine.server.workbench.websocket.NotebookServer;
 import org.apache.submarine.commons.cluster.ClusterServer;
@@ -126,9 +127,9 @@ public class SubmarineServer extends ResourceConfig {
   @Inject
   public SubmarineServer() {
     packages("org.apache.submarine.server.workbench.rest",
-             "org.apache.submarine.server.jobserver.rest",
              "org.apache.submarine.server.rest"
     );
+    register(YamlEntityProvider.class);
   }
 
   private static void startServer() throws InterruptedException {
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
index 36e3d6c..77cc524 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
@@ -20,7 +20,7 @@
 package org.apache.submarine.server;
 
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.server.api.JobSubmitter;
+import org.apache.submarine.server.api.job.JobSubmitter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
new file mode 100644
index 0000000..df346a0
--- /dev/null
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.job;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.SubmarineServer;
+import org.apache.submarine.server.SubmitterManager;
+import org.apache.submarine.server.api.job.JobSubmitter;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobId;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * It's responsible for the CRUD operations on jobs and for caching them.
+ */
+public class JobManager {
+  private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
+
+  private static volatile JobManager manager;
+
+  private final AtomicInteger jobCounter = new AtomicInteger(0);
+
+  /**
+   * Used to cache the jobs by the job id.
+   *  key: the string of job id
+   *  value: Job object
+   */
+  private final ConcurrentMap<String, Job> cachedJobMap = new 
ConcurrentHashMap<>();
+
+  private SubmitterManager submitterManager;
+
+  /**
+   * Get the singleton instance
+   * @return object
+   */
+  public static JobManager getInstance() {
+    if (manager == null) {
+      synchronized (JobManager.class) {
+        if (manager == null) {
+          SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
+          SubmitterManager submitterManager = new SubmitterManager(conf);
+          manager = new JobManager(submitterManager);
+        }
+      }
+    }
+    return manager;
+  }
+
+  private JobManager(SubmitterManager submitterManager) {
+    this.submitterManager = submitterManager;
+  }
+
+  /**
+   * Create job
+   * @param spec job spec
+   * @return object
+   * @throws SubmarineRuntimeException the service error
+   */
+  public Job createJob(JobSpec spec) throws SubmarineRuntimeException {
+    checkSpec(spec);
+    JobSubmitter submitter = getSubmitter(spec.getSubmitterSpec().getType());
+    Job job = submitter.createJob(spec);
+    job.setJobId(generateJobId());
+    job.setSpec(spec);
+    cachedJobMap.putIfAbsent(job.getJobId().toString(), job);
+    return job;
+  }
+
+  private JobId generateJobId() {
+    return JobId.newInstance(SubmarineServer.getServerTimeStamp(), 
jobCounter.incrementAndGet());
+  }
+
+  /**
+   * Get job
+   * @param id job id
+   * @return object
+   * @throws SubmarineRuntimeException the service error
+   */
+  public Job getJob(String id) throws SubmarineRuntimeException {
+    checkJobId(id);
+    Job job = cachedJobMap.get(id);
+    JobSpec spec = job.getSpec();
+    Job patchJob = 
getSubmitter(spec.getSubmitterSpec().getType()).findJob(spec);
+    job.rebuild(patchJob);
+    return job;
+  }
+
+  /**
+   * List the cached jobs, refreshing each one from its submitter.
+   * @param status job status to match (case-insensitive), if null will return all jobs
+   * @return job list
+   * @throws SubmarineRuntimeException the service error
+   */
+  public List<Job> listJobsByStatus(String status)
+      throws SubmarineRuntimeException {
+    List<Job> jobList = new ArrayList<>();
+    for (Map.Entry<String, Job> entry : cachedJobMap.entrySet()) {
+      Job job = entry.getValue();
+      JobSpec spec = job.getSpec();
+      JobSubmitter submitter = getSubmitter(spec.getSubmitterSpec().getType());
+      Job patchJob = submitter.findJob(spec);
+      LOG.info("Found job: {}", patchJob.getStatus());
+      // equalsIgnoreCase is locale-independent and returns false (rather than
+      // throwing) when the refreshed job carries no status.
+      if (status == null || status.equalsIgnoreCase(patchJob.getStatus())) {
+        job.rebuild(patchJob);
+        jobList.add(job);
+      }
+    }
+    LOG.info("List job: {}", jobList.size());
+    return jobList;
+  }
+
+  /**
+   * Patch the job
+   * @param id job id
+   * @param spec job spec
+   * @return object
+   * @throws SubmarineRuntimeException the service error
+   */
+  public Job patchJob(String id, JobSpec spec) throws 
SubmarineRuntimeException {
+    checkJobId(id);
+    checkSpec(spec);
+    Job job = cachedJobMap.get(id);
+    JobSubmitter submitter = getSubmitter(spec.getSubmitterSpec().getType());
+    Job patchJob = submitter.patchJob(spec);
+    job.setSpec(spec);
+    job.rebuild(patchJob);
+    return job;
+  }
+
+  /**
+   * Delete job: drop it from the cache, ask its submitter to delete it, and
+   * merge the non-null fields returned by the submitter into the cached job.
+   * @param id job id
+   * @return the job removed from the cache, updated with the submitter's reply
+   * @throws SubmarineRuntimeException if the id is not cached (404) or no
+   *         submitter matches the job's submitter type (500)
+   */
+  public Job deleteJob(String id) throws SubmarineRuntimeException {
+    checkJobId(id);
+    // NOTE(review): the cache entry is removed before the submitter is called,
+    // so if the submitter throws, the job is no longer tracked here - confirm
+    // this is intended.
+    Job job = cachedJobMap.remove(id);
+    JobSpec spec = job.getSpec();
+    Job patchJob = 
getSubmitter(spec.getSubmitterSpec().getType()).deleteJob(spec);
+    job.rebuild(patchJob);
+    return job;
+  }
+
+  private JobSubmitter getSubmitter(String type) throws 
SubmarineRuntimeException {
+    JobSubmitter submitter = submitterManager.getSubmitterByType(type);
+    if (submitter == null) {
+      throw new 
SubmarineRuntimeException(Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+          "Invalid submitter.");
+    }
+    return submitter;
+  }
+
+  /**
+   * Reject a missing job spec before it is handed to a submitter.
+   * @param spec job spec to check
+   * @throws SubmarineRuntimeException with a 400 (Bad Request) code if the spec is null
+   */
+  private void checkSpec(JobSpec spec) throws SubmarineRuntimeException {
+    if (spec == null) {
+      // The exception code becomes the REST response code, so a missing spec
+      // must not be reported as 200 (OK).
+      throw new SubmarineRuntimeException(Status.BAD_REQUEST.getStatusCode(),
+          "Invalid job spec.");
+    }
+  }
+
+  private void checkJobId(String id) throws SubmarineRuntimeException {
+    JobId jobId = JobId.fromString(id);
+    if (jobId == null || !cachedJobMap.containsKey(id)) {
+      throw new SubmarineRuntimeException(Status.NOT_FOUND.getStatusCode(), 
"Not found job.");
+    }
+  }
+}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/response/JsonResponse.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/response/JsonResponse.java
index 72d4c2b..f389599 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/response/JsonResponse.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/response/JsonResponse.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.core.NewCookie;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import java.util.ArrayList;
 import java.util.Date;
@@ -59,13 +60,17 @@ public class JsonResponse<T> {
   private static final String CGLIB_PROPERTY_PREFIX = "\\$cglib_prop_";
 
   private JsonResponse(Builder<T> builder) {
-    this.status = builder.status;
     this.code = builder.code;
     this.success = builder.success;
     this.message = builder.message;
     this.attributes = builder.attributes;
     this.result = (T) builder.result;
     this.cookies = builder.cookies;
+    if (builder.status != null) {
+      this.status = builder.status;
+    } else {
+      status = Response.Status.fromStatusCode(this.code);
+    }
   }
 
   public T getResult() {
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
index 551cfc8..e3511ec 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
@@ -19,32 +19,32 @@
 
 package org.apache.submarine.server.rest;
 
-import org.apache.submarine.server.JobManager;
-import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
-import org.apache.submarine.server.api.job.Job;
-import org.apache.submarine.server.api.job.JobId;
-import org.apache.submarine.server.api.spec.JobSpec;
-import org.apache.submarine.server.response.JsonResponse;
-
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
+import javax.ws.rs.PATCH;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+import org.apache.submarine.server.job.JobManager;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.apache.submarine.server.response.JsonResponse;
+
 /**
- * {@link JobManager}'s REST API v1. It can accept {@link JobSpec} to create a 
job.
+ * Job Service REST API v1. It can accept {@link JobSpec} to create a job.
  */
 @Path(RestConstants.V1 + "/" + RestConstants.JOBS)
 @Produces({MediaType.APPLICATION_JSON + "; " + RestConstants.CHARSET_UTF8})
 public class JobManagerRestApi {
-
+  private final JobManager jobManager = JobManager.getInstance();
   /**
    * Return the Pong message for test the connectivity
    * @return Pong message
@@ -59,24 +59,17 @@ public class JobManagerRestApi {
 
   /**
    * Returns the contents of {@link Job} that submitted by user.
-   * @param jobSpec job spec
+   * @param spec job spec
    * @return the contents of job
    */
   @POST
   @Consumes({RestConstants.MEDIA_TYPE_YAML, MediaType.APPLICATION_JSON})
-  public Response submitJob(JobSpec jobSpec) {
-    if (!jobSpec.validate()) {
-      return new JsonResponse.Builder<String>(Response.Status.ACCEPTED)
-          .success(false).result("Invalid params.").build();
-    }
-
+  public Response createJob(JobSpec spec) {
     try {
-      Job job = JobManager.getInstance().submitJob(jobSpec);
-      return new JsonResponse.Builder<Job>(Response.Status.OK)
-          .success(true).result(job).build();
-    } catch (UnsupportedJobTypeException e) {
-      return new JsonResponse.Builder<String>(Response.Status.ACCEPTED)
-          .success(false).result(e.getMessage()).build();
+      Job job = jobManager.createJob(spec);
+      return new 
JsonResponse.Builder<Job>(Response.Status.OK).result(job).build();
+    } catch (SubmarineRuntimeException e) {
+      return parseJobServiceException(e);
     }
   }
 
@@ -85,10 +78,13 @@ public class JobManagerRestApi {
    * @return job list
    */
   @GET
-  public Response listJob() {
-    // TODO(jiwq): Hook JobManager when 0.4.0 released
-    return new JsonResponse.Builder<List<Job>>(Response.Status.OK)
-        .success(true).result(new ArrayList<>()).build();
+  public Response listJob(@QueryParam("status") String status) {
+    try {
+      List<Job> jobList = jobManager.listJobsByStatus(status);
+      return new 
JsonResponse.Builder<List<Job>>(Response.Status.OK).result(jobList).build();
+    } catch (SubmarineRuntimeException e) {
+      return parseJobServiceException(e);
+    }
   }
 
   /**
@@ -97,13 +93,27 @@ public class JobManagerRestApi {
    * @return the detailed info of job
    */
   @GET
-  @Path("{" + RestConstants.JOB_ID + "}")
+  @Path("/{id}")
   public Response getJob(@PathParam(RestConstants.JOB_ID) String id) {
-    // TODO(jiwq): Hook JobManager when 0.4.0 released
-    Job job = new Job();
-    job.setJobId(JobId.fromString(id));
-    return new JsonResponse.Builder<Job>(Response.Status.OK)
-        .success(true).result(job).build();
+    try {
+      Job job = jobManager.getJob(id);
+      return new 
JsonResponse.Builder<Job>(Response.Status.OK).result(job).build();
+    } catch (SubmarineRuntimeException e) {
+      return parseJobServiceException(e);
+    }
+  }
+
+  @PATCH
+  @Path("/{id}")
+  @Consumes({RestConstants.MEDIA_TYPE_YAML, MediaType.APPLICATION_JSON})
+  public Response patchJob(@PathParam(RestConstants.JOB_ID) String id, JobSpec 
spec) {
+    try {
+      Job job = jobManager.patchJob(id, spec);
+      return new JsonResponse.Builder<Job>(Response.Status.OK).success(true)
+          .result(job).build();
+    } catch (SubmarineRuntimeException e) {
+      return parseJobServiceException(e);
+    }
   }
 
   /**
@@ -112,10 +122,18 @@ public class JobManagerRestApi {
    * @return the detailed info about deleted job
    */
   @DELETE
-  @Path("{" + RestConstants.JOB_ID + "}")
+  @Path("/{id}")
   public Response deleteJob(@PathParam(RestConstants.JOB_ID) String id) {
-    // TODO(jiwq): Hook JobManager when 0.4.0 released
-    return new JsonResponse.Builder<Job>(Response.Status.OK)
-        .success(true).result(new Job()).build();
+    try {
+      Job job = jobManager.deleteJob(id);
+      return new JsonResponse.Builder<Job>(Response.Status.OK)
+          .result(job).build();
+    } catch (SubmarineRuntimeException e) {
+      return parseJobServiceException(e);
+    }
+  }
+
+  private Response parseJobServiceException(SubmarineRuntimeException e) {
+    return new 
JsonResponse.Builder<String>(e.getCode()).message(e.getMessage()).build();
   }
 }
diff --git 
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
 
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
index 33ad4bb..ead3c4a 100644
--- 
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
+++ 
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
@@ -176,14 +176,23 @@ public abstract class AbstractSubmarineServerTest {
     return httpPost(path, body, StringUtils.EMPTY, StringUtils.EMPTY);
   }
 
+  protected static PostMethod httpPost(String path, String body, String 
mediaType)
+      throws IOException {
+    return httpPost(path, body, mediaType, StringUtils.EMPTY, 
StringUtils.EMPTY);
+  }
+
   protected static PostMethod httpPost(String path, String request, String 
user, String pwd)
       throws IOException {
+    return httpPost(path, request, MediaType.APPLICATION_JSON, user, pwd);
+  }
+  protected static PostMethod httpPost(String path, String request, String 
mediaType, String user,
+      String pwd) throws IOException {
     LOG.info("Connecting to {}", URL + path);
 
     HttpClient httpClient = new HttpClient();
     PostMethod postMethod = new PostMethod(URL + path);
     postMethod.setRequestBody(request);
-    postMethod.setRequestHeader("Content-type", MediaType.APPLICATION_JSON);
+    postMethod.setRequestHeader("Content-type", mediaType);
     postMethod.getParams().setCookiePolicy(CookiePolicy.IGNORE_COOKIES);
 
     if (userAndPasswordAreNotBlank(user, pwd)) {
diff --git 
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerTest.java
 
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerTest.java
index fa7f5cf..466e225 100644
--- 
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerTest.java
+++ 
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/SubmarineServerTest.java
@@ -19,12 +19,9 @@
 package org.apache.submarine.server;
 
 import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.submarine.server.rest.JobManagerRestApiTest;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,7 +29,6 @@ import java.util.ArrayList;
 import static org.junit.Assert.assertFalse;
 
 public class SubmarineServerTest extends AbstractSubmarineServerTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerRestApiTest.class);
 
   @BeforeClass
   public static void init() throws Exception {
diff --git 
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/JobManagerRestApiTest.java
 
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/JobManagerRestApiTest.java
deleted file mode 100644
index 52d52b7..0000000
--- 
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/rest/JobManagerRestApiTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.rest;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-import org.apache.commons.httpclient.methods.DeleteMethod;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.io.FileUtils;
-import org.apache.submarine.server.AbstractSubmarineServerTest;
-import org.apache.submarine.server.api.job.Job;
-import org.apache.submarine.server.api.job.JobId;
-import org.apache.submarine.server.json.JobIdDeserializer;
-import org.apache.submarine.server.json.JobIdSerializer;
-import org.apache.submarine.server.response.JsonResponse;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertEquals;
-
-public class JobManagerRestApiTest extends AbstractSubmarineServerTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerRestApiTest.class);
-
-  @BeforeClass
-  public static void init() throws Exception {
-    
AbstractSubmarineServerTest.startUp(JobManagerRestApiTest.class.getSimpleName());
-  }
-
-  @AfterClass
-  public static void destroy() throws Exception {
-    AbstractSubmarineServerTest.shutDown();
-  }
-
-  @Test
-  public void testJobServerPing() throws IOException {
-    GetMethod response = httpGet("/api/" + RestConstants.V1 + "/"
-        + RestConstants.JOBS + "/" + RestConstants.PING);
-    LOG.info(response.toString());
-
-    String requestBody = response.getResponseBodyAsString();
-    LOG.info(requestBody);
-
-    Gson gson = new Gson();
-    JsonResponse jsonResponse = gson.fromJson(requestBody, JsonResponse.class);
-    assertEquals("Response code should be 200 ",
-        Response.Status.OK.getStatusCode(), jsonResponse.getCode());
-    assertEquals("Response result should be Pong", "Pong",
-        jsonResponse.getResult().toString());
-  }
-
-  // Test job created with correct JSON input
-  @Test
-  public void testCreateJobWhenJsonInputIsCorrectThenResponseCodeAccepted() 
throws Exception {
-    URL fileUrl = this.getClass().getResource("/tf-mnist-req.json");
-    String jobSpec = FileUtils.readFileToString(new File(fileUrl.toURI()), 
StandardCharsets.UTF_8);
-
-    PostMethod response = httpPost("/api/" + RestConstants.V1 + "/" + 
RestConstants.JOBS, jobSpec);
-    LOG.info(response.toString());
-
-    String responseBodyAsString = response.getResponseBodyAsString();
-    LOG.info(responseBodyAsString);
-
-    Gson gson = new Gson();
-    JsonResponse jsonResponse = gson.fromJson(responseBodyAsString, 
JsonResponse.class);
-    assertEquals("Response code should be 202 ",
-        Response.Status.ACCEPTED.getStatusCode(), jsonResponse.getCode());
-  }
-
-  // Test job created with incorrect JSON input
-  @Test
-  public void testCreateJobWhenJsonInputIsWrongThenResponseCodeBadRequest() 
throws IOException {
-    String jobSpec = "{\"ttype\": \"tensorflow\", \"version\":\"v1.13\"}";
-
-    PostMethod response = httpPost("/api/" + RestConstants.V1 + "/" + 
RestConstants.JOBS, jobSpec);
-
-    assertEquals("Http Response should be 400 ",
-        Response.Status.BAD_REQUEST.getStatusCode(), response.getStatusCode());
-  }
-
-  // Test get job list
-  @Test
-  public void testGetJobList() throws IOException {
-    GetMethod response = httpGet("/api/" + RestConstants.V1 + "/" + 
RestConstants.JOBS);
-    LOG.info(response.toString());
-
-    String requestBody = response.getResponseBodyAsString();
-    LOG.info(requestBody);
-
-    Gson gson = new Gson();
-    JsonResponse jsonResponse = gson.fromJson(requestBody, JsonResponse.class);
-    assertEquals("Response code should be 200 ",
-        Response.Status.OK.getStatusCode(), jsonResponse.getCode());
-  }
-
-  // Test get job by id
-  @Test
-  public void testGetJobById() throws IOException {
-    String jobId = "job_1577810970_0001";
-    GetMethod response = httpGet("/api/" + RestConstants.V1 + "/"
-        + RestConstants.JOBS + "/" + jobId);
-    LOG.info(response.toString());
-
-    String responseBodyAsString = response.getResponseBodyAsString();
-    LOG.info(responseBodyAsString);
-
-    Gson gson = new GsonBuilder()
-        .registerTypeAdapter(JobId.class, new JobIdSerializer())
-        .registerTypeAdapter(JobId.class, new JobIdDeserializer())
-        .create();
-    JsonResponse<Job> jsonResponse = gson.fromJson(responseBodyAsString,
-        new TypeToken<JsonResponse<Job>>(){}.getType());
-    assertEquals("Response code should be 200 ",
-        Response.Status.OK.getStatusCode(), jsonResponse.getCode());
-    assertEquals("Job id should be " + jobId, jobId,
-        jsonResponse.getResult().getJobId().toString());
-  }
-
-  // Test delete job by id
-  @Test
-  public void testDeleteJobById() throws IOException {
-    String jobId = "job1";
-    DeleteMethod response = httpDelete("/api/" + RestConstants.V1 + "/"
-        + RestConstants.JOBS + "/" + jobId);
-    LOG.info(response.toString());
-
-    String requestBody = response.getResponseBodyAsString();
-    LOG.info(requestBody);
-
-    Gson gson = new Gson();
-    JsonResponse jsonResponse = gson.fromJson(requestBody, JsonResponse.class);
-    assertEquals("Response code should be 200 ",
-        Response.Status.OK.getStatusCode(), jsonResponse.getCode());
-  }
-
-  /**
-   * FiXME. The manual YAML test with postman works but failed here.
-   * We need to figure out why the YAML entity provider not work in this test.
-   * */
-  @Test
-  public void testCreateJobWhenYamlInputIsCorrectThenResponseCodeAccepted() 
throws IOException {
-//    Client client = ClientBuilder.newBuilder()
-//        .register(new YamlEntityProvider<>()).build();
-//    this.setClient(client);
-//    String jobSpec = "type: tf";
-//    Response response = target(RestConstants.V1 + "/"
-//        + RestConstants.JOBS + "/" + "test")
-//        .request()
-//        .put(Entity.entity(jobSpec, "application/yaml"));
-//
-//    assertEquals("Http Response should be 202 ",
-//        Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
-  }
-}
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
deleted file mode 100644
index 4e61010..0000000
--- 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.submitter.k8s;
-
-/**
- * Job request for Kubernetes Submitter.
- */
-public class K8sJobRequest {
-  private Path path;
-  private Object body;
-  private String jobName;
-
-  public K8sJobRequest(Path path, Object body) {
-    this(path, body, null);
-  }
-
-  public K8sJobRequest(Path path, Object body, String jobName) {
-    this.path = path;
-    this.body = body;
-    this.jobName = jobName;
-  }
-
-  public void setPath(Path path) {
-    this.path = path;
-  }
-
-  public Path getPath() {
-    return path;
-  }
-
-  public void setBody(Object body) {
-    this.body = body;
-  }
-
-  public Object getBody() {
-    return body;
-  }
-
-  public String getJobName() {
-    return jobName;
-  }
-
-  static class Path {
-    private String group;
-    private String apiVersion;
-    private String namespace;
-    private String plural;
-
-    Path(String group, String apiVersion, String namespace, String plural) {
-      this.group = group;
-      this.apiVersion = apiVersion;
-      this.namespace = namespace;
-      this.plural = plural;
-    }
-
-    public String getGroup() {
-      return group;
-    }
-
-    public String getApiVersion() {
-      return apiVersion;
-    }
-
-    public String getNamespace() {
-      return namespace;
-    }
-
-    public String getPlural() {
-      return plural;
-    }
-  }
-}
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
index 43387c9..37e0e86 100644
--- 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
@@ -19,33 +19,33 @@
 
 package org.apache.submarine.server.submitter.k8s;
 
+import java.io.FileReader;
+import java.io.IOException;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
 import io.kubernetes.client.ApiClient;
 import io.kubernetes.client.ApiException;
 import io.kubernetes.client.Configuration;
+import io.kubernetes.client.JSON;
 import io.kubernetes.client.apis.CustomObjectsApi;
-import io.kubernetes.client.models.V1DeleteOptions;
-import io.kubernetes.client.models.V1DeleteOptionsBuilder;
+import io.kubernetes.client.models.V1Status;
 import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
 import org.apache.submarine.commons.utils.SubmarineConfVars;
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
 import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
-import org.apache.submarine.server.api.JobSubmitter;
 import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.job.JobSubmitter;
 import org.apache.submarine.server.api.job.Job;
 import org.apache.submarine.server.api.spec.JobSpec;
-import org.apache.submarine.server.submitter.k8s.model.CustomResourceJob;
-import org.apache.submarine.server.submitter.k8s.model.CustomResourceJobList;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
 import org.apache.submarine.server.submitter.k8s.model.MLJob;
 import org.apache.submarine.server.submitter.k8s.parser.JobSpecParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileReader;
-import java.io.IOException;
-
 /**
  * JobSubmitter for Kubernetes Cluster.
  */
@@ -59,6 +59,7 @@ public class K8sJobSubmitter implements JobSubmitter {
 
   public K8sJobSubmitter() {}
 
+  @VisibleForTesting
   public K8sJobSubmitter(String confPath) {
     this.confPath = confPath;
   }
@@ -90,7 +91,8 @@ public class K8sJobSubmitter implements JobSubmitter {
         ApiClient client = ClientBuilder.cluster().build();
         Configuration.setDefaultApiClient(client);
       } catch (IOException e1) {
-        throw new SubmarineRuntimeException("Failed to initialize k8s client");
+        LOG.error("Initialize K8s submitter failed. " + e1.getMessage(), e1);
+        throw new SubmarineRuntimeException(500, "Initialize K8s submitter failed.");
       }
     }
   }
@@ -101,98 +103,91 @@ public class K8sJobSubmitter implements JobSubmitter {
   }
 
   @Override
-  public Job submitJob(JobSpec jobSpec)
-      throws InvalidSpecException {
-    Job job = null;
-
-    boolean success = createJob(JobSpecParser.parseJob(jobSpec));
-    if (success) {
-      job = new Job();
-      job.setName(jobSpec.getName());
-    } else {
-      LOG.error("Failed to create job." + jobSpec.toString());
+  public Job createJob(JobSpec jobSpec) throws SubmarineRuntimeException {
+    Job job;
+    try {
+      MLJob mlJob = JobSpecParser.parseJob(jobSpec);
+      Object object = api.createNamespacedCustomObject(mlJob.getGroup(), mlJob.getVersion(),
+          mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob, "true");
+      job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
+    } catch (InvalidSpecException e) {
+      throw new SubmarineRuntimeException(400, e.getMessage()); // invalid spec => Bad Request
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
     }
     return job;
   }
 
-  @VisibleForTesting
-  boolean createJob(MLJob job) {
+  @Override
+  public Job findJob(JobSpec jobSpec) throws SubmarineRuntimeException {
+    Job job;
     try {
-      api.createNamespacedCustomObject(job.getGroup(), job.getVersion(),
-          job.getMetadata().getNamespace(), job.getPlural(),
-          job, "true");
+      MLJob mlJob = JobSpecParser.parseJob(jobSpec);
+      Object object = api.getNamespacedCustomObject(mlJob.getGroup(), mlJob.getVersion(),
+          mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob.getMetadata().getName());
+      job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
+    } catch (InvalidSpecException e) {
+      throw new SubmarineRuntimeException(400, e.getMessage()); // invalid spec => Bad Request
     } catch (ApiException e) {
-      LOG.error("Failed to create job. " + e.getMessage(), e);
-      return false;
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
     }
-    return true;
+    return job;
   }
 
-  @VisibleForTesting
-  CustomResourceJob createCustomJob(K8sJobRequest request) {
+  @Override
+  public Job patchJob(JobSpec jobSpec) throws SubmarineRuntimeException {
+    Job job;
     try {
-      K8sJobRequest.Path path = request.getPath();
-      Object o = api.createNamespacedCustomObject(path.getGroup(),
-          path.getApiVersion(), path.getNamespace(), path.getPlural(),
-          request.getBody(), "true");
-      Gson gson = new Gson();
-      return gson.fromJson(gson.toJson(o), CustomResourceJob.class);
-    } catch (ApiException ae) {
-      LOG.error("Exceptions when creating CRD job: " + ae.getMessage(), ae);
+      MLJob mlJob = JobSpecParser.parseJob(jobSpec);
+      Object object = api.patchNamespacedCustomObject(mlJob.getGroup(), mlJob.getVersion(),
+          mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob.getMetadata().getName(),
+          mlJob);
+      job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
+    } catch (InvalidSpecException e) {
+      throw new SubmarineRuntimeException(400, e.getMessage()); // invalid spec => Bad Request
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
     }
-    return null;
+    return job;
   }
 
-  @VisibleForTesting
-  CustomResourceJob getCustomResourceJob(K8sJobRequest request) {
+  @Override
+  public Job deleteJob(JobSpec jobSpec) throws SubmarineRuntimeException {
+    Job job;
     try {
-      K8sJobRequest.Path path = request.getPath();
-      Object o = api.getNamespacedCustomObject(path.getGroup(),
-          path.getApiVersion(),
-          path.getNamespace(), path.getPlural(), request.getJobName());
-      Gson gson = new Gson();
-      return gson.fromJson(gson.toJson(o), CustomResourceJob.class);
-    } catch (ApiException ae) {
-      // The API getNamespacedCustomObject throws exception when cannot found 
resource
-      // So the ApiException  seems not a big issue
-      LOG.warn("Exceptions when getting CRD job: " + ae.getMessage());
+      MLJob mlJob = JobSpecParser.parseJob(jobSpec);
+      Object object = api.deleteNamespacedCustomObject(mlJob.getGroup(), mlJob.getVersion(),
+          mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob.getMetadata().getName(),
+          MLJobConverter.toDeleteOptionsFromMLJob(mlJob), null, null, null);
+      job = parseResponseObject(object, ParseOp.PARSE_OP_DELETE);
+    } catch (InvalidSpecException e) {
+      throw new SubmarineRuntimeException(400, e.getMessage()); // invalid spec => Bad Request
+    } catch (ApiException e) {
+      throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
     }
-    return null;
+    return job;
   }
 
-  @VisibleForTesting
-  CustomResourceJob deleteCustomResourceJob(K8sJobRequest request) {
+  private Job parseResponseObject(Object object, ParseOp op) throws 
SubmarineRuntimeException {
+    Gson gson = new JSON().getGson();
+    String jsonString = gson.toJson(object);
+    LOG.info("Upstream response JSON: {}", jsonString);
     try {
-      K8sJobRequest.Path path = request.getPath();
-      V1DeleteOptions body =
-          new V1DeleteOptionsBuilder().withApiVersion(
-              path.getApiVersion()).build();
-      Object o = api.deleteNamespacedCustomObject(path.getGroup(),
-          path.getApiVersion(), path.getNamespace(), path.getPlural(),
-          request.getJobName(), body, null,
-          null, null);
-      Gson gson = new Gson();
-      return gson.fromJson(gson.toJson(o), CustomResourceJob.class);
-    } catch (ApiException ae) {
-      LOG.error("Exceptions when deleting CRD job: " + ae.getMessage(), ae);
+      if (op == ParseOp.PARSE_OP_RESULT) {
+        MLJob mlJob = gson.fromJson(jsonString, MLJob.class);
+        return MLJobConverter.toJobFromMLJob(mlJob);
+      } else if (op == ParseOp.PARSE_OP_DELETE) {
+        V1Status status = gson.fromJson(jsonString, V1Status.class);
+        return MLJobConverter.toJobFromStatus(status);
+      }
+    } catch (JsonSyntaxException e) {
+      LOG.warn("K8s submitter: parse response object failed by " + 
e.getMessage(), e);
     }
-    return null;
+    throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream 
response failed.");
   }
 
-  @VisibleForTesting
-  CustomResourceJobList listCustomResourceJobs(K8sJobRequest request) {
-    try {
-      K8sJobRequest.Path path = request.getPath();
-      Object o = api.listNamespacedCustomObject(path.getGroup(),
-          path.getApiVersion(),
-          path.getNamespace(), path.getPlural(), "true",
-          null, null, null,
-          null, null);
-      Gson gson = new Gson();
-      return gson.fromJson(gson.toJson(o), CustomResourceJobList.class);
-    } catch (ApiException ae) {
-      LOG.error("Exceptions when listing CRD jobs: " + ae.getMessage(), ae);
-    }
-    return null;
+  private enum ParseOp {
+    PARSE_OP_RESULT,
+    PARSE_OP_DELETE
   }
 }
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
index 81efc1f..af77365 100644
--- 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
@@ -20,6 +20,7 @@
 package org.apache.submarine.server.submitter.k8s.model;
 
 import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.models.V1JobStatus;
 import io.kubernetes.client.models.V1ObjectMeta;
 
 /**
@@ -43,6 +44,9 @@ public class MLJob {
 
   private transient String plural;
 
+  @SerializedName("status")
+  private V1JobStatus status;
+
   /**
    * Set the api with version
    *
@@ -135,4 +139,12 @@ public class MLJob {
   public void setPlural(String plural) {
     this.plural = plural;
   }
+
+  public V1JobStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(V1JobStatus status) {
+    this.status = status;
+  }
 }
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
new file mode 100644
index 0000000..ec59a67
--- /dev/null
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/util/MLJobConverter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.util;
+
+import java.util.List;
+
+import io.kubernetes.client.models.V1DeleteOptions;
+import io.kubernetes.client.models.V1DeleteOptionsBuilder;
+import io.kubernetes.client.models.V1JobCondition;
+import io.kubernetes.client.models.V1JobStatus;
+import io.kubernetes.client.models.V1Status;
+import io.kubernetes.client.models.V1StatusDetails;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.submitter.k8s.model.MLJob;
+import org.joda.time.DateTime;
+
+/**
+ * Converter for different types.
+ * Such as MLJob to Job, V1Status to Job and others.
+ */
+public class MLJobConverter {
+  /**
+   * Builds a {@link Job} from a custom resource object.
+   * The status is overwritten as each later lifecycle timestamp is found,
+   * so the last stage reached (accepted, created, running, succeeded) wins.
+   *
+   * @param mlJob the custom resource; its metadata is dereferenced unchecked
+   * @return the job carrying uid, name, stage timestamps and status
+   */
+  public static Job toJobFromMLJob(MLJob mlJob) {
+    Job job = new Job();
+    job.setUid(mlJob.getMetadata().getUid());
+    job.setName(mlJob.getMetadata().getName());
+
+    DateTime dateTime = mlJob.getMetadata().getCreationTimestamp();
+    if (dateTime != null) {
+      job.setAcceptedTime(dateTime.toString());
+      job.setStatus(Job.Status.STATUS_ACCEPTED.getValue());
+    }
+
+    V1JobStatus status = mlJob.getStatus();
+    if (status != null) {
+      dateTime = status.getStartTime();
+      if (dateTime != null) {
+        job.setCreatedTime(dateTime.toString());
+        job.setStatus(Job.Status.STATUS_CREATED.getValue());
+      }
+
+      List<V1JobCondition> conditions = status.getConditions();
+      if (conditions != null && conditions.size() > 1) {
+        job.setStatus(Job.Status.STATUS_RUNNING.getValue());
+        for (V1JobCondition condition : conditions) {
+          // A condition may lack a type or transition time; compare null-safely
+          // and locale-independently rather than via toLowerCase().equals().
+          DateTime transitionTime = condition.getLastTransitionTime();
+          if (transitionTime != null
+              && Boolean.parseBoolean(condition.getStatus())
+              && "running".equalsIgnoreCase(condition.getType())) {
+            job.setRunningTime(transitionTime.toString());
+            break;
+          }
+        }
+      }
+
+      dateTime = status.getCompletionTime();
+      if (dateTime != null) {
+        job.setFinishedTime(dateTime.toString());
+        job.setStatus(Job.Status.STATUS_SUCCEEDED.getValue());
+      }
+    }
+    return job;
+  }
+
+  /**
+   * Builds a {@link Job} from the {@link V1Status} of a delete call.
+   *
+   * @param status the upstream status object, must not be null
+   * @return the job with uid and name from the status details when present,
+   *         marked deleted only when the status string is "success" (any case)
+   */
+  public static Job toJobFromStatus(V1Status status) {
+    Job job = new Job();
+    V1StatusDetails details = status.getDetails();
+    if (details != null) {
+      job.setUid(details.getUid());
+      job.setName(details.getName());
+    }
+    // Constant-first comparison tolerates a missing status string.
+    if ("success".equalsIgnoreCase(status.getStatus())) {
+      job.setStatus(Job.Status.STATUS_DELETED.getValue());
+    }
+    return job;
+  }
+
+  /**
+   * Creates delete options that carry the API version of the given job.
+   *
+   * @param job the custom resource about to be deleted
+   * @return delete options whose apiVersion equals {@code job.getApiVersion()}
+   */
+  public static V1DeleteOptions toDeleteOptionsFromMLJob(MLJob job) {
+    return new V1DeleteOptionsBuilder().withApiVersion(job.getApiVersion()).build();
+  }
+}
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/JobSpecParserTest.java
 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/JobSpecParserTest.java
index 8bf7a35..36bb28a 100644
--- 
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/JobSpecParserTest.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/JobSpecParserTest.java
@@ -19,8 +19,9 @@
 
 package org.apache.submarine.server.submitter.k8s;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
 import org.apache.submarine.server.api.exception.InvalidSpecException;
 import org.apache.submarine.server.api.spec.JobLibrarySpec;
 import org.apache.submarine.server.api.spec.JobSpec;
@@ -36,19 +37,7 @@ import 
org.apache.submarine.server.submitter.k8s.parser.JobSpecParser;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Reader;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-
-public class JobSpecParserTest {
-
-  private final String pytorchJobReqFile = "/pytorch_job_req.json";
-  private final String tfJobReqFile = "/tf_mnist_req.json";
-
+public class JobSpecParserTest extends SpecBuilder {
   @Test
   public void testValidTensorflowJobSpec() throws IOException,
       URISyntaxException, InvalidSpecException {
@@ -182,19 +171,4 @@ public class JobSpecParserTest {
     Assert.assertEquals(expectedMasterContainerCpu,
         actualMasterContainerCpu);
   }
-
-  private JobSpec buildFromJsonFile(String filePath) throws IOException,
-      URISyntaxException {
-    Gson gson = new GsonBuilder().create();
-    try (Reader reader = Files.newBufferedReader(
-        getCustomJobSpecFile(filePath).toPath(),
-        StandardCharsets.UTF_8)) {
-      return gson.fromJson(reader, JobSpec.class);
-    }
-  }
-
-  private File getCustomJobSpecFile(String path) throws URISyntaxException {
-    URL fileUrl = this.getClass().getResource(path);
-    return new File(fileUrl.toURI());
-  }
 }
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
index 4d91a60..3191f4e 100644
--- 
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/K8SJobSubmitterTest.java
@@ -19,32 +19,21 @@
 
 package org.apache.submarine.server.submitter.k8s;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
 import io.kubernetes.client.ApiException;
 import io.kubernetes.client.apis.CoreV1Api;
 import io.kubernetes.client.models.V1Namespace;
 import io.kubernetes.client.models.V1NamespaceList;
-import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
 import org.apache.submarine.server.api.job.Job;
 import org.apache.submarine.server.api.spec.JobSpec;
-import org.apache.submarine.server.submitter.k8s.model.CustomResourceJob;
-import org.apache.submarine.server.submitter.k8s.model.CustomResourceJobList;
-import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
-import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Reader;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-
 /**
  * We have two ways to test submitter for K8s cluster, local and travis CI.
  * <p>
@@ -62,21 +51,10 @@ import java.nio.file.Files;
  * Local: docker run -it --privileged -p 8443:8443 -p 10080:10080 
bsycorp/kind:latest-1.15
  * Travis: See '.travis.yml'
  */
-public class K8SJobSubmitterTest {
-  private final String tfJobName = "mnist";
-  private final String pytorchJobName = "pytorch-dist-mnist-gloo";
-
-  // The spec files in test/resources
-  private final String tfJobSpecFile = "/tf_job_mnist.json";
-  private final String tfJobReqFile = "/tf_mnist_req.json";
-  private final String pytorchJobSpecFile = "/pytorch_job_mnist_gloo.json";
-  private final String pytorchJobReqFile = "/pytorch_job_req.json";
+public class K8SJobSubmitterTest extends SpecBuilder {
 
   private K8sJobSubmitter submitter;
 
-  private K8sJobRequest.Path tfPath;
-  private K8sJobRequest.Path pyTorchPath;
-
   @Before
   public void before() throws IOException, ApiException {
     String confPath = System.getProperty("user.home") + "/.kube/config";
@@ -87,12 +65,8 @@ public class K8SJobSubmitterTest {
     submitter.initialize(null);
     String ns = "submarine";
     if (!isEnvReady()) {
-      throw new ApiException(" Please create a namespace 'submarine'.");
+      throw new ApiException("Please create a namespace 'submarine'.");
     }
-    tfPath = new K8sJobRequest.Path(TFJob.CRD_TF_GROUP_V1,
-        TFJob.CRD_TF_VERSION_V1, ns, TFJob.CRD_TF_PLURAL_V1);
-    pyTorchPath = new K8sJobRequest.Path(PyTorchJob.CRD_PYTORCH_GROUP_V1,
-        PyTorchJob.CRD_PYTORCH_VERSION_V1, ns, 
PyTorchJob.CRD_PYTORCH_PLURAL_V1);
   }
 
   private boolean isEnvReady() throws ApiException {
@@ -106,101 +80,37 @@ public class K8SJobSubmitterTest {
     return false;
   }
 
-  // Delete the job might take time
-  @After
-  public void after() {
-    try {
-      tryDeleteCustomJob(tfPath, tfJobName);
-      tryDeleteCustomJob(pyTorchPath, pytorchJobName);
-      Thread.sleep(2000);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Test
-  public void testRawTFJobSpec() throws URISyntaxException {
-    tryCreateCustomJob(tfPath, tfJobName, tfJobSpecFile);
-    CustomResourceJob job = getCustomJob(tfPath, tfJobName);
-    Assert.assertNotNull(job);
-    Assert.assertEquals(tfJobName, job.getMetadata().getName());
-    CustomResourceJobList jobList = listCustomJobs(tfPath, tfJobSpecFile);
-    Assert.assertEquals(1, jobList.getItems().size());
-  }
-
-  @Test
-  public void testRunRawPyTorchJobSpec() throws URISyntaxException {
-    tryCreateCustomJob(pyTorchPath, pytorchJobName, pytorchJobSpecFile);
-    CustomResourceJob job = getCustomJob(pyTorchPath, pytorchJobName);
-    Assert.assertNotNull(job);
-    Assert.assertEquals(pytorchJobName, job.getMetadata().getName());
-  }
-
   @Test
   public void testRunPyTorchJobPerRequest() throws URISyntaxException,
-      IOException, InvalidSpecException {
-    JobSpec jobSpec = buildFromJsonFile(pytorchJobReqFile);
-    Job job = submitter.submitJob(jobSpec);
-    Assert.assertNotNull(job);
-    CustomResourceJob cjob = getCustomJob(pyTorchPath, pytorchJobName);
-    Assert.assertEquals(pytorchJobName, cjob.getMetadata().getName());
-    Assert.assertNotNull(cjob);
+      IOException, SubmarineRuntimeException {
+    JobSpec spec = buildFromJsonFile(pytorchJobReqFile);
+    run(spec);
   }
 
   @Test
   public void testRunTFJobPerRequest() throws URISyntaxException,
-      IOException, InvalidSpecException {
-    JobSpec jobSpec = buildFromJsonFile(tfJobReqFile);
-    Job job = submitter.submitJob(jobSpec);
-    Assert.assertNotNull(job);
-    CustomResourceJob cjob = getCustomJob(tfPath, tfJobName);
-    Assert.assertEquals(tfJobName, cjob.getMetadata().getName());
-    Assert.assertNotNull(cjob);
-  }
-
-  private JobSpec buildFromJsonFile(String filePath) throws IOException,
-      URISyntaxException {
-    Gson gson = new GsonBuilder().create();
-    try (Reader reader = Files.newBufferedReader(
-        getCustomJobSpecFile(filePath).toPath(),
-        StandardCharsets.UTF_8)) {
-      return gson.fromJson(reader, JobSpec.class);
-    }
-  }
-
-  public CustomResourceJob tryCreateCustomJob(K8sJobRequest.Path requestPath,
-      String jobName, String jobSpecFile) throws URISyntaxException {
-    CustomResourceJob job = getCustomJob(requestPath, jobName);
-    if (job != null) {
-      return job;
-    }
-    return submitter.createCustomJob(
-        new K8sJobRequest(requestPath, getCustomJobSpecFile(jobSpecFile)));
-  }
-
-  public CustomResourceJobList listCustomJobs(K8sJobRequest.Path requestPath,
-      String jobSpecFile) throws URISyntaxException {
-    return submitter.listCustomResourceJobs(
-        new K8sJobRequest(requestPath, getCustomJobSpecFile(jobSpecFile)));
-  }
-
-  public CustomResourceJob tryDeleteCustomJob(K8sJobRequest.Path requestPath,
-      String jobName) {
-    if (getCustomJob(requestPath, jobName) != null) {
-      K8sJobRequest request = new K8sJobRequest(requestPath, null, jobName);
-      return submitter.deleteCustomResourceJob(request);
-    }
-    return null;
-  }
-
-  private CustomResourceJob getCustomJob(K8sJobRequest.Path requestPath,
-      String jobName) {
-    K8sJobRequest request = new K8sJobRequest(requestPath, null, jobName);
-    return submitter.getCustomResourceJob(request);
+      IOException, SubmarineRuntimeException {
+    JobSpec spec = buildFromJsonFile(tfJobReqFile);
+    run(spec);
   }
 
-  private File getCustomJobSpecFile(String path) throws URISyntaxException {
-    URL fileUrl = this.getClass().getResource(path);
-    return new File(fileUrl.toURI());
+  private void run(JobSpec spec) throws SubmarineRuntimeException {
+    // create
+    Job jobCreated = submitter.createJob(spec);
+    Assert.assertNotNull(jobCreated);
+
+    // find
+    Job jobFound = submitter.findJob(spec);
+    Assert.assertNotNull(jobFound);
+    Assert.assertEquals(jobCreated.getUid(), jobFound.getUid());
+    Assert.assertEquals(jobCreated.getName(), jobFound.getName());
+    Assert.assertEquals(jobCreated.getAcceptedTime(), 
jobFound.getAcceptedTime());
+
+    // delete
+    Job jobDeleted = submitter.deleteJob(spec);
+    Assert.assertNotNull(jobDeleted);
+    Assert.assertEquals(Job.Status.STATUS_DELETED.getValue(), 
jobDeleted.getStatus());
+    Assert.assertEquals(jobFound.getUid(), jobDeleted.getUid());
+    Assert.assertEquals(jobFound.getName(), jobDeleted.getName());
   }
 }
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
new file mode 100644
index 0000000..99feee4
--- /dev/null
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/MLJobConverterTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import io.kubernetes.client.models.V1DeleteOptions;
+import io.kubernetes.client.models.V1JobCondition;
+import io.kubernetes.client.models.V1JobConditionBuilder;
+import io.kubernetes.client.models.V1JobStatus;
+import io.kubernetes.client.models.V1JobStatusBuilder;
+import io.kubernetes.client.models.V1Status;
+import io.kubernetes.client.models.V1StatusBuilder;
+import org.apache.submarine.server.api.exception.InvalidSpecException;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
+import org.apache.submarine.server.submitter.k8s.model.MLJob;
+import org.apache.submarine.server.submitter.k8s.parser.JobSpecParser;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MLJobConverterTest extends SpecBuilder {
+  @Test
+  public void testMLJob2Job() throws IOException, URISyntaxException, 
InvalidSpecException {
+    // Accepted Status
+    JobSpec spec = buildFromJsonFile(tfJobReqFile);
+    MLJob mlJob = JobSpecParser.parseJob(spec);
+    V1JobStatus status = new V1JobStatusBuilder().build();
+    mlJob.setStatus(status);
+    Job job = MLJobConverter.toJobFromMLJob(mlJob);
+    Assert.assertNull(job.getStatus());
+    Assert.assertNull(job.getStatus());
+
+    // Created Status
+    DateTime startTime = new DateTime();
+    mlJob.getStatus().setStartTime(startTime);
+
+    List<V1JobCondition> conditions = new ArrayList<>();
+    DateTime createdTime = new DateTime();
+    V1JobCondition condition = new V1JobConditionBuilder().withStatus("True")
+        .withType("Created").withLastTransitionTime(createdTime).build();
+    conditions.add(condition);
+    mlJob.getStatus().setConditions(conditions);
+
+    job = MLJobConverter.toJobFromMLJob(mlJob);
+    Assert.assertEquals(Job.Status.STATUS_CREATED.getValue(), job.getStatus());
+    Assert.assertEquals(startTime.toString(), job.getCreatedTime());
+
+    // Running Status
+    DateTime runningTime = new DateTime();
+    condition = new V1JobConditionBuilder().withStatus("True")
+        .withType("Running").withLastTransitionTime(runningTime).build();
+    conditions.add(condition);
+
+    mlJob.getStatus().setConditions(conditions);
+    job = MLJobConverter.toJobFromMLJob(mlJob);
+    Assert.assertEquals(Job.Status.STATUS_RUNNING.toString(), job.getStatus());
+    Assert.assertEquals(runningTime.toString(), job.getRunningTime());
+
+    // Succeeded Status
+    DateTime finishedTime = new DateTime();
+    mlJob.getStatus().setCompletionTime(finishedTime);
+    job = MLJobConverter.toJobFromMLJob(mlJob);
+    Assert.assertEquals(Job.Status.STATUS_SUCCEEDED.toString(), 
job.getStatus());
+    Assert.assertEquals(finishedTime.toString(), job.getFinishedTime());
+  }
+
+  @Test
+  public void testStatus2Job() {
+    V1Status status = new V1StatusBuilder().withStatus("Success").build();
+    Job job = MLJobConverter.toJobFromStatus(status);
+    Assert.assertNotNull(job);
+    Assert.assertEquals(Job.Status.STATUS_DELETED.getValue(), job.getStatus());
+  }
+
+  @Test
+  public void testMLJob2DeleteOptions() throws IOException, URISyntaxException,
+      InvalidSpecException {
+    JobSpec spec = buildFromJsonFile(tfJobReqFile);
+    MLJob mlJob = JobSpecParser.parseJob(spec);
+    V1DeleteOptions options = MLJobConverter.toDeleteOptionsFromMLJob(mlJob);
+    Assert.assertNotNull(options);
+    Assert.assertEquals(mlJob.getApiVersion(), options.getApiVersion());
+  }
+}
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
new file mode 100644
index 0000000..c8ac752
--- /dev/null
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/SpecBuilder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.submarine.server.api.spec.JobSpec;
+
+public abstract class SpecBuilder {
+  // The spec files in test/resources
+  protected final String tfJobReqFile = "/tf_mnist_req.json";
+  protected final String pytorchJobReqFile = "/pytorch_job_req.json";
+
+  protected JobSpec buildFromJsonFile(String filePath) throws IOException,
+      URISyntaxException {
+    Gson gson = new GsonBuilder().create();
+    try (Reader reader = 
Files.newBufferedReader(getCustomJobSpecFile(filePath).toPath(),
+        StandardCharsets.UTF_8)) {
+      return gson.fromJson(reader, JobSpec.class);
+    }
+  }
+
+  private File getCustomJobSpecFile(String path) throws URISyntaxException {
+    URL fileUrl = this.getClass().getResource(path);
+    return new File(fileUrl.toURI());
+  }
+}
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/pytorch_job_mnist_gloo.json
 
b/submarine-server/server-submitter/submitter-k8s/src/test/resources/pytorch_job_mnist_gloo.json
deleted file mode 100644
index a19a33d..0000000
--- 
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/pytorch_job_mnist_gloo.json
+++ /dev/null
@@ -1,47 +0,0 @@
-{
-  "apiVersion": "kubeflow.org/v1",
-  "kind": "PyTorchJob",
-  "metadata": {
-    "name": "pytorch-dist-mnist-gloo"
-  },
-  "spec": {
-    "pytorchReplicaSpecs": {
-      "Master": {
-        "replicas": 1,
-        "restartPolicy": "Never",
-        "template": {
-          "spec": {
-            "containers": [
-              {
-                "name": "pytorch",
-                "image": "apache/submarine:pytorch-dist-mnist-1.0",
-                "args": [
-                  "--backend",
-                  "gloo"
-                ]
-              }
-            ]
-          }
-        }
-      },
-      "Worker": {
-        "replicas": 1,
-        "restartPolicy": "Never",
-        "template": {
-          "spec": {
-            "containers": [
-              {
-                "name": "pytorch",
-                "image": "apache/submarine:pytorch-dist-mnist-1.0",
-                "args": [
-                  "--backend",
-                  "gloo"
-                ]
-              }
-            ]
-          }
-        }
-      }
-    }
-  }
-}
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.json
 
b/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.json
deleted file mode 100644
index 6d36d1d..0000000
--- 
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.json
+++ /dev/null
@@ -1,48 +0,0 @@
-{
-  "apiVersion": "kubeflow.org/v1",
-  "kind": "TFJob",
-  "metadata": {
-    "name": "mnist",
-    "namespace": "submarine"
-  },
-  "spec": {
-    "cleanPodPolicy": "None",
-    "tfReplicaSpecs": {
-      "Worker": {
-        "replicas": 1,
-        "restartPolicy": "Never",
-        "template": {
-          "spec": {
-            "containers": [
-              {
-                "name": "tensorflow",
-                "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
-                "command": [
-                  "python",
-                  "/var/tf_mnist/mnist_with_summaries.py",
-                  "--log_dir=/train/logs",
-                  "--learning_rate=0.01",
-                  "--batch_size=150"
-                ],
-                "volumeMounts": [
-                  {
-                    "mountPath": "/train",
-                    "name": "training"
-                  }
-                ]
-              }
-            ],
-            "volumes": [
-              {
-                "name": "training",
-                "persistentVolumeClaim": {
-                  "claimName": "tfevent-volume"
-                }
-              }
-            ]
-          }
-        }
-      }
-    }
-  }
-}
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.yaml
 
b/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.yaml
deleted file mode 100644
index 370f4cc..0000000
--- 
a/submarine-server/server-submitter/submitter-k8s/src/test/resources/tf_job_mnist.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-apiVersion: "kubeflow.org/v1"
-kind: "TFJob"
-metadata:
-  name: "mnist"
-  namespace: kubeflow
-spec:
-  cleanPodPolicy: None
-  tfReplicaSpecs:
-    Worker:
-      replicas: 1
-      restartPolicy: Never
-      template:
-        spec:
-          containers:
-            - name: tensorflow
-              image: gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0
-              command:
-                - "python"
-                - "/var/tf_mnist/mnist_with_summaries.py"
-                - "--log_dir=/train/logs"
-                - "--learning_rate=0.01"
-                - "--batch_size=150"
-              volumeMounts:
-                - mountPath: "/train"
-                  name: "training"
-          volumes:
-            - name: "training"
-              persistentVolumeClaim:
-                claimName: "tfevent-volume"
diff --git a/submarine-test/test-k8s/pom.xml b/submarine-test/test-k8s/pom.xml
index b6b83c5..1232180 100644
--- a/submarine-test/test-k8s/pom.xml
+++ b/submarine-test/test-k8s/pom.xml
@@ -41,6 +41,12 @@
       <groupId>org.apache.submarine</groupId>
       <artifactId>submarine-server-core</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>joda-time</groupId>
+          <artifactId>joda-time</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -72,6 +78,10 @@
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -94,6 +104,12 @@
       <artifactId>snakeyaml</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.kubernetes</groupId>
+      <artifactId>client-java</artifactId>
+      <version>${k8s.client-java.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
 
b/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
index ec63518..f9a0fb0 100644
--- 
a/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
+++ 
b/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
@@ -19,48 +19,285 @@
 
 package org.apache.submarine.rest;
 
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.Configuration;
+import io.kubernetes.client.JSON;
+import io.kubernetes.client.apis.CustomObjectsApi;
+import io.kubernetes.client.util.ClientBuilder;
+import io.kubernetes.client.util.KubeConfig;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.io.FileUtils;
 import org.apache.submarine.server.AbstractSubmarineServerTest;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobId;
+import org.apache.submarine.server.json.JobIdDeserializer;
+import org.apache.submarine.server.json.JobIdSerializer;
 import org.apache.submarine.server.response.JsonResponse;
 import org.apache.submarine.server.rest.RestConstants;
+import org.joda.time.DateTime;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.junit.BeforeClass;
-
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertEquals;
 
+@SuppressWarnings("rawtypes")
 public class JobManagerRestApiIT extends AbstractSubmarineServerTest {
   private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerRestApiIT.class);
 
+  private static CustomObjectsApi k8sApi;
+  /** Key is the ml framework name, the value is the operator */
+  private static Map<String, KfOperator> kfOperatorMap;
+  private static String JOB_PATH = "/api/" + RestConstants.V1 + "/" + 
RestConstants.JOBS;
+
+  private Gson gson = new GsonBuilder()
+      .registerTypeAdapter(JobId.class, new JobIdSerializer())
+      .registerTypeAdapter(JobId.class, new JobIdDeserializer())
+      .create();
+
   @BeforeClass
-  public static void startUp(){
+  public static void startUp() throws IOException {
     Assert.assertTrue(checkIfServerIsRunning());
+
+    // The kube config path defined by kind-cluster-build.sh
+    String confPath = System.getProperty("user.home") + 
"/.kube/kind-config-kind";
+    KubeConfig config = KubeConfig.loadKubeConfig(new FileReader(confPath));
+    ApiClient client = ClientBuilder.kubeconfig(config).build();
+    Configuration.setDefaultApiClient(client);
+    k8sApi = new CustomObjectsApi();
+
+    kfOperatorMap = new HashMap<>();
+    kfOperatorMap.put("tensorflow", new KfOperator("v1", "tfjobs"));
+    kfOperatorMap.put("pytorch", new KfOperator("v1", "pytorchjobs"));
+  }
+
+  @Test
+  public void testJobServerPing() throws IOException {
+    GetMethod response = httpGet("/api/" + RestConstants.V1 + "/"
+        + RestConstants.JOBS + "/" + RestConstants.PING);
+    String requestBody = response.getResponseBodyAsString();
+    Gson gson = new Gson();
+    JsonResponse jsonResponse = gson.fromJson(requestBody, JsonResponse.class);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
jsonResponse.getCode());
+    Assert.assertEquals("Pong", jsonResponse.getResult().toString());
   }
 
-  // Test job created with correct JSON input
   @Test
-  public void testCreateJob() throws Exception {
-    URL fileUrl = this.getClass().getResource("/tf-mnist-req.json");
-    String jobSpec = FileUtils.readFileToString(new File(fileUrl.toURI()), 
StandardCharsets.UTF_8);
+  public void testTensorFlowWithJsonSpec() throws Exception {
+    String body = loadContent("tensorflow/tf-mnist-req.json");
+    String patchBody = loadContent("tensorflow/tf-mnist-patch-req.json");
+    run(body, patchBody, "application/json");
+  }
 
-    PostMethod response = httpPost("/api/" + RestConstants.V1 + "/" + 
RestConstants.JOBS, jobSpec);
-    LOG.debug(response.toString());
+  @Test
+  public void testTensorFlowWithYamlSpec() throws Exception {
+    String body = loadContent("tensorflow/tf-mnist-req.yaml");
+    String patchBody = loadContent("tensorflow/tf-mnist-patch-req.yaml");
+    run(body, patchBody, "application/yaml");
+  }
 
-    String responseBodyAsString = response.getResponseBodyAsString();
-    LOG.debug(responseBodyAsString);
+  @Test
+  public void testPyTorchWithJsonSpec() throws Exception {
+    String body = loadContent("pytorch/pt-mnist-req.json");
+    String patchBody = loadContent("pytorch/pt-mnist-patch-req.json");
+    run(body, patchBody, "application/json");
+  }
 
-    Gson gson = new Gson();
-    JsonResponse jsonResponse = gson.fromJson(responseBodyAsString, 
JsonResponse.class);
-    assertEquals("Response code should be 202 ",
-        Response.Status.ACCEPTED.getStatusCode(), jsonResponse.getCode());
+  @Test
+  public void testPyTorchWithYamlSpec() throws Exception {
+    String body = loadContent("pytorch/pt-mnist-req.yaml");
+    String patchBody = loadContent("pytorch/pt-mnist-patch-req.yaml");
+    run(body, patchBody, "application/yaml");
+  }
+
+  private void run(String body, String patchBody, String contentType) throws 
Exception {
+    // create
+    LOG.info("Create training job by Job REST API");
+    PostMethod postMethod = httpPost(JOB_PATH, body, contentType);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
postMethod.getStatusCode());
+
+    String json = postMethod.getResponseBodyAsString();
+    JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
jsonResponse.getCode());
+
+    Job createdJob = gson.fromJson(gson.toJson(jsonResponse.getResult()), 
Job.class);
+    verifyCreateJobApiResult(createdJob);
+
+    // find
+    GetMethod getMethod = httpGet(JOB_PATH + "/" + 
createdJob.getJobId().toString());
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
getMethod.getStatusCode());
+
+    json = getMethod.getResponseBodyAsString();
+    jsonResponse = gson.fromJson(json, JsonResponse.class);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
jsonResponse.getCode());
+
+    Job foundJob = gson.fromJson(gson.toJson(jsonResponse.getResult()), 
Job.class);
+    verifyGetJobApiResult(createdJob, foundJob);
+
+    // patch
+    // TODO(jiwq): commons-httpclient does not support the PATCH method
+    // https://tools.ietf.org/html/rfc5789
+
+    // delete
+    DeleteMethod deleteMethod = httpDelete(JOB_PATH + "/" + 
createdJob.getJobId().toString());
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
deleteMethod.getStatusCode());
+
+    json = deleteMethod.getResponseBodyAsString();
+    jsonResponse = gson.fromJson(json, JsonResponse.class);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
jsonResponse.getCode());
+
+    Job deletedJob = gson.fromJson(gson.toJson(jsonResponse.getResult()), 
Job.class);
+    verifyDeleteJobApiResult(createdJob, deletedJob);
+  }
+
+  private void verifyCreateJobApiResult(Job createdJob) throws Exception {
+    Assert.assertNotNull(createdJob.getUid());
+    Assert.assertNotNull(createdJob.getAcceptedTime());
+    Assert.assertEquals(Job.Status.STATUS_ACCEPTED.getValue(), 
createdJob.getStatus());
+
+    assertK8sResultEquals(createdJob);
+  }
+
+  private void verifyGetJobApiResult(Job createdJob, Job foundJob) throws 
Exception {
+    Assert.assertEquals(createdJob.getJobId(), foundJob.getJobId());
+    Assert.assertEquals(createdJob.getUid(), foundJob.getUid());
+    Assert.assertEquals(createdJob.getName(), foundJob.getName());
+    Assert.assertEquals(createdJob.getAcceptedTime(), 
foundJob.getAcceptedTime());
+
+    assertK8sResultEquals(foundJob);
+  }
+
+  private void assertK8sResultEquals(Job job) throws Exception {
+    KfOperator operator = 
kfOperatorMap.get(job.getSpec().getLibrarySpec().getName()
+        .toLowerCase());
+    JsonObject rootObject = getJobByK8sApi(operator.getGroup(), 
operator.getVersion(),
+        operator.getNamespace(), operator.getPlural(), job.getName());
+    JsonObject metadataObject = rootObject.getAsJsonObject("metadata");
+
+    String uid = metadataObject.getAsJsonPrimitive("uid").getAsString();
+    LOG.info("Uid from Job REST is {}", job.getUid());
+    LOG.info("Uid from K8s REST is {}", uid);
+    Assert.assertEquals(job.getUid(), uid);
+
+    String creationTimestamp = 
metadataObject.getAsJsonPrimitive("creationTimestamp")
+        .getAsString();
+    Date expectedDate = new DateTime(job.getAcceptedTime()).toDate();
+    Date actualDate = new DateTime(creationTimestamp).toDate();
+    LOG.info("CreationTimestamp from Job REST is {}", expectedDate);
+    LOG.info("CreationTimestamp from K8s REST is {}", actualDate);
+    Assert.assertEquals(expectedDate, actualDate);
+  }
+
+  private void verifyDeleteJobApiResult(Job createdJob, Job deletedJob) {
+    Assert.assertEquals(createdJob.getName(), deletedJob.getName());
+    Assert.assertEquals(Job.Status.STATUS_DELETED.getValue(), 
deletedJob.getStatus());
+
+    // verify the result by K8s api
+    KfOperator operator = 
kfOperatorMap.get(createdJob.getSpec().getLibrarySpec().getName()
+        .toLowerCase());
+    JsonObject rootObject = null;
+    try {
+      rootObject = getJobByK8sApi(operator.getGroup(), operator.getVersion(),
+          operator.getNamespace(), operator.getPlural(), createdJob.getName());
+    } catch (ApiException e) {
+      Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
e.getCode());
+    } finally {
+      Assert.assertNull(rootObject);
+    }
+  }
+
+  private JsonObject getJobByK8sApi(String group, String version, String 
namespace, String plural,
+      String name) throws ApiException {
+    Object obj = k8sApi.getNamespacedCustomObject(group, version, namespace, 
plural, name);
+    Gson gson = new JSON().getGson();
+    JsonObject rootObject = gson.toJsonTree(obj).getAsJsonObject();
+    Assert.assertNotNull("Parse the K8s API Server response failed.", 
rootObject);
+    return rootObject;
+  }
+
+  @Test
+  public void testCreateJobWithInvalidSpec() throws Exception {
+    PostMethod postMethod = httpPost(JOB_PATH, "", MediaType.APPLICATION_JSON);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
postMethod.getStatusCode());
+
+    String json = postMethod.getResponseBodyAsString();
+    JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
jsonResponse.getCode());
+  }
+
+  @Test
+  public void testCreateJobWithInvalidSubmitter() throws Exception {
+    String body = loadContent("tensorflow/tf-mnist-req.json");
+    body = body.replace("k8s", "InvalidSubmitter");
+    PostMethod postMethod = httpPost(JOB_PATH, body, 
MediaType.APPLICATION_JSON);
+    Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+        postMethod.getStatusCode());
+
+    String json = postMethod.getResponseBodyAsString();
+    JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+    Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+        jsonResponse.getCode());
+  }
+
+  @Test
+  public void testNotFoundJob() throws Exception {
+    GetMethod getMethod = httpGet(JOB_PATH + "/" + "job_123456789_0001");
+    Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
getMethod.getStatusCode());
+
+    String json = getMethod.getResponseBodyAsString();
+    JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+    Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
jsonResponse.getCode());
+  }
+
+  String loadContent(String resourceName) throws Exception {
+    URL fileUrl = this.getClass().getResource("/" + resourceName);
+    LOG.info("Resource file: " + fileUrl);
+    return FileUtils.readFileToString(new File(fileUrl.toURI()), 
StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Used directly by the K8s API. It stores the operator's basic info.
+   */
+  private static class KfOperator {
+    private String version;
+    private String plural;
+
+    KfOperator(String version, String plural) {
+      this.version = version;
+      this.plural = plural;
+    }
+
+    public String getGroup() {
+      return "kubeflow.org";
+    }
+
+    public String getNamespace() {
+      return "default";
+    }
+
+    public String getVersion() {
+      return version;
+    }
+
+    public String getPlural() {
+      return plural;
+    }
   }
 }
diff --git 
a/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.json 
b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.json
new file mode 100644
index 0000000..dd4467c
--- /dev/null
+++ b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.json
@@ -0,0 +1,28 @@
+{
+  "name": "mnist",
+  "librarySpec": {
+    "name": "pytorch",
+    "version": "2.1.0",
+    "image": "apache/submarine:pytorch-dist-mnist-1.0",
+    "cmd": "python /var/mnist.py --backend gloo",
+    "envVars": {
+      "ENV_1": "ENV1"
+    }
+  },
+  "submitterSpec": {
+    "type": "k8s",
+    "namespace": "default"
+  },
+  "taskSpecs": {
+    "Master": {
+      "name": "master",
+      "replicas": 1,
+      "resources": "cpu=1,memory=1024M"
+    },
+    "Worker": {
+      "name": "worker",
+      "replicas": 2,
+      "resources": "cpu=1,memory=1024M"
+    }
+  }
+}
diff --git a/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.yaml b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.yaml
new file mode 100644
index 0000000..8b47741
--- /dev/null
+++ b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-patch-req.yaml
@@ -0,0 +1,21 @@
+---
+name: mnist
+librarySpec:
+  name: pytorch
+  version: 2.1.0
+  image: apache/submarine:pytorch-dist-mnist-1.0
+  cmd: python /var/mnist.py --backend gloo
+  envVars:
+    ENV_1: ENV1
+submitterSpec:
+  type: k8s
+  namespace: default
+taskSpecs:
+  Master:
+    name: master
+    replicas: 1
+    resources: cpu=1,memory=1024M
+  Worker:
+    name: worker
+    replicas: 2
+    resources: cpu=1,memory=1024M
diff --git a/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.json b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.json
new file mode 100644
index 0000000..cb884bf
--- /dev/null
+++ b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.json
@@ -0,0 +1,28 @@
+{
+  "name": "mnist",
+  "librarySpec": {
+    "name": "pytorch",
+    "version": "2.1.0",
+    "image": "apache/submarine:pytorch-dist-mnist-1.0",
+    "cmd": "python /var/mnist.py --backend gloo",
+    "envVars": {
+      "ENV_1": "ENV1"
+    }
+  },
+  "submitterSpec": {
+    "type": "k8s",
+    "namespace": "default"
+  },
+  "taskSpecs": {
+    "Master": {
+      "name": "master",
+      "replicas": 1,
+      "resources": "cpu=1,memory=1024M"
+    },
+    "Worker": {
+      "name": "worker",
+      "replicas": 1,
+      "resources": "cpu=1,memory=1024M"
+    }
+  }
+}
diff --git a/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.yaml b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.yaml
new file mode 100644
index 0000000..414d6ef
--- /dev/null
+++ b/submarine-test/test-k8s/src/test/resources/pytorch/pt-mnist-req.yaml
@@ -0,0 +1,21 @@
+---
+name: mnist
+librarySpec:
+  name: pytorch
+  version: 2.1.0
+  image: apache/submarine:pytorch-dist-mnist-1.0
+  cmd: python /var/mnist.py --backend gloo
+  envVars:
+    ENV_1: ENV1
+submitterSpec:
+  type: k8s
+  namespace: default
+taskSpecs:
+  Master:
+    name: master
+    replicas: 1
+    resources: cpu=1,memory=1024M
+  Worker:
+    name: worker
+    replicas: 1
+    resources: cpu=1,memory=1024M
diff --git a/submarine-server/server-core/src/test/resources/tf-mnist-req.json b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.json
similarity index 58%
rename from submarine-server/server-core/src/test/resources/tf-mnist-req.json
rename to submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.json
index 0850e54..d8df243 100644
--- a/submarine-server/server-core/src/test/resources/tf-mnist-req.json
+++ b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.json
@@ -11,21 +11,13 @@
   },
   "submitterSpec": {
     "type": "k8s",
-    "configPath": null,
-    "namespace": "submarine",
-    "kind": "TFJob",
-    "apiVersion": "kubeflow.org/v1"
+    "namespace": "default"
   },
   "taskSpecs": {
-    "Ps": {
-      "name": "tensorflow",
-      "replicas": 2,
-      "resources": "cpu=4,memory=2048M,nvidia.com/gpu=1"
-    },
     "Worker": {
       "name": "tensorflow",
-      "replicas": 2,
-      "resources": "cpu=4,memory=2048M,nvidia.com/gpu=1"
+      "replicas": 1,
+      "resources": "cpu=1,memory=1024M"
     }
   }
-}
\ No newline at end of file
+}
diff --git a/submarine-server/server-core/src/test/resources/tf-mnist-req.yaml b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.yaml
similarity index 67%
copy from submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
copy to submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.yaml
index 02e2125..edae4cb 100644
--- a/submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
+++ b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-patch-req.yaml
@@ -9,15 +9,11 @@ librarySpec:
 submitterSpec:
   type: "k8s"
   configPath:
-  namespace: "submarine"
+  namespace: "default"
   kind: "TFJob"
   apiVersion: "kubeflow.org/v1"
 taskSpecs:
-  Ps:
-    name: tensorflow
-    replicas: 2
-    resources: "cpu=4,memory=2048M,nvidia.com/gpu=1"
   Worker:
     name: tensorflow
-    replicas: 2
-    resources: "cpu=4,memory=2048M,nvidia.com/gpu=1"
+    replicas: 1
+    resources: "cpu=4,memory=2048M"
diff --git a/submarine-test/test-k8s/src/test/resources/tf-mnist-req.json b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.json
similarity index 72%
rename from submarine-test/test-k8s/src/test/resources/tf-mnist-req.json
rename to submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.json
index 13fc934..d7f7473 100644
--- a/submarine-test/test-k8s/src/test/resources/tf-mnist-req.json
+++ b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.json
@@ -11,18 +11,18 @@
   },
   "submitterSpec": {
     "type": "k8s",
-    "namespace": "submarine"
+    "namespace": "default"
   },
   "taskSpecs": {
     "Ps": {
       "name": "tensorflow",
-      "replicas": 2,
-      "resources": "cpu=4,memory=2048M,nvidia.com/gpu=1"
+      "replicas": 1,
+      "resources": "cpu=1,memory=512M"
     },
     "Worker": {
       "name": "tensorflow",
-      "replicas": 2,
-      "resources": "cpu=4,memory=2048M,nvidia.com/gpu=1"
+      "replicas": 1,
+      "resources": "cpu=1,memory=512M"
     }
   }
 }
diff --git a/submarine-server/server-core/src/test/resources/tf-mnist-req.yaml b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.yaml
similarity index 70%
rename from submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
rename to submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.yaml
index 02e2125..bf729a6 100644
--- a/submarine-server/server-core/src/test/resources/tf-mnist-req.yaml
+++ b/submarine-test/test-k8s/src/test/resources/tensorflow/tf-mnist-req.yaml
@@ -8,16 +8,15 @@ librarySpec:
     ENV_1: "ENV1"
 submitterSpec:
   type: "k8s"
-  configPath:
-  namespace: "submarine"
+  namespace: "default"
   kind: "TFJob"
   apiVersion: "kubeflow.org/v1"
 taskSpecs:
   Ps:
     name: tensorflow
-    replicas: 2
-    resources: "cpu=4,memory=2048M,nvidia.com/gpu=1"
+    replicas: 1
+    resources: "cpu=1,memory=512M"
   Worker:
     name: tensorflow
-    replicas: 2
-    resources: "cpu=4,memory=2048M,nvidia.com/gpu=1"
+    replicas: 1
+    resources: "cpu=1,memory=512M"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to