This is an automated email from the ASF dual-hosted git repository.

ztang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f2700f  SUBMARINE-321. Add JobManager and SubmitterManager components
6f2700f is described below

commit 6f2700f09abb0eafd207c1ddcc0652067cd98ebf
Author: Wanqiang Ji <[email protected]>
AuthorDate: Mon Dec 30 00:53:53 2019 +0800

    SUBMARINE-321. Add JobManager and SubmitterManager components
    
    ### What is this PR for?
    The JobManager is responsible for cache and schedule the job to submitter.
    The SubmitterManager help to load the different submitter plugins.
    
    ### What type of PR is it?
    Feature
    
    ### Todos
    [ ] Hook REST server and JobManager
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-321
    
    ### How should this be tested?
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Wanqiang Ji <[email protected]>
    
    Closes #135 from jiwq/SUBMARINE-321 and squashes the following commits:
    
    33c84e4 [Wanqiang Ji] SUBMARINE-321. Add JobManager and SubmitterManager 
components
---
 .../commons/utils/SubmarineConfiguration.java      |  44 ++++-
 submarine-server/pom.xml                           |   1 +
 submarine-server/server-api/pom.xml                |  34 ++++
 .../apache/submarine/server/api/JobHandler.java    |  70 +++++++
 .../apache/submarine/server/api/JobSubmitter.java} |  47 ++---
 .../exception/UnsupportedJobTypeException.java}    |  35 +---
 .../org/apache/submarine/server/api/job/Job.java   |  98 +++++++++
 .../org/apache/submarine/server/api/job/JobId.java | 129 ++++++++++++
 .../submarine/server/api/spec/JobLibrarySpec.java  | 119 +++++++++++
 .../apache/submarine/server/api/spec/JobSpec.java  | 118 +++++++++++
 .../server/api/spec/JobSubmitterSpec.java          | 114 +++++++++++
 .../submarine/server/api/spec/JobTaskSpec.java     | 219 +++++++++++++++++++++
 submarine-server/server-core/pom.xml               |   6 +
 .../org/apache/submarine/server/JobManager.java    |  93 +++++++++
 .../apache/submarine/server/SubmarineServer.java   |   6 +
 .../apache/submarine/server/SubmitterManager.java  | 102 ++++++++++
 .../submarine/server/jobserver/dao/Component.java  |  69 -------
 .../submarine/server/jobserver/dao/MLJobSpec.java  | 177 -----------------
 .../server/jobserver/rest/JobServerRestApi.java    |  12 +-
 .../server/jobserver/JobServerRestApiTest.java     |   2 +-
 .../submarine/server/rpc/SubmarineRpcServer.java   |  27 +--
 .../server/rpc/SubmarineRpcServerProto.java        |  27 +--
 .../server-submitter/submitter-k8s/pom.xml         |   6 +
 .../server/submitter/k8s/K8sJobRequest.java        |   1 -
 .../server/submitter/k8s/K8sJobSubmitter.java      |  57 +++++-
 .../server/submitter/k8s/model/MLJob.java          | 101 ++++++++++
 .../server/submitter/k8s/model/tfjob/TFJob.java}   |  48 ++---
 .../submitter/k8s/model/tfjob/TFJobSpec.java}      |  53 +++--
 .../submitter/k8s/model/tfjob/TFReplicaSpec.java   |  95 +++++++++
 .../server/submitter/k8s/parser/JobSpecParser.java | 135 +++++++++++++
 .../submitter/k8s/model/tfjob/TFJobTest.java}      |  49 ++---
 31 files changed, 1662 insertions(+), 432 deletions(-)

diff --git 
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
 
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
index 8de9655..5d6b4c2 100644
--- 
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
+++ 
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
@@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.StringTokenizer;
 
 public class SubmarineConfiguration extends XMLConfiguration {
   private static final Logger LOG = 
LoggerFactory.getLogger(SubmarineConfiguration.class);
@@ -311,6 +313,43 @@ public class SubmarineConfiguration extends 
XMLConfiguration {
     return getString(ConfVars.WORKBENCH_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE);
   }
 
+  /**
+   * Get all submitters from configuration file
+   * @return list
+   */
+  public List<String> listSubmitter() {
+    List<String> values = new ArrayList<>();
+
+    String submitters = getString(ConfVars.SUBMARINE_SUBMITTERS.getVarName());
+    if (submitters != null) {
+      final String delim = ",";
+      StringTokenizer tokenizer = new StringTokenizer(submitters, delim);
+      while (tokenizer.hasMoreTokens()) {
+        values.add(tokenizer.nextToken());
+      }
+    }
+
+    return values;
+  }
+
+  /**
+   * Get the entry class name by the specified name
+   * @param name the submitter's name
+   * @return class name
+   */
+  public String getSubmitterEntry(String name) {
+    return 
getString(String.format(ConfVars.SUBMARINE_SUBMITTERS_ENTRY.getVarName(), 
name));
+  }
+
+  /**
+   * Get the submitter's classpath by the specified name
+   * @param name the submitter's name
+   * @return classpath
+   */
+  public String getSubmitterClassPath(String name) {
+    return 
getString(String.format(ConfVars.SUBMARINE_SUBMITTERS_CLASSPATH.getVarName(), 
name));
+  }
+
   private String getStringValue(String name, String d) {
     String value = this.properties.get(name);
     if (value != null) {
@@ -484,7 +523,10 @@ public class SubmarineConfiguration extends 
XMLConfiguration {
         "workbench.websocket.max.text.message.size", "1024000"),
     WORKBENCH_WEB_WAR("workbench.web.war", 
"submarine-workbench/workbench-web/dist"),
     SUBMARINE_RUNTIME_CLASS("submarine.runtime.class",
-        "org.apache.submarine.server.submitter.yarn.YarnRuntimeFactory");
+        "org.apache.submarine.server.submitter.yarn.YarnRuntimeFactory"),
+    SUBMARINE_SUBMITTERS("submarine.submitters", ""),
+    SUBMARINE_SUBMITTERS_ENTRY("submarine.submitters.%s.class", ""),
+    SUBMARINE_SUBMITTERS_CLASSPATH("submarine.submitters.%s.classpath", "");
 
     private String varName;
     @SuppressWarnings("rawtypes")
diff --git a/submarine-server/pom.xml b/submarine-server/pom.xml
index b026bd4..094ebd7 100644
--- a/submarine-server/pom.xml
+++ b/submarine-server/pom.xml
@@ -37,6 +37,7 @@
     <module>server-submitter</module>
     <module>server-core</module>
     <module>server-rpc</module>
+    <module>server-api</module>
   </modules>
 
   <dependencyManagement>
diff --git a/submarine-server/server-api/pom.xml 
b/submarine-server/server-api/pom.xml
new file mode 100644
index 0000000..84adc7f
--- /dev/null
+++ b/submarine-server/server-api/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <parent>
+    <artifactId>submarine-server</artifactId>
+    <groupId>org.apache.submarine</groupId>
+    <version>0.3.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>server-api</artifactId>
+  <name>Submarine: Server API</name>
+
+</project>
\ No newline at end of file
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
new file mode 100644
index 0000000..da3b686
--- /dev/null
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api;
+
+import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.spec.JobSpec;
+
+/**
+ * Handle the job's operate (CRUD)
+ */
+public interface JobHandler {
+  /**
+   * Submit job
+   * @param jobSpec job spec
+   * @return job object
+   * @throws UnsupportedJobTypeException caused by the unsupported job type
+   */
+  Job submitJob(JobSpec jobSpec) throws UnsupportedJobTypeException;
+
+  /**
+   * Get job info
+   * @param jobSpec job spec
+   * @return job object
+   * @throws UnsupportedJobTypeException caused by the unsupported job type
+   */
+  default Job getJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
+    // TODO should implementing later
+    return null;
+  }
+
+  /**
+   * Update job info
+   * @param jobSpec job spec
+   * @return job object
+   * @throws UnsupportedJobTypeException caused by the unsupported job type
+   */
+  default Job updateJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
+    // TODO should implementing later
+    return null;
+  }
+
+  /**
+   * Delete the specified job
+   * @param jobSpec job spec
+   * @return job object
+   * @throws UnsupportedJobTypeException caused by the unsupported job type
+   */
+  default Job deleteJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
+    // TODO should implementing later
+    return null;
+  }
+}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
similarity index 62%
copy from 
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
copy to 
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
index 7533907..87f90d4 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
@@ -17,37 +17,22 @@
  * under the License.
  */
 
-package org.apache.submarine.server.jobserver.dao;
-
-// A process level environment variable.
-public class EnvVaraible {
-
-  public String getKey() {
-    return key;
-  }
-
-  public void setKey(String key) {
-    this.key = key;
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  public void setValue(String value) {
-    this.value = value;
-  }
-
-  String key;
-  String value;
-
-  public EnvVaraible() {}
-
-  public EnvVaraible(String k, String v) {
-    this.key = k;
-    this.value = v;
-  }
-
+package org.apache.submarine.server.api;
 
+/**
+ * The submitter should implement this interface.
+ */
+public interface JobSubmitter extends JobHandler {
+
+  /**
+   * Initialize the submitter related code
+   */
+  void initialize();
+
+  /**
+   * Get the submitter type which is the unique identifier.
+   * @return unique identifier
+   */
+  String getSubmitterType();
 
 }
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
similarity index 62%
copy from 
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
copy to 
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
index 7533907..2ede150 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
@@ -17,37 +17,10 @@
  * under the License.
  */
 
-package org.apache.submarine.server.jobserver.dao;
+package org.apache.submarine.server.api.exception;
 
-// A process level environment variable.
-public class EnvVaraible {
-
-  public String getKey() {
-    return key;
-  }
-
-  public void setKey(String key) {
-    this.key = key;
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  public void setValue(String value) {
-    this.value = value;
+public class UnsupportedJobTypeException extends Exception {
+  public UnsupportedJobTypeException() {
+    super("Unsupported Job Type Exception");
   }
-
-  String key;
-  String value;
-
-  public EnvVaraible() {}
-
-  public EnvVaraible(String k, String v) {
-    this.key = k;
-    this.value = v;
-  }
-
-
-
 }
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
new file mode 100644
index 0000000..b317d9d
--- /dev/null
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.job;
+
+import org.apache.submarine.server.api.spec.JobSpec;
+
+/**
+ * The Generic Machine Learning Job in Submarine.
+ */
+public class Job {
+  private JobId jobId;
+  private String name;
+  private String identifier;
+
+  /**
+   * Get the job instance
+   * @param jobId job id
+   * @param name job name
+   * @param identifier identifier
+   * @return object
+   */
+  public static Job newInstance(JobId jobId, String name, String identifier) {
+    Job job = new Job();
+    job.setJobId(jobId);
+    job.setName(name);
+    job.setIdentifier(identifier);
+    return job;
+  }
+
+  /**
+   * Get the job id which is unique in submarine
+   * @return job id
+   */
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  /**
+   * Set the job id which generated by submarine
+   * @param jobId job id
+   */
+  public void setJobId(JobId jobId) {
+    this.jobId = jobId;
+  }
+
+  /**
+   * Get the job name which specified by user through the {@link JobSpec}
+   * @return the job name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Set the job name which specified by user
+   * @param name job name
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * The unique identifier for the job, used to retire the job info from the 
cluster management
+   * system.
+   *
+   * In YARN cluster it best to set the ApplicationId, and in K8s cluster it 
maybe the job name.
+   * @return the unique identifier
+   */
+  public String getIdentifier() {
+    return identifier;
+  }
+
+  /**
+   * Set the job identifier, in YARN cluster it best to set the application 
id, and in K8s cluster
+   * it maybe the job name.
+   * @param identifier application id (YARN) or Job Name (K8s)
+   */
+  public void setIdentifier(String identifier) {
+    this.identifier = identifier;
+  }
+}
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
new file mode 100644
index 0000000..bb0a4b6
--- /dev/null
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.job;
+
+/**
+ * The unique id for submarine's job. Formatter: 
job_${server_timestamp}_${counter}
+ * Such as: job_1577627710_0001
+ */
+public class JobId implements Comparable<JobId> {
+  private static final String jobIdStrPrefix = "job";
+  private static final String JOB_ID_PREFIX = jobIdStrPrefix + '_';
+  private static final int JOB_ID_MIN_DIGITS = 4;
+
+  private int id;
+  private long serverTimestamp;
+
+  /**
+   * Ge the object of JobId.
+   * @param serverTimestamp the timestamp when the server start
+   * @param id count
+   * @return object
+   */
+  public static JobId newInstance(long serverTimestamp, int id) {
+    JobId jobId = new JobId();
+    jobId.setId(id);
+    return jobId;
+  }
+
+  /**
+   * Get the count of job since the server started
+   * @return number
+   */
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * Set the count of job
+   * @param id number
+   */
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  /**
+   * Get the timestamp(s) when the server started
+   * @return timestamp(s)
+   */
+  public long getServerTimestamp() {
+    return serverTimestamp;
+  }
+
+  /**
+   * Set the server started timestamp(s)
+   * @param timestamp seconds
+   */
+  public void setServerTimestamp(long timestamp) {
+    this.serverTimestamp = timestamp;
+  }
+
+  @Override
+  public int compareTo(JobId o) {
+    return this.getId() > o.getId() ? 1 : 0;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 371237;
+    int result = 6521;
+    result = prime * result + (int) (serverTimestamp ^ (serverTimestamp >>> 
32));
+    result = prime * result + getId();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    JobId other = (JobId) obj;
+    return this.getId() != other.getId();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(64);
+    sb.append(JOB_ID_PREFIX).append(serverTimestamp).append("_");
+    format(sb, getId());
+    return sb.toString();
+  }
+
+  private void format(StringBuilder sb, long value) {
+    int minimumDigits = JOB_ID_MIN_DIGITS;
+    if (value < 0) {
+      sb.append('-');
+      value = -value;
+    }
+
+    long tmp = value;
+    do {
+      tmp /= 10;
+    } while (--minimumDigits > 0 && tmp > 0);
+
+    for (int i = minimumDigits; i > 0; --i) {
+      sb.append('0');
+    }
+    sb.append(value);
+  }
+}
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobLibrarySpec.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobLibrarySpec.java
new file mode 100644
index 0000000..236c138
--- /dev/null
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobLibrarySpec.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.spec;
+
+import java.util.Map;
+
+/**
+ * The machine learning related spec for job.
+ */
+public class JobLibrarySpec {
+  /**
+   * Machine Learning Framework name. Such as: TensorFlow/PyTorch etc.
+   */
+  private String name;
+
+  /**
+   * The version of ML framework. Such as: 2.1.0
+   */
+  private String version;
+
+  /**
+   * The public image used for each task if not specified. Such as: 
apache/submarine
+   */
+  private String image;
+
+  /**
+   * The public entry cmd for the task if not specified.
+   */
+  private String cmd;
+
+  /**
+   * The public env vars for the task if not specified.
+   */
+  private Map<String, String> envVars;
+
+  public JobLibrarySpec() {
+
+  }
+
+  /**
+   * Get the name of the machine learning library. Such as: TensorFlow or 
PyTorch
+   * @return the library's name
+   */
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the version of the library.
+   * @return the library's version
+   */
+  public String getVersion() {
+    return version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
+
+  /**
+   * Get the image for the machine learning job. If the {@link 
JobTaskSpec#getImage()} not
+   * specified the image replaced with it.
+   * @return image url link.
+   */
+  public String getImage() {
+    return image;
+  }
+
+  public void setImage(String image) {
+    this.image = image;
+  }
+
+  /**
+   * The default entry command for job task. If the {@link 
JobTaskSpec#getCmd()} not specified
+   * the cmd replaced with it.
+   * @return cmd
+   */
+  public String getCmd() {
+    return cmd;
+  }
+
+  public void setCmd(String cmd) {
+    this.cmd = cmd;
+  }
+
+  /**
+   * The default env vars for job task. If the @{@link 
JobTaskSpec#getEnvVars()} not specified
+   * replaced with it.
+   * @return env vars
+   */
+  public Map<String, String> getEnvVars() {
+    return envVars;
+  }
+
+  public void setEnvVars(Map<String, String> envVars) {
+    this.envVars = envVars;
+  }
+}
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSpec.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSpec.java
new file mode 100644
index 0000000..bb88004
--- /dev/null
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSpec.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.spec;
+
+import java.util.Map;
+
+/**
+ * The submarine job spec for submarine job server. It consist of name, 
JobLibrarySpec,
+ * JobSubmitterSpec, the tasks.
+ */
+public class JobSpec {
+  /**
+   * The user-specified job name. Such as: mnist
+   */
+  private String name;
+
+  /**
+   * The user-specified ML framework spec.
+   */
+  private JobLibrarySpec librarySpec;
+
+  /**
+   * The user-specified submitter spec.
+   */
+  private JobSubmitterSpec submitterSpec;
+
+  /**
+   * The tasks of the job.
+   * Such as:
+   *   TensorFlow: Chief, Ps, Worker, Evaluator
+   *   PyTorch: Master, Worker
+   */
+  private Map<String, JobTaskSpec> taskSpecs;
+
+  public JobSpec() {
+
+  }
+
+  /**
+   * Get the job name which specified by user.
+   * @return job name
+   */
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the library spec.
+   * @return JobLibrarySpec
+   */
+  public JobLibrarySpec getLibrarySpec() {
+    return librarySpec;
+  }
+
+  public void setLibrarySpec(JobLibrarySpec librarySpec) {
+    this.librarySpec = librarySpec;
+  }
+
+  /**
+   * Get the submitter spec.
+   * @return JobSubmitterSpec
+   */
+  public JobSubmitterSpec getSubmitterSpec() {
+    return submitterSpec;
+  }
+
+  public void setSubmitterSpec(JobSubmitterSpec submitterSpec) {
+    this.submitterSpec = submitterSpec;
+  }
+
+  /**
+   * Get all tasks spec
+   * @return Map of JobTaskSpec
+   */
+  public Map<String, JobTaskSpec> getTaskSpecs() {
+    return taskSpecs;
+  }
+
+  public void setTaskSpecs(Map<String, JobTaskSpec> taskSpecs) {
+    this.taskSpecs = taskSpecs;
+  }
+
+  /**
+   * This could be file/directory which contains multiple python scripts.
+   * We should solve dependencies distribution in k8s or yarn.
+   * Or we could build images for each projects before submitting the job
+   * */
+  String projects;
+
+  public String getProjects() {
+    return projects;
+  }
+
+  public void setProjects(String projects) {
+    this.projects = projects;
+  }
+}
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSubmitterSpec.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSubmitterSpec.java
new file mode 100644
index 0000000..d012a26
--- /dev/null
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSubmitterSpec.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.spec;
+
+/**
+ * The spec of the specified JobSubmitter
+ */
+public class JobSubmitterSpec {
+  /**
+   * The type of JobSubmitter which will be selected to submit job. Such as: 
yarn/yarnservice/k8s
+   */
+  private String type;
+
+  /**
+   * The config path of the specified Resource Manager.
+   */
+  private String configPath;
+
+  /**
+   * It known as queue in Apache Hadoop YARN and namespace in Kubernetes.
+   */
+  private String namespace;
+
+  private String kind;
+
+  private String apiVersion;
+
+  public JobSubmitterSpec() {
+
+  }
+
+  /**
+   * Get the submitter type, which used to select the specified submitter to 
submit this job
+   * @return submitter type, such as yarn/yarnservice/k8s
+   */
+  public String getType() {
+    return type;
+  }
+
+  /**
+   * Set the submitter type, now supports the yarn/yarnservice/k8s.
+   * @param type yarn/yarnservice/k8s
+   */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /**
+   * Get the config path to initialize the submitter. In Apache Hadoop YARN 
cluster it maybe the
+   * path of xxx-site.xml and the kube config for Kubernetes.
+   * @return config path
+   */
+  public String getConfigPath() {
+    return configPath;
+  }
+
+  public void setConfigPath(String configPath) {
+    this.configPath = configPath;
+  }
+
+  /**
+   * Get the namespace to submit the job.
+   * It known as queue in Apache Hadoop YARN and namespace in Kubernetes.
+   * @return namespace
+   */
+  public String getNamespace() {
+    return namespace;
+  }
+
+  public void setNamespace(String namespace) {
+    this.namespace = namespace;
+  }
+
+  /**
+   * (K8s) Get the CRD kind, which will accept the job
+   * @return CRD kind, such as: TFJob/PyTorchJob
+   */
+  public String getKind() {
+    return kind;
+  }
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+  /**
+   * (K8s) Get the CRD api version
+   * @return version, such as: apache.org/submarine/v1
+   */
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+}
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobTaskSpec.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobTaskSpec.java
new file mode 100644
index 0000000..586f3ee
--- /dev/null
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobTaskSpec.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.spec;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The task spec for job, a job consists of tasks.
+ */
+public class JobTaskSpec {
+  /**
+   * The job task name, the range is [Chief, Ps, Worker, Evaluator, Master]
+   */
+  private String name;
+
+  /**
+   * The user-specified job task image. Such as: gcr.io/your-project/your-image
+   * If not specified will used the {@link JobLibrarySpec#getImage()}.
+   */
+  private String image;
+
+  /**
+   * The entry command for running task. If not specified will used the
+   * {@link JobLibrarySpec#getCmd()}.
+   */
+  private String cmd;
+
+  /**
+   * The env vars for the task.
+   */
+  private Map<String, String> envVars;
+
+  /**
+   * The limit resource for the task. Formatter: 
cpu=%s,memory=%s,nvidia.com/gpu=%s
+   * Resource type list:
+   *   <ul>
+   *     <li>cpu: In units of core. It known as vcore in YARN.</li>
+   *     <li>memory: In units of bytes. Using one of these suffixes: E, P, T, 
G, M, K</li>
+   *     <li>nvidia.com/gpu: </li>
+   *   </ul>
+   * Such as: cpu=4,memory=2048M,nvidia.com/gpu=1
+   */
+  private String resources;
+
+  /**
+   * Number of desired tasks. Defaults to 1.
+   */
+  private Integer replicas = 1;
+
+  private Map<String, String> resourceMap;
+
+  public JobTaskSpec() {
+
+  }
+
+  /**
+   * Get the task name which range is [Chief, Ps, Worker, Evaluator, Master]
+   * @return task name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Set the task name
+   * @param name task name which range is [Chief, Ps, Worker, Evaluator, 
Master]
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the image to start container for running task.
+   * @return image
+   */
+  public String getImage() {
+    return image;
+  }
+
+  /**
+   * Set the task image
+   * @param image image
+   */
+  public void setImage(String image) {
+    this.image = image;
+  }
+
+  /**
+   * Get the entry cmd.
+   * @return cmd
+   */
+  public String getCmd() {
+    return cmd;
+  }
+
+  /**
+   * Set the entry command to start the tasks
+   * @param cmd
+   */
+  public void setCmd(String cmd) {
+    this.cmd = cmd;
+  }
+
+  /**
+   * Get env vars.
+   * @return map
+   */
+  public Map<String, String> getEnvVars() {
+    return envVars;
+  }
+
+  /**
+   * Set the env vars for task
+   * @param envVars env map
+   */
+  public void setEnvVars(Map<String, String> envVars) {
+    this.envVars = envVars;
+  }
+
+  /**
+   * Get the resources. Formatter: cpu=%s,memory=%s,nvidia.com/gpu=%s
+   * Resource type list:
+   *   <ul>
+   *     <li>cpu: In units of core. It known as vcore in YARN.</li>
+   *     <li>memory: In units of bytes. Using one of these suffixes: E, P, T, 
G, M, K</li>
+   *     <li>nvidia.com/gpu: </li>
+   *   </ul>
+   * Such as: cpu=4,memory=2048M,nvidia.com/gpu=1
+   * @return resource format string
+   */
+  public String getResources() {
+    return resources;
+  }
+
+  /**
+   * Set the limit resources for task. Formatter: 
cpu=%s,memory=%s,nvidia.com/gpu=%s
+   * Resource type list:
+   *   <ul>
+   *     <li>cpu: In units of core. It known as vcore in YARN.</li>
+   *     <li>memory: In units of bytes. Using one of these suffixes: E, P, T, 
G, M, K</li>
+   *     <li>nvidia.com/gpu: </li>
+   *   </ul>
+   * @param resources resource, such as: cpu=4,memory=2048M,nvidia.com/gpu=1
+   */
+  public void setResources(String resources) {
+    this.resources = resources;
+    parseResources();
+  }
+
+  private void parseResources() {
+    if (resources != null) {
+      resourceMap = new HashMap<>();
+      for (String item : resources.split(",")) {
+        String[] r = item.split("=");
+        if (r.length == 2) {
+          resourceMap.put(r[0], r[1]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the number of desired tasks.
+   * @return number
+   */
+  public Integer getReplicas() {
+    return replicas;
+  }
+
+  /**
+   * Set the number of desired tasks.
+   * @param replicas number
+   */
+  public void setReplicas(Integer replicas) {
+    this.replicas = replicas;
+  }
+
+  /**
+   * Get cpu resource
+   * @return cpu
+   */
+  public String getCpu() {
+    return resourceMap.get("cpu");
+  }
+
+  /**
+   * Get memory resource
+   * @return memory
+   */
+  public String getMemory() {
+    return resourceMap.get("memory");
+  }
+
+  /**
+   * Get gpu resource
+   * @return gpu
+   */
+  public String getGpu() {
+    return resourceMap.get("nvidia.com/gpu");
+  }
+}
diff --git a/submarine-server/server-core/pom.xml 
b/submarine-server/server-core/pom.xml
index fc4f4b2..86339a2 100644
--- a/submarine-server/server-core/pom.xml
+++ b/submarine-server/server-core/pom.xml
@@ -40,6 +40,12 @@
 
     <dependency>
       <groupId>org.apache.submarine</groupId>
+      <artifactId>server-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.submarine</groupId>
       <artifactId>commons-metastore</artifactId>
       <version>${project.version}</version>
       <exclusions>
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
new file mode 100644
index 0000000..03c386d
--- /dev/null
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server;
+
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.api.JobHandler;
+import org.apache.submarine.server.api.JobSubmitter;
+import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobId;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * It responsible for manage the job (CRUD) and cached the job.
+ */
+public class JobManager implements JobHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
+  private static volatile JobManager manager;
+
+  private final AtomicInteger jobCounter = new AtomicInteger(0);
+
+  private final ConcurrentMap<JobId, Job> jobs = new ConcurrentHashMap<>();
+
+  private SubmitterManager submitterManager;
+  private ExecutorService executorService;
+
+  /**
+   * Get the singleton instance
+   * @return object
+   */
+  public static JobManager getInstance() {
+    if (manager == null) {
+      synchronized (JobManager.class) {
+        if (manager == null) {
+          SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
+          SubmitterManager submitterManager = new SubmitterManager(conf);
+          manager = new JobManager(submitterManager);
+        }
+      }
+    }
+    return manager;
+  }
+
+  private JobManager(SubmitterManager submitterManager) {
+    this.submitterManager = submitterManager;
+    this.executorService = Executors.newFixedThreadPool(50);
+  }
+
+  @Override
+  public Job submitJob(JobSpec spec) throws UnsupportedJobTypeException {
+    Job job = new Job();
+    job.setJobId(generateJobId());
+    executorService.submit(() -> {
+      try {
+        JobSubmitter submitter = submitterManager.getSubmitterByType(
+            spec.getSubmitterSpec().getType());
+        jobs.putIfAbsent(job.getJobId(), submitter.submitJob(spec));
+      } catch (UnsupportedJobTypeException e) {
+        LOG.warn(e.getMessage(), e);
+      }
+    });
+    return job;
+  }
+
+  private JobId generateJobId() {
+    return JobId.newInstance(SubmarineServer.getServerTimeStamp(), 
jobCounter.incrementAndGet());
+  }
+}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
index 3cbe0f6..86f174b 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
@@ -58,12 +58,18 @@ import java.io.IOException;
 public class SubmarineServer extends ResourceConfig {
   private static final Logger LOG = 
LoggerFactory.getLogger(SubmarineServer.class);
 
+  private static long serverTimeStamp = System.currentTimeMillis();
+
   public static Server jettyWebServer;
   public static SubmarineRpcServer rpcServer;
   public static ServiceLocator sharedServiceLocator;
 
   private static SubmarineConfiguration conf = 
SubmarineConfiguration.getInstance();
 
+  public static long getServerTimeStamp() {
+    return serverTimeStamp;
+  }
+
   public static void main(String[] args) throws InterruptedException,
       IOException {
     
PropertyConfigurator.configure(ClassLoader.getSystemResource("log4j.properties"));
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
new file mode 100644
index 0000000..07069c8
--- /dev/null
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server;
+
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.api.JobSubmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Submitter Manager, load all the submitter plugin, configured by
+ * {@link SubmarineConfiguration.ConfVars#SUBMARINE_SUBMITTERS} and related 
key.
+ */
+public class SubmitterManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SubmitterManager.class);
+
+  private SubmarineConfiguration conf;
+  private ConcurrentMap<String, JobSubmitter> submitterMap = new 
ConcurrentHashMap<>();
+
+  public SubmitterManager(SubmarineConfiguration conf) {
+    this.conf = conf;
+    loadSubmitters();
+  }
+
+  private void loadSubmitters() {
+    LOG.info("Start load submitter plugins...");
+    List<String> list = conf.listSubmitter();
+    for (String name : list) {
+      String clazzName = conf.getSubmitterEntry(name);
+      String classpath = conf.getSubmitterClassPath(name);
+      try {
+        ClassLoader classLoader = new 
URLClassLoader(constructUrlsFromClasspath(classpath));
+        Class<?> clazz = Class.forName(clazzName, true, classLoader);
+        Class<? extends JobSubmitter> sClass = 
clazz.asSubclass(JobSubmitter.class);
+        Constructor<? extends JobSubmitter> method = 
sClass.getDeclaredConstructor();
+        JobSubmitter submitter = method.newInstance();
+        submitter.initialize();
+        submitterMap.put(submitter.getSubmitterType(), submitter);
+      } catch (Exception e) {
+        LOG.error(e.toString(), e);
+      }
+    }
+    LOG.info("Success loaded {} submitters.", submitterMap.size());
+  }
+
+  private URL[] constructUrlsFromClasspath(String classpath) throws 
MalformedURLException {
+    List<URL> urls = new ArrayList<>();
+    for (String path : classpath.split(File.pathSeparator)) {
+      if (path.endsWith("/*")) {
+        path = path.substring(0, path.length() - 2);
+      }
+
+      File file = new File(path);
+      if (file.isDirectory()) {
+        String[] items = file.list();
+        if (items != null) {
+          for (String item : items) {
+            urls.add(new File(item).toURI().toURL());
+          }
+        }
+      } else {
+        urls.add(file.toURI().toURL());
+      }
+    }
+    return urls.toArray(new URL[0]);
+  }
+
+  /**
+   * Get the specified submitter by submitter type
+   * @return submitter
+   */
+  public JobSubmitter getSubmitterByType(String type) {
+    return submitterMap.get(type);
+  }
+}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
deleted file mode 100644
index 93b1279..0000000
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.jobserver.dao;
-
-
-/**
- * One component contains role, count and resources.
- * The role name could be tensorflow ps, Pytorch master or tensorflow worker
- * The count is the count of the role instance
- * The resource is the memory/vcore/gpu resource strings in the format:
- * "memory=2048M,vcore=4,nvidia.com/gpu=1"
- */
-
-public class Component {
-
-  public String getRole() {
-    return role;
-  }
-
-  public void setRole(String role) {
-    this.role = role;
-  }
-
-  public String getCount() {
-    return count;
-  }
-
-  public void setCount(String count) {
-    this.count = count;
-  }
-
-  public String getResources() {
-    return resources;
-  }
-
-  public void setResources(String resources) {
-    this.resources = resources;
-  }
-
-  String role;
-  String count;
-  String resources;
-
-  public Component() {}
-
-  public Component(String r, String ct, String res) {
-    this.count = ct;
-    this.role = r;
-    this.resources = res;
-  }
-
-}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
deleted file mode 100644
index d913a14..0000000
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.jobserver.dao;
-
-/**
- * The machine learning job spec the submarine job server can accept.
- * */
-public class MLJobSpec {
-
-  String apiVersion;
-  // The engine type the job will use, can be tensorflow or pytorch
-  String type;
-  // The engine version, not image version/tag. For instance, tensorflow v1.13
-  String version;
-  /**
-   * Advanced property. The RM this job will submit to, k8s or yarn.
-   * Could be the path of yarn’s config file or k8s kubeconfig file
-   * This should be settled when deploy submarine job server.
-   */
-  String rmConfig;
-
-  /**
-   * Advanced property. The image should be inferred from type and version.
-   * The normal user should not set this.
-   * */
-  String dockerImage;
-
-  // The process aware environment variable
-  EnvVaraible[] envVars;
-
-  // The components this cluster will consists.
-  Component[] components;
-
-  // The user id who submit job
-  String uid;
-
-  /**
-   * The queue this job will submitted to.
-   * In YARN, we call it queue. In k8s, no such concept.
-   * It could be namespace’s name.
-   */
-  String queue;
-
-  /**
-   * The user-specified job name for easy search
-   * */
-  String name;
-
-  /**
-   * This could be file/directory which contains multiple python scripts.
-   * We should solve dependencies distribution in k8s or yarn.
-   * Or we could build images for each projects before submitting the job
-   * */
-  String projects;
-
-  /**
-   * The command uses to start the job. This is very job-specific.
-   * We assume the cmd is the same for all containers in a cluster
-   * */
-  String cmd;
-
-  public MLJobSpec() {}
-
-  public String getUid() {
-    return uid;
-  }
-
-  public void setUid(String uid) {
-    this.uid = uid;
-  }
-
-  public String getQueue() {
-    return queue;
-  }
-
-  public void setQueue(String queue) {
-    this.queue = queue;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  public String getProjects() {
-    return projects;
-  }
-
-  public void setProjects(String projects) {
-    this.projects = projects;
-  }
-  public String getCmd() {
-    return cmd;
-  }
-
-  public void setCmd(String cmd) {
-    this.cmd = cmd;
-  }
-
-  public Component[] getComponents() {
-    return components;
-  }
-
-  public void setComponents(
-      Component[] components) {
-    this.components = components;
-  }
-  public String getType() {
-    return type;
-  }
-
-  public void setType(String type) {
-    this.type = type;
-  }
-
-  public String getVersion() {
-    return version;
-  }
-
-  public void setVersion(String version) {
-    this.version = version;
-  }
-
-  public String getRmConfig() {
-    return rmConfig;
-  }
-
-  public void setRmConfig(String rmConfig) {
-    this.rmConfig = rmConfig;
-  }
-
-  public String getDockerImage() {
-    return dockerImage;
-  }
-
-  public void setDockerImage(String dockerImage) {
-    this.dockerImage = dockerImage;
-  }
-
-  public EnvVaraible[] getEnvVars() {
-    return envVars;
-  }
-
-  public void setEnvVars(EnvVaraible[] envVars) {
-    this.envVars = envVars;
-  }
-
-  public String getApiVersion() {
-    return apiVersion;
-  }
-
-  public void setApiVersion(String apiVersion) {
-    this.apiVersion = apiVersion;
-  }
-
-
-}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
index 8121241..f2b3f41 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
@@ -19,7 +19,7 @@
 package org.apache.submarine.server.jobserver.rest;
 
 import org.apache.submarine.server.rest.RestConstants;
-import org.apache.submarine.server.jobserver.dao.MLJobSpec;
+import org.apache.submarine.server.api.spec.JobSpec;
 import org.apache.submarine.server.response.JsonResponse;
 
 import javax.ws.rs.Consumes;
@@ -61,9 +61,9 @@ public class JobServerRestApi {
 
   @POST
   @Consumes({RestConstants.MEDIA_TYPE_YAML, MediaType.APPLICATION_JSON})
-  public Response submitJob(MLJobSpec jobSpec) {
+  public Response submitJob(JobSpec jobSpec) {
     // Submit the job spec through submitter
-    return new JsonResponse.Builder<MLJobSpec>(Response.Status.ACCEPTED)
+    return new JsonResponse.Builder<JobSpec>(Response.Status.ACCEPTED)
         .success(true).result(jobSpec).build();
   }
 
@@ -71,14 +71,14 @@ public class JobServerRestApi {
   @Path("{" + RestConstants.JOB_ID + "}")
   public Response listJob(@PathParam(RestConstants.JOB_ID) String id) {
     // Query the job status though submitter
-    return new JsonResponse.Builder<MLJobSpec>(Response.Status.OK)
+    return new JsonResponse.Builder<JobSpec>(Response.Status.OK)
         .success(true).result(id).build();
   }
 
   @GET
   public Response listAllJob() {
     // Query all the job status though submitter
-    return new JsonResponse.Builder<MLJobSpec>(Response.Status.OK)
+    return new JsonResponse.Builder<JobSpec>(Response.Status.OK)
         .success(true).build();
   }
 
@@ -86,7 +86,7 @@ public class JobServerRestApi {
   @Path("{" + RestConstants.JOB_ID + "}")
   public Response deleteJob(@PathParam(RestConstants.JOB_ID) String id) {
     // Delete the job though submitter
-    return new JsonResponse.Builder<MLJobSpec>(Response.Status.OK)
+    return new JsonResponse.Builder<JobSpec>(Response.Status.OK)
         .success(true).result(id).build();
   }
 
diff --git 
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
 
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
index 448faa7..d580e9d 100644
--- 
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
+++ 
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
@@ -70,7 +70,7 @@ public class JobServerRestApiTest extends 
AbstractSubmarineServerTest {
   // Test job created with correct JSON input
   @Test
   public void testCreateJobWhenJsonInputIsCorrectThenResponseCodeAccepted() 
throws IOException {
-    String jobSpec = "{\"type\": \"tensorflow\", \"version\":\"v1.13\"}";
+    String jobSpec = "{\"name\": \"mnist\"}";
 
     PostMethod response = httpPost("/api/" + RestConstants.V1 + "/" + 
RestConstants.JOBS, jobSpec);
     LOG.info(response.toString());
diff --git 
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
 
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
index 4ab0d31..2347659 100644
--- 
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
+++ 
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
@@ -1,18 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.submarine.server.rpc;
@@ -179,6 +181,7 @@ public class SubmarineRpcServer {
 
     protected ApplicationId run(ClientContext clientContext, Parameter 
parameter)
         throws IOException, YarnException {
+      // TODO replaced with JobManager
       JobSubmitter jobSubmitter =
           clientContext.getRuntimeFactory().getJobSubmitterInstance();
       ApplicationId applicationId = jobSubmitter.submitJob(parameter);
diff --git 
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
 
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
index 3bfe393..4c90cf2 100644
--- 
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
+++ 
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
@@ -1,23 +1,24 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.submarine.server.rpc;
 
-import com.google.protobuf.LazyStringArrayList;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
diff --git a/submarine-server/server-submitter/submitter-k8s/pom.xml 
b/submarine-server/server-submitter/submitter-k8s/pom.xml
index 4a2e3ed..96288b9 100644
--- a/submarine-server/server-submitter/submitter-k8s/pom.xml
+++ b/submarine-server/server-submitter/submitter-k8s/pom.xml
@@ -51,6 +51,12 @@
       <version>${k8s.client-java.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.submarine</groupId>
+      <artifactId>server-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <!--  Unit Tests  -->
     <dependency>
       <groupId>junit</groupId>
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
index 327a082..4e61010 100644
--- 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
@@ -22,7 +22,6 @@ package org.apache.submarine.server.submitter.k8s;
 /**
  * Job request for Kubernetes Submitter.
  */
-// TODO(jiwq): It should implement the JobRequest interface
 public class K8sJobRequest {
   private Path path;
   private Object body;
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
index 0e6f0ad..de946ff 100644
--- 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
@@ -19,6 +19,11 @@
 
 package org.apache.submarine.server.submitter.k8s;
 
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import io.kubernetes.client.ApiClient;
@@ -29,29 +34,67 @@ import io.kubernetes.client.models.V1DeleteOptions;
 import io.kubernetes.client.models.V1DeleteOptionsBuilder;
 import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
+import org.apache.submarine.server.api.JobSubmitter;
+import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.spec.JobSpec;
 import org.apache.submarine.server.submitter.k8s.model.CustomResourceJob;
 import org.apache.submarine.server.submitter.k8s.model.CustomResourceJobList;
+import org.apache.submarine.server.submitter.k8s.model.MLJob;
+import org.apache.submarine.server.submitter.k8s.parser.JobSpecParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileReader;
-import java.io.IOException;
-
 /**
  * JobSubmitter for Kubernetes Cluster.
  */
-// TODO(jiwq): It should implement the JobSubmitter interface
-public class K8sJobSubmitter {
+public class K8sJobSubmitter implements JobSubmitter {
   private final Logger LOG = LoggerFactory.getLogger(K8sJobSubmitter.class);
 
+  /**
+   * Key: kind of CRD, such as TFJob/PyTorchJob
+   * Value: the CRD api with version
+   */
+  private Map<String, String> supportedCRDMap;
+
   public K8sJobSubmitter(String confPath) throws IOException {
     ApiClient client =
         ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new 
FileReader(confPath))).build();
     Configuration.setDefaultApiClient(client);
   }
 
-  public String submitJob(K8sJobRequest request) {
-    return "job_id";
+  @Override
+  public void initialize() {
+    supportedCRDMap = new HashMap<>();
+    supportedCRDMap.put("TFJob", "kubeflow.org/v1");
+  }
+
+  @Override
+  public String getSubmitterType() {
+    return "k8s";
+  }
+
+  @Override
+  public Job submitJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
+    if (!supportedCRDMap.containsKey(jobSpec.getSubmitterSpec().getType())) {
+      throw new UnsupportedJobTypeException();
+    }
+
+    Job job = new Job();
+    job.setName(jobSpec.getName());
+    createJob(JobSpecParser.parseTFJob(jobSpec));
+    return job;
+  }
+
+  @VisibleForTesting
+  void createJob(MLJob job) {
+    try {
+      CustomObjectsApi api = new CustomObjectsApi();
+      api.createNamespacedCustomObject(job.getGroup(), job.getVersion(),
+          job.getMetadata().getNamespace(), job.getMetadata().getName(), job, 
"true");
+    } catch (ApiException e) {
+      LOG.error("Create {} job: " + e.getMessage(), e);
+    }
   }
 
   @VisibleForTesting
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
new file mode 100644
index 0000000..04a3d53
--- /dev/null
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.model;
+
+import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.models.V1ObjectMeta;
+
+/**
+ * The abstract machine learning job for the CRD job
+ */
+public abstract class MLJob {
+  @SerializedName("apiVersion")
+  private String apiVersion;
+
+  @SerializedName("kind")
+  private String kind;
+
+  @SerializedName("metadata")
+  private V1ObjectMeta metadata;
+
+  /**
+   * Get the api with version
+   * @return api with version
+   */
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+  /**
+   * Set the api with version
+   * @param apiVersion api with version
+   */
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+  /**
+   * Get the kind
+   * @return kind, Default is TFJob
+   */
+  public String getKind() {
+    return kind;
+  }
+
+  /**
+   * Set the CRD's name, Default is TFJob
+   * @param kind the CRD's name
+   */
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+  /**
+   * Get the metadata
+   * @return meta
+   */
+  public V1ObjectMeta getMetadata() {
+    return metadata;
+  }
+
+  /**
+   * Set metadata
+   * @param metadata meta
+   */
+  public void setMetadata(V1ObjectMeta metadata) {
+    this.metadata = metadata;
+  }
+
+  /**
+   * Get the resource's group name
+   * @return group name
+   */
+  public String getGroup() {
+    return apiVersion.split("/")[0];
+  }
+
+  /**
+   * Get the resource's version
+   * @return version
+   */
+  public String getVersion() {
+    return apiVersion.split("/")[1];
+  }
+}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
similarity index 55%
copy from 
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
copy to 
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
index 7533907..b290885 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
@@ -17,37 +17,37 @@
  * under the License.
  */
 
-package org.apache.submarine.server.jobserver.dao;
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
 
-// A process level environment variable.
-public class EnvVaraible {
+import com.google.gson.annotations.SerializedName;
+import org.apache.submarine.server.submitter.k8s.model.MLJob;
 
-  public String getKey() {
-    return key;
-  }
+/**
+ * It's the tf-operator's entry model.
+ */
+public class TFJob extends MLJob {
 
-  public void setKey(String key) {
-    this.key = key;
-  }
+  @SerializedName("spec")
+  private TFJobSpec spec;
 
-  public String getValue() {
-    return value;
+  public TFJob() {
+    setApiVersion("kubeflow.org/v1");
+    setKind("TFJob");
   }
 
-  public void setValue(String value) {
-    this.value = value;
+  /**
+   * Get the job spec which contains all the info for TFJob.
+   * @return job spec
+   */
+  public TFJobSpec getSpec() {
+    return spec;
   }
 
-  String key;
-  String value;
-
-  public EnvVaraible() {}
-
-  public EnvVaraible(String k, String v) {
-    this.key = k;
-    this.value = v;
+  /**
+   * Set the spec, the entry of the TFJob
+   * @param spec job spec
+   */
+  public void setSpec(TFJobSpec spec) {
+    this.spec = spec;
   }
-
-
-
 }
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
similarity index 50%
copy from 
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
copy to 
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
index 7533907..5dde0f3 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
@@ -17,37 +17,36 @@
  * under the License.
  */
 
-package org.apache.submarine.server.jobserver.dao;
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
 
-// A process level environment variable.
-public class EnvVaraible {
+import com.google.gson.annotations.SerializedName;
 
-  public String getKey() {
-    return key;
-  }
-
-  public void setKey(String key) {
-    this.key = key;
-  }
+import java.util.Map;
 
-  public String getValue() {
-    return value;
+/**
+ * The entry spec of TFJob. It contains some {@link TFReplicaSpec} which 
describe the info about
+ * the job task.
+ */
+public class TFJobSpec {
+  /**
+   * Key: Chief, Ps, Worker, Evaluator
+   */
+  @SerializedName("tfReplicaSpecs")
+  private Map<String, TFReplicaSpec> tfReplicaSpecs;
+
+  /**
+   * Get the replica specs.
+   * @return map
+   */
+  public Map<String, TFReplicaSpec> getTfReplicaSpecs() {
+    return tfReplicaSpecs;
   }
 
-  public void setValue(String value) {
-    this.value = value;
+  /**
+   * Set replica specs, the key's range is [Chief, Ps, Worker, Evaluator]
+   * @param tfReplicaSpecs map
+   */
+  public void setTfReplicaSpecs(Map<String, TFReplicaSpec> tfReplicaSpecs) {
+    this.tfReplicaSpecs = tfReplicaSpecs;
   }
-
-  String key;
-  String value;
-
-  public EnvVaraible() {}
-
-  public EnvVaraible(String k, String v) {
-    this.key = k;
-    this.value = v;
-  }
-
-
-
 }
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFReplicaSpec.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFReplicaSpec.java
new file mode 100644
index 0000000..1002ece
--- /dev/null
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFReplicaSpec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
+
+import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.models.V1PodTemplateSpec;
+
+/**
+ * The replica spec for TFJob. It contains replicas, restart policy and pod 
template.
+ * The template describe the running instance for task.
+ */
+public class TFReplicaSpec {
+  @SerializedName("replicas")
+  private Integer replicas;
+
+  @SerializedName("template")
+  private V1PodTemplateSpec template;
+
+  /**
+   * Always, OnFailure, ExitCode, Never
+   */
+  @SerializedName("restartPolicy")
+  private String restartPolicy = "OnFailure";
+
+  public TFReplicaSpec() {
+
+  }
+
+  /**
+   * Number of desired pod.
+   * @return number
+   */
+  public Integer getReplicas() {
+    return replicas;
+  }
+
+  /**
+   * Set the number of desired pod
+   * @param replicas number
+   */
+  public void setReplicas(Integer replicas) {
+    this.replicas = replicas;
+  }
+
+  /**
+   * Get the pod template
+   * @return pod template spec
+   */
+  public V1PodTemplateSpec getTemplate() {
+    return template;
+  }
+
+  /**
+   * Set the pod template
+   * @param template pod template
+   */
+  public void setTemplate(V1PodTemplateSpec template) {
+    this.template = template;
+  }
+
+  /**
+   * Get the restart policy.
+   * Default is OnFailure.
+   * Supports: Always, OnFailure, ExitCode, Never
+   * @return policy name
+   */
+  public String getRestartPolicy() {
+    return restartPolicy;
+  }
+
+  /**
+   * Set the restart policy.
+   * @param restartPolicy policy name, the range is [Always, OnFailure, 
ExitCode, Never]
+   */
+  public void setRestartPolicy(String restartPolicy) {
+    this.restartPolicy = restartPolicy;
+  }
+}
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/JobSpecParser.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/JobSpecParser.java
new file mode 100644
index 0000000..2b4f633
--- /dev/null
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/JobSpecParser.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.models.V1Container;
+import io.kubernetes.client.models.V1EnvVar;
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.models.V1PodTemplateSpec;
+import io.kubernetes.client.models.V1ResourceRequirements;
+import org.apache.submarine.server.api.spec.JobLibrarySpec;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.apache.submarine.server.api.spec.JobTaskSpec;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobSpec;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFReplicaSpec;
+
+public class JobSpecParser {
+  /**
+   * Parse the job spec to {@link TFJob}
+   * @param jobSpec job spec
+   * @return the TFJob object
+   */
+  public static TFJob parseTFJob(JobSpec jobSpec) {
+    TFJob tfJob = new TFJob();
+    tfJob.setApiVersion(jobSpec.getSubmitterSpec().getApiVersion());
+    tfJob.setMetadata(parseTFMetadata(jobSpec));
+    tfJob.setSpec(parseTFJobSpec(jobSpec));
+    return tfJob;
+  }
+
+  private static V1ObjectMeta parseTFMetadata(JobSpec jobSpec) {
+    V1ObjectMeta meta = new V1ObjectMeta();
+    meta.setNamespace(jobSpec.getSubmitterSpec().getNamespace());
+    meta.setName(jobSpec.getName());
+    return meta;
+  }
+
+  private static TFJobSpec parseTFJobSpec(JobSpec jobSpec) {
+    TFJobSpec tfJobSpec = new TFJobSpec();
+    Map<String, TFReplicaSpec> replicaSpecMap = new HashMap<>();
+    for (Map.Entry<String, JobTaskSpec> entry : 
jobSpec.getTaskSpecs().entrySet()) {
+      TFReplicaSpec spec = new TFReplicaSpec();
+      spec.setReplicas(entry.getValue().getReplicas());
+      spec.setTemplate(parseTemplateSpec(entry.getValue(), 
jobSpec.getLibrarySpec()));
+      replicaSpecMap.put(entry.getValue().getName(), spec);
+    }
+    tfJobSpec.setTfReplicaSpecs(replicaSpecMap);
+    return tfJobSpec;
+  }
+
+  private static V1PodTemplateSpec parseTemplateSpec(JobTaskSpec taskSpec, 
JobLibrarySpec libSpec) {
+    V1PodTemplateSpec templateSpec = new V1PodTemplateSpec();
+    V1PodSpec podSpec = new V1PodSpec();
+    List<V1Container> containers = new ArrayList<>();
+    V1Container container = new V1Container();
+    container.setName(libSpec.getName().toLowerCase());
+    // image
+    if (taskSpec.getImage() != null) {
+      container.setImage(taskSpec.getImage());
+    } else {
+      container.setImage(libSpec.getImage());
+    }
+    // cmd
+    if (taskSpec.getCmd() != null) {
+      container.setCommand(Arrays.asList(taskSpec.getCmd().split(" ")));
+    } else {
+      container.setCommand(Arrays.asList(libSpec.getCmd().split(" ")));
+    }
+    // resources
+    V1ResourceRequirements resources = new V1ResourceRequirements();
+    resources.setLimits(parseResources(taskSpec));
+    container.setResources(resources);
+    container.setEnv(parseEnvVars(taskSpec, libSpec.getEnvVars()));
+    podSpec.setContainers(containers);
+    templateSpec.setSpec(podSpec);
+    return templateSpec;
+  }
+
+  private static List<V1EnvVar> parseEnvVars(JobTaskSpec spec, Map<String, 
String> defaultEnvs) {
+    if (spec.getEnvVars() != null) {
+      return parseEnvVars(spec.getEnvVars());
+    }
+    return parseEnvVars(defaultEnvs);
+  }
+
+  private static List<V1EnvVar> parseEnvVars(Map<String, String> envMap) {
+    List<V1EnvVar> envVars = new ArrayList<>();
+    for (Map.Entry<String, String> entry : envMap.entrySet()) {
+      V1EnvVar env = new V1EnvVar();
+      env.setName(entry.getKey());
+      env.setValue(entry.getValue());
+      envVars.add(env);
+    }
+    return envVars;
+  }
+
+  private static Map<String, Quantity> parseResources(JobTaskSpec taskSpec) {
+    Map<String, Quantity> resources = new HashMap<>();
+    if (taskSpec.getCpu() != null) {
+      resources.put("cpu", new Quantity(taskSpec.getCpu()));
+    }
+    if (taskSpec.getMemory() != null) {
+      resources.put("memory", new Quantity(taskSpec.getMemory()));
+    }
+    if (taskSpec.getGpu() != null) {
+      resources.put("nvidia.com/gpu", new Quantity(taskSpec.getGpu()));
+    }
+    return resources;
+  }
+}
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobTest.java
similarity index 58%
rename from 
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
rename to 
submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobTest.java
index 7533907..f3489ce 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobTest.java
@@ -17,37 +17,22 @@
  * under the License.
  */
 
-package org.apache.submarine.server.jobserver.dao;
-
-// A process level environment variable.
-public class EnvVaraible {
-
-  public String getKey() {
-    return key;
-  }
-
-  public void setKey(String key) {
-    this.key = key;
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  public void setValue(String value) {
-    this.value = value;
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.net.URL;
+
+public class TFJobTest {
+  @Test
+  public void testFromJson() throws Exception {
+    URL fileUrl = this.getClass().getResource("/tf_job_mnist.json");
+    Gson gson = new Gson();
+    TFJob tfJob = gson.fromJson(new FileReader(new File(fileUrl.toURI())), 
TFJob.class);
+    System.out.println(new 
GsonBuilder().setPrettyPrinting().create().toJson(tfJob));
   }
-
-  String key;
-  String value;
-
-  public EnvVaraible() {}
-
-  public EnvVaraible(String k, String v) {
-    this.key = k;
-    this.value = v;
-  }
-
-
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to