This is an automated email from the ASF dual-hosted git repository.
ztang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 6f2700f SUBMARINE-321. Add JobManager and SubmitterManager components
6f2700f is described below
commit 6f2700f09abb0eafd207c1ddcc0652067cd98ebf
Author: Wanqiang Ji <[email protected]>
AuthorDate: Mon Dec 30 00:53:53 2019 +0800
SUBMARINE-321. Add JobManager and SubmitterManager components
### What is this PR for?
The JobManager is responsible for caching jobs and scheduling them to the submitter.
The SubmitterManager helps to load the different submitter plugins.
### What type of PR is it?
Feature
### Todos
[ ] Hook REST server and JobManager
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-321
### How should this be tested?
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Wanqiang Ji <[email protected]>
Closes #135 from jiwq/SUBMARINE-321 and squashes the following commits:
33c84e4 [Wanqiang Ji] SUBMARINE-321. Add JobManager and SubmitterManager
components
---
.../commons/utils/SubmarineConfiguration.java | 44 ++++-
submarine-server/pom.xml | 1 +
submarine-server/server-api/pom.xml | 34 ++++
.../apache/submarine/server/api/JobHandler.java | 70 +++++++
.../apache/submarine/server/api/JobSubmitter.java} | 47 ++---
.../exception/UnsupportedJobTypeException.java} | 35 +---
.../org/apache/submarine/server/api/job/Job.java | 98 +++++++++
.../org/apache/submarine/server/api/job/JobId.java | 129 ++++++++++++
.../submarine/server/api/spec/JobLibrarySpec.java | 119 +++++++++++
.../apache/submarine/server/api/spec/JobSpec.java | 118 +++++++++++
.../server/api/spec/JobSubmitterSpec.java | 114 +++++++++++
.../submarine/server/api/spec/JobTaskSpec.java | 219 +++++++++++++++++++++
submarine-server/server-core/pom.xml | 6 +
.../org/apache/submarine/server/JobManager.java | 93 +++++++++
.../apache/submarine/server/SubmarineServer.java | 6 +
.../apache/submarine/server/SubmitterManager.java | 102 ++++++++++
.../submarine/server/jobserver/dao/Component.java | 69 -------
.../submarine/server/jobserver/dao/MLJobSpec.java | 177 -----------------
.../server/jobserver/rest/JobServerRestApi.java | 12 +-
.../server/jobserver/JobServerRestApiTest.java | 2 +-
.../submarine/server/rpc/SubmarineRpcServer.java | 27 +--
.../server/rpc/SubmarineRpcServerProto.java | 27 +--
.../server-submitter/submitter-k8s/pom.xml | 6 +
.../server/submitter/k8s/K8sJobRequest.java | 1 -
.../server/submitter/k8s/K8sJobSubmitter.java | 57 +++++-
.../server/submitter/k8s/model/MLJob.java | 101 ++++++++++
.../server/submitter/k8s/model/tfjob/TFJob.java} | 48 ++---
.../submitter/k8s/model/tfjob/TFJobSpec.java} | 53 +++--
.../submitter/k8s/model/tfjob/TFReplicaSpec.java | 95 +++++++++
.../server/submitter/k8s/parser/JobSpecParser.java | 135 +++++++++++++
.../submitter/k8s/model/tfjob/TFJobTest.java} | 49 ++---
31 files changed, 1662 insertions(+), 432 deletions(-)
diff --git
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
index 8de9655..5d6b4c2 100644
---
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
+++
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
@@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.StringTokenizer;
public class SubmarineConfiguration extends XMLConfiguration {
private static final Logger LOG =
LoggerFactory.getLogger(SubmarineConfiguration.class);
@@ -311,6 +313,43 @@ public class SubmarineConfiguration extends
XMLConfiguration {
return getString(ConfVars.WORKBENCH_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE);
}
+ /**
+  * Lists the submitter names configured under {@code submarine.submitters}.
+  *
+  * <p>The property value is a comma-separated list. Each name is trimmed and blank
+  * entries are skipped, so {@code "k8s, yarn"} yields {@code ["k8s", "yarn"]}.
+  * The names are substituted into per-submitter property keys, where stray
+  * whitespace would make the lookup miss.
+  *
+  * @return the configured submitter names in declaration order; an empty list
+  *         (never {@code null}) when no value is available
+  */
+ public List<String> listSubmitter() {
+   List<String> values = new ArrayList<>();
+
+   String submitters = getString(ConfVars.SUBMARINE_SUBMITTERS.getVarName());
+   if (submitters == null) {
+     return values;
+   }
+
+   StringTokenizer tokenizer = new StringTokenizer(submitters, ",");
+   while (tokenizer.hasMoreTokens()) {
+     // Tolerate "a, b" style lists: whitespace is not part of the submitter name.
+     String name = tokenizer.nextToken().trim();
+     if (!name.isEmpty()) {
+       values.add(name);
+     }
+   }
+   return values;
+ }
+
+ /**
+  * Looks up the entry class configured for the given submitter.
+  * @param name the submitter's name, substituted into the property key
+  * @return the configured class name
+  */
+ public String getSubmitterEntry(String name) {
+   String key = String.format(ConfVars.SUBMARINE_SUBMITTERS_ENTRY.getVarName(), name);
+   return getString(key);
+ }
+
+ /**
+  * Looks up the classpath configured for the given submitter.
+  * @param name the submitter's name, substituted into the property key
+  * @return the configured classpath
+  */
+ public String getSubmitterClassPath(String name) {
+   String key = String.format(ConfVars.SUBMARINE_SUBMITTERS_CLASSPATH.getVarName(), name);
+   return getString(key);
+ }
+
private String getStringValue(String name, String d) {
String value = this.properties.get(name);
if (value != null) {
@@ -484,7 +523,10 @@ public class SubmarineConfiguration extends
XMLConfiguration {
"workbench.websocket.max.text.message.size", "1024000"),
WORKBENCH_WEB_WAR("workbench.web.war",
"submarine-workbench/workbench-web/dist"),
SUBMARINE_RUNTIME_CLASS("submarine.runtime.class",
- "org.apache.submarine.server.submitter.yarn.YarnRuntimeFactory");
+ "org.apache.submarine.server.submitter.yarn.YarnRuntimeFactory"),
+ SUBMARINE_SUBMITTERS("submarine.submitters", ""),
+ SUBMARINE_SUBMITTERS_ENTRY("submarine.submitters.%s.class", ""),
+ SUBMARINE_SUBMITTERS_CLASSPATH("submarine.submitters.%s.classpath", "");
private String varName;
@SuppressWarnings("rawtypes")
diff --git a/submarine-server/pom.xml b/submarine-server/pom.xml
index b026bd4..094ebd7 100644
--- a/submarine-server/pom.xml
+++ b/submarine-server/pom.xml
@@ -37,6 +37,7 @@
<module>server-submitter</module>
<module>server-core</module>
<module>server-rpc</module>
+ <module>server-api</module>
</modules>
<dependencyManagement>
diff --git a/submarine-server/server-api/pom.xml
b/submarine-server/server-api/pom.xml
new file mode 100644
index 0000000..84adc7f
--- /dev/null
+++ b/submarine-server/server-api/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>submarine-server</artifactId>
+ <groupId>org.apache.submarine</groupId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>server-api</artifactId>
+ <name>Submarine: Server API</name>
+
+</project>
\ No newline at end of file
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
new file mode 100644
index 0000000..da3b686
--- /dev/null
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api;
+
+import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.spec.JobSpec;
+
+/**
+ * Handles the CRUD operations of a job.
+ */
+public interface JobHandler {
+ /**
+ * Submits a job.
+ * @param jobSpec the spec describing the job to submit
+ * @return the submitted job
+ * @throws UnsupportedJobTypeException if the job type is not supported
+ */
+ Job submitJob(JobSpec jobSpec) throws UnsupportedJobTypeException;
+
+ /**
+ * Gets the job info.
+ * @param jobSpec the spec identifying the job
+ * @return the job; this default placeholder returns {@code null}
+ * @throws UnsupportedJobTypeException if the job type is not supported
+ */
+ default Job getJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
+ // TODO: implement later
+ return null;
+ }
+
+ /**
+ * Updates the job info.
+ * @param jobSpec the spec carrying the updated job description
+ * @return the job; this default placeholder returns {@code null}
+ * @throws UnsupportedJobTypeException if the job type is not supported
+ */
+ default Job updateJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
+ // TODO: implement later
+ return null;
+ }
+
+ /**
+ * Deletes the specified job.
+ * @param jobSpec the spec identifying the job to delete
+ * @return the job; this default placeholder returns {@code null}
+ * @throws UnsupportedJobTypeException if the job type is not supported
+ */
+ default Job deleteJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
+ // TODO: implement later
+ return null;
+ }
+}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
similarity index 62%
copy from
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
copy to
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
index 7533907..87f90d4 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/JobSubmitter.java
@@ -17,37 +17,22 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.dao;
-
-// A process level environment variable.
-public class EnvVaraible {
-
- public String getKey() {
- return key;
- }
-
- public void setKey(String key) {
- this.key = key;
- }
-
- public String getValue() {
- return value;
- }
-
- public void setValue(String value) {
- this.value = value;
- }
-
- String key;
- String value;
-
- public EnvVaraible() {}
-
- public EnvVaraible(String k, String v) {
- this.key = k;
- this.value = v;
- }
-
+package org.apache.submarine.server.api;
+/**
+ * The interface that every job submitter must implement.
+ */
+public interface JobSubmitter extends JobHandler {
+
+ /**
+ * Initializes the submitter.
+ */
+ void initialize();
+
+ /**
+ * Gets the submitter type, which is the submitter's unique identifier.
+ * @return the unique identifier
+ */
+ String getSubmitterType();
}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
similarity index 62%
copy from
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
copy to
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
index 7533907..2ede150 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/exception/UnsupportedJobTypeException.java
@@ -17,37 +17,10 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.dao;
+package org.apache.submarine.server.api.exception;
-// A process level environment variable.
-public class EnvVaraible {
-
- public String getKey() {
- return key;
- }
-
- public void setKey(String key) {
- this.key = key;
- }
-
- public String getValue() {
- return value;
- }
-
- public void setValue(String value) {
- this.value = value;
+public class UnsupportedJobTypeException extends Exception {
+ public UnsupportedJobTypeException() {
+ super("Unsupported Job Type Exception");
}
-
- String key;
- String value;
-
- public EnvVaraible() {}
-
- public EnvVaraible(String k, String v) {
- this.key = k;
- this.value = v;
- }
-
-
-
}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
new file mode 100644
index 0000000..b317d9d
--- /dev/null
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/Job.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.job;
+
+import org.apache.submarine.server.api.spec.JobSpec;
+
+/**
+ * A generic machine learning job tracked by Submarine.
+ */
+public class Job {
+  private JobId jobId;
+  private String name;
+  private String identifier;
+
+  /**
+   * Creates a job populated with the given values.
+   * @param jobId the id generated by Submarine
+   * @param name the user-specified job name
+   * @param identifier the identifier of the job in the cluster management system
+   * @return the new job
+   */
+  public static Job newInstance(JobId jobId, String name, String identifier) {
+    Job created = new Job();
+    created.jobId = jobId;
+    created.name = name;
+    created.identifier = identifier;
+    return created;
+  }
+
+  /**
+   * Returns the job id, which is unique within Submarine.
+   * @return the job id
+   */
+  public JobId getJobId() {
+    return this.jobId;
+  }
+
+  /**
+   * Sets the job id generated by Submarine.
+   * @param jobId the job id
+   */
+  public void setJobId(JobId jobId) {
+    this.jobId = jobId;
+  }
+
+  /**
+   * Returns the job name the user specified through the {@link JobSpec}.
+   * @return the job name
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Sets the user-specified job name.
+   * @param name the job name
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the unique identifier of the job, used to retrieve the job info from
+   * the cluster management system.
+   *
+   * In a YARN cluster this is best set to the ApplicationId; in a K8s cluster it
+   * may be the job name.
+   * @return the unique identifier
+   */
+  public String getIdentifier() {
+    return this.identifier;
+  }
+
+  /**
+   * Sets the job identifier: the application id in a YARN cluster, or possibly the
+   * job name in a K8s cluster.
+   * @param identifier application id (YARN) or job name (K8s)
+   */
+  public void setIdentifier(String identifier) {
+    this.identifier = identifier;
+  }
+}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
new file mode 100644
index 0000000..bb0a4b6
--- /dev/null
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobId.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.job;
+
+/**
+ * The unique id of a Submarine job. Format: job_${server_timestamp}_${counter}
+ * Such as: job_1577627710_0001
+ *
+ * <p>Ids are ordered by server timestamp first and by counter second. Equality and
+ * hashing are based on the same two fields, so {@link #compareTo(JobId)} is
+ * consistent with {@link #equals(Object)}.
+ */
+public class JobId implements Comparable<JobId> {
+  private static final String JOB_ID_PREFIX = "job_";
+  private static final int JOB_ID_MIN_DIGITS = 4;
+
+  private int id;
+  private long serverTimestamp;
+
+  /**
+   * Creates a JobId from the server start timestamp and the job counter.
+   * @param serverTimestamp the timestamp when the server started
+   * @param id the job counter
+   * @return the new JobId
+   */
+  public static JobId newInstance(long serverTimestamp, int id) {
+    JobId jobId = new JobId();
+    // Both components are part of the identity; the timestamp must not be dropped.
+    jobId.setServerTimestamp(serverTimestamp);
+    jobId.setId(id);
+    return jobId;
+  }
+
+  /**
+   * Get the count of jobs since the server started.
+   * @return the counter
+   */
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * Set the count of jobs.
+   * @param id the counter
+   */
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  /**
+   * Get the timestamp (in seconds) when the server started.
+   * @return the timestamp in seconds
+   */
+  public long getServerTimestamp() {
+    return serverTimestamp;
+  }
+
+  /**
+   * Set the timestamp (in seconds) when the server started.
+   * @param timestamp seconds
+   */
+  public void setServerTimestamp(long timestamp) {
+    this.serverTimestamp = timestamp;
+  }
+
+  /**
+   * Orders ids by server timestamp, then by counter.
+   * @param o the id to compare with
+   * @return a negative, zero or positive value as this id is less than, equal to
+   *         or greater than {@code o}
+   */
+  @Override
+  public int compareTo(JobId o) {
+    int byTimestamp = Long.compare(this.getServerTimestamp(), o.getServerTimestamp());
+    if (byTimestamp != 0) {
+      return byTimestamp;
+    }
+    // Integer.compare avoids the overflow a subtraction could cause.
+    return Integer.compare(this.getId(), o.getId());
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 371237;
+    int result = 6521;
+    result = prime * result + (int) (serverTimestamp ^ (serverTimestamp >>> 32));
+    result = prime * result + getId();
+    return result;
+  }
+
+  /**
+   * Two ids are equal when both the server timestamp and the counter match,
+   * mirroring the fields used by {@link #hashCode()}.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    JobId other = (JobId) obj;
+    return this.getServerTimestamp() == other.getServerTimestamp()
+        && this.getId() == other.getId();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(64);
+    sb.append(JOB_ID_PREFIX).append(serverTimestamp).append("_");
+    format(sb, getId());
+    return sb.toString();
+  }
+
+  /**
+   * Appends {@code value} to {@code sb}, left-padded with zeros to at least
+   * {@link #JOB_ID_MIN_DIGITS} digits; a minus sign, if any, precedes the padding.
+   */
+  private void format(StringBuilder sb, long value) {
+    int minimumDigits = JOB_ID_MIN_DIGITS;
+    if (value < 0) {
+      sb.append('-');
+      value = -value;
+    }
+
+    // Count down one pad position per digit of the value.
+    long tmp = value;
+    do {
+      tmp /= 10;
+    } while (--minimumDigits > 0 && tmp > 0);
+
+    for (int i = minimumDigits; i > 0; --i) {
+      sb.append('0');
+    }
+    sb.append(value);
+  }
+}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobLibrarySpec.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobLibrarySpec.java
new file mode 100644
index 0000000..236c138
--- /dev/null
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobLibrarySpec.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.spec;
+
+import java.util.Map;
+
+/**
+ * The machine-learning-library part of a job spec.
+ */
+public class JobLibrarySpec {
+  /** Machine learning framework name, such as TensorFlow or PyTorch. */
+  private String name;
+
+  /** The version of the ML framework, such as 2.1.0. */
+  private String version;
+
+  /** The shared image used for each task that specifies none, such as apache/submarine. */
+  private String image;
+
+  /** The shared entry cmd for each task that specifies none. */
+  private String cmd;
+
+  /** The shared env vars for each task that specifies none. */
+  private Map<String, String> envVars;
+
+  public JobLibrarySpec() {
+  }
+
+  /**
+   * Returns the name of the machine learning library, such as TensorFlow or PyTorch.
+   * @return the library's name
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the version of the library.
+   * @return the library's version
+   */
+  public String getVersion() {
+    return this.version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
+
+  /**
+   * Returns the image for the machine learning job. It stands in when
+   * {@link JobTaskSpec#getImage()} is not specified.
+   * @return image url link
+   */
+  public String getImage() {
+    return this.image;
+  }
+
+  public void setImage(String image) {
+    this.image = image;
+  }
+
+  /**
+   * Returns the default entry command for a job task. It stands in when
+   * {@link JobTaskSpec#getCmd()} is not specified.
+   * @return cmd
+   */
+  public String getCmd() {
+    return this.cmd;
+  }
+
+  public void setCmd(String cmd) {
+    this.cmd = cmd;
+  }
+
+  /**
+   * Returns the default env vars for a job task. They stand in when
+   * {@link JobTaskSpec#getEnvVars()} is not specified.
+   * @return env vars
+   */
+  public Map<String, String> getEnvVars() {
+    return this.envVars;
+  }
+
+  public void setEnvVars(Map<String, String> envVars) {
+    this.envVars = envVars;
+  }
+}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSpec.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSpec.java
new file mode 100644
index 0000000..bb88004
--- /dev/null
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSpec.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.spec;
+
+import java.util.Map;
+
+/**
+ * The job spec accepted by the Submarine job server. It consists of the name, the
+ * JobLibrarySpec, the JobSubmitterSpec and the tasks.
+ */
+public class JobSpec {
+  /** The user-specified job name, such as mnist. */
+  private String name;
+
+  /** The user-specified ML framework spec. */
+  private JobLibrarySpec librarySpec;
+
+  /** The user-specified submitter spec. */
+  private JobSubmitterSpec submitterSpec;
+
+  /**
+   * The tasks of the job.
+   * Such as:
+   * TensorFlow: Chief, Ps, Worker, Evaluator
+   * PyTorch: Master, Worker
+   */
+  private Map<String, JobTaskSpec> taskSpecs;
+
+  public JobSpec() {
+  }
+
+  /**
+   * Returns the job name specified by the user.
+   * @return job name
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the library spec.
+   * @return JobLibrarySpec
+   */
+  public JobLibrarySpec getLibrarySpec() {
+    return this.librarySpec;
+  }
+
+  public void setLibrarySpec(JobLibrarySpec librarySpec) {
+    this.librarySpec = librarySpec;
+  }
+
+  /**
+   * Returns the submitter spec.
+   * @return JobSubmitterSpec
+   */
+  public JobSubmitterSpec getSubmitterSpec() {
+    return this.submitterSpec;
+  }
+
+  public void setSubmitterSpec(JobSubmitterSpec submitterSpec) {
+    this.submitterSpec = submitterSpec;
+  }
+
+  /**
+   * Returns the specs of all tasks.
+   * @return Map of JobTaskSpec
+   */
+  public Map<String, JobTaskSpec> getTaskSpecs() {
+    return this.taskSpecs;
+  }
+
+  public void setTaskSpecs(Map<String, JobTaskSpec> taskSpecs) {
+    this.taskSpecs = taskSpecs;
+  }
+
+  /**
+   * This could be a file/directory which contains multiple python scripts.
+   * We should solve dependency distribution in k8s or yarn,
+   * or we could build images for each project before submitting the job.
+   */
+  String projects;
+
+  public String getProjects() {
+    return this.projects;
+  }
+
+  public void setProjects(String projects) {
+    this.projects = projects;
+  }
+}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSubmitterSpec.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSubmitterSpec.java
new file mode 100644
index 0000000..d012a26
--- /dev/null
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobSubmitterSpec.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.spec;
+
+/**
+ * The spec describing which JobSubmitter to use and how.
+ */
+public class JobSubmitterSpec {
+  /** The type of JobSubmitter selected to submit the job, such as yarn/yarnservice/k8s. */
+  private String type;
+
+  /** The config path of the specified Resource Manager. */
+  private String configPath;
+
+  /** Known as queue in Apache Hadoop YARN and namespace in Kubernetes. */
+  private String namespace;
+
+  private String kind;
+
+  private String apiVersion;
+
+  public JobSubmitterSpec() {
+  }
+
+  /**
+   * Returns the submitter type, used to select the submitter that submits this job.
+   * @return submitter type, such as yarn/yarnservice/k8s
+   */
+  public String getType() {
+    return this.type;
+  }
+
+  /**
+   * Sets the submitter type; yarn/yarnservice/k8s are supported for now.
+   * @param type yarn/yarnservice/k8s
+   */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /**
+   * Returns the config path used to initialize the submitter. In an Apache Hadoop
+   * YARN cluster it may be the path of xxx-site.xml, and the kube config for
+   * Kubernetes.
+   * @return config path
+   */
+  public String getConfigPath() {
+    return this.configPath;
+  }
+
+  public void setConfigPath(String configPath) {
+    this.configPath = configPath;
+  }
+
+  /**
+   * Returns the namespace the job is submitted to.
+   * It is known as queue in Apache Hadoop YARN and namespace in Kubernetes.
+   * @return namespace
+   */
+  public String getNamespace() {
+    return this.namespace;
+  }
+
+  public void setNamespace(String namespace) {
+    this.namespace = namespace;
+  }
+
+  /**
+   * (K8s) Returns the CRD kind which will accept the job.
+   * @return CRD kind, such as: TFJob/PyTorchJob
+   */
+  public String getKind() {
+    return this.kind;
+  }
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+  /**
+   * (K8s) Returns the CRD api version.
+   * @return version, such as: apache.org/submarine/v1
+   */
+  public String getApiVersion() {
+    return this.apiVersion;
+  }
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobTaskSpec.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobTaskSpec.java
new file mode 100644
index 0000000..586f3ee
--- /dev/null
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/spec/JobTaskSpec.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.api.spec;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The task spec for job, a job consists of tasks.
+ */
+public class JobTaskSpec {
+ /**
+ * The job task name, the range is [Chief, Ps, Worker, Evaluator, Master]
+ */
+ private String name;
+
+ /**
+ * The user-specified job task image. Such as: gcr.io/your-project/your-image
+ * If not specified will used the {@link JobLibrarySpec#getImage()}.
+ */
+ private String image;
+
+ /**
+ * The entry command for running task. If not specified will used the
+ * {@link JobLibrarySpec#getCmd()}.
+ */
+ private String cmd;
+
+ /**
+ * The env vars for the task.
+ */
+ private Map<String, String> envVars;
+
+ /**
+ * The limit resource for the task. Formatter:
cpu=%s,memory=%s,nvidia.com/gpu=%s
+ * Resource type list:
+ * <ul>
+ * <li>cpu: In units of core. It known as vcore in YARN.</li>
+ * <li>memory: In units of bytes. Using one of these suffixes: E, P, T,
G, M, K</li>
+ * <li>nvidia.com/gpu: </li>
+ * </ul>
+ * Such as: cpu=4,memory=2048M,nvidia.com/gpu=1
+ */
+ private String resources;
+
+ /**
+ * Number of desired tasks. Defaults to 1.
+ */
+ private Integer replicas = 1;
+
+ private Map<String, String> resourceMap;
+
+ public JobTaskSpec() {
+
+ }
+
+ /**
+ * Get the task name which range is [Chief, Ps, Worker, Evaluator, Master]
+ * @return task name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Set the task name
+ * @param name task name which range is [Chief, Ps, Worker, Evaluator,
Master]
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the image to start container for running task.
+ * @return image
+ */
+ public String getImage() {
+ return image;
+ }
+
+ /**
+ * Set the task image
+ * @param image image
+ */
+ public void setImage(String image) {
+ this.image = image;
+ }
+
+ /**
+ * Get the entry cmd.
+ * @return cmd
+ */
+ public String getCmd() {
+ return cmd;
+ }
+
+ /**
+ * Set the entry command to start the tasks
+ * @param cmd
+ */
+ public void setCmd(String cmd) {
+ this.cmd = cmd;
+ }
+
+ /**
+ * Get env vars.
+ * @return map
+ */
+ public Map<String, String> getEnvVars() {
+ return envVars;
+ }
+
+ /**
+ * Set the env vars for task
+ * @param envVars env map
+ */
+ public void setEnvVars(Map<String, String> envVars) {
+ this.envVars = envVars;
+ }
+
+ /**
+ * Get the resources. Formatter: cpu=%s,memory=%s,nvidia.com/gpu=%s
+ * Resource type list:
+ * <ul>
+ * <li>cpu: In units of core. It known as vcore in YARN.</li>
+ * <li>memory: In units of bytes. Using one of these suffixes: E, P, T,
G, M, K</li>
+ * <li>nvidia.com/gpu: </li>
+ * </ul>
+ * Such as: cpu=4,memory=2048M,nvidia.com/gpu=1
+ * @return resource format string
+ */
+ public String getResources() {
+ return resources;
+ }
+
+ /**
+ * Set the limit resources for task. Formatter:
cpu=%s,memory=%s,nvidia.com/gpu=%s
+ * Resource type list:
+ * <ul>
+ * <li>cpu: In units of core. It known as vcore in YARN.</li>
+ * <li>memory: In units of bytes. Using one of these suffixes: E, P, T,
G, M, K</li>
+ * <li>nvidia.com/gpu: </li>
+ * </ul>
+ * @param resources resource, such as: cpu=4,memory=2048M,nvidia.com/gpu=1
+ */
+ public void setResources(String resources) {
+ this.resources = resources;
+ parseResources();
+ }
+
+ private void parseResources() {
+ if (resources != null) {
+ resourceMap = new HashMap<>();
+ for (String item : resources.split(",")) {
+ String[] r = item.split("=");
+ if (r.length == 2) {
+ resourceMap.put(r[0], r[1]);
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the number of desired tasks.
+ * @return number
+ */
+ public Integer getReplicas() {
+ return replicas;
+ }
+
+ /**
+ * Set the number of desired tasks.
+ * @param replicas number
+ */
+ public void setReplicas(Integer replicas) {
+ this.replicas = replicas;
+ }
+
+ /**
+ * Get cpu resource
+ * @return cpu
+ */
+ public String getCpu() {
+ return resourceMap.get("cpu");
+ }
+
+ /**
+ * Get memory resource
+ * @return memory
+ */
+ public String getMemory() {
+ return resourceMap.get("memory");
+ }
+
+ /**
+ * Get gpu resource
+ * @return gpu
+ */
+ public String getGpu() {
+ return resourceMap.get("nvidia.com/gpu");
+ }
+}
diff --git a/submarine-server/server-core/pom.xml
b/submarine-server/server-core/pom.xml
index fc4f4b2..86339a2 100644
--- a/submarine-server/server-core/pom.xml
+++ b/submarine-server/server-core/pom.xml
@@ -40,6 +40,12 @@
<dependency>
<groupId>org.apache.submarine</groupId>
+ <artifactId>server-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
<artifactId>commons-metastore</artifactId>
<version>${project.version}</version>
<exclusions>
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
new file mode 100644
index 0000000..03c386d
--- /dev/null
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server;
+
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.api.JobHandler;
+import org.apache.submarine.server.api.JobSubmitter;
+import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobId;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * It responsible for manage the job (CRUD) and cached the job.
+ */
+public class JobManager implements JobHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
+  /** Number of worker threads that hand jobs over to the submitters. */
+  private static final int SUBMIT_THREAD_POOL_SIZE = 50;
+  private static volatile JobManager manager;
+
+  private final AtomicInteger jobCounter = new AtomicInteger(0);
+
+  private final ConcurrentMap<JobId, Job> jobs = new ConcurrentHashMap<>();
+
+  private final SubmitterManager submitterManager;
+  private final ExecutorService executorService;
+
+  /**
+   * Get the singleton instance. On first use a {@link SubmitterManager} is built
+   * from {@link SubmarineConfiguration#getInstance()}.
+   * @return object
+   */
+  public static JobManager getInstance() {
+    if (manager == null) {
+      synchronized (JobManager.class) {
+        if (manager == null) {
+          SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
+          SubmitterManager submitterManager = new SubmitterManager(conf);
+          manager = new JobManager(submitterManager);
+        }
+      }
+    }
+    return manager;
+  }
+
+  private JobManager(SubmitterManager submitterManager) {
+    this.submitterManager = submitterManager;
+    this.executorService = Executors.newFixedThreadPool(SUBMIT_THREAD_POOL_SIZE);
+  }
+
+  /**
+   * Accept a job and hand it over to the matching submitter asynchronously.
+   * The submitter is resolved before the task is queued, so an unknown submitter
+   * type is reported to the caller instead of failing silently on a worker thread.
+   * @param spec job spec
+   * @return a job carrying the generated job id
+   * @throws UnsupportedJobTypeException if the spec has no submitter spec or no
+   *         submitter is registered for its type
+   */
+  @Override
+  public Job submitJob(JobSpec spec) throws UnsupportedJobTypeException {
+    if (spec == null || spec.getSubmitterSpec() == null) {
+      throw new UnsupportedJobTypeException();
+    }
+    String type = spec.getSubmitterSpec().getType();
+    // The submitter registry is a ConcurrentHashMap, which rejects null keys.
+    JobSubmitter submitter = type == null ? null : submitterManager.getSubmitterByType(type);
+    if (submitter == null) {
+      throw new UnsupportedJobTypeException();
+    }
+
+    Job job = new Job();
+    job.setJobId(generateJobId());
+    executorService.submit(() -> {
+      try {
+        Job submitted = submitter.submitJob(spec);
+        if (submitted == null) {
+          LOG.warn("Submitter {} returned no job for {}", type, job.getJobId());
+          return;
+        }
+        // Keep the cached job consistent with the key it is stored under.
+        submitted.setJobId(job.getJobId());
+        jobs.putIfAbsent(job.getJobId(), submitted);
+      } catch (UnsupportedJobTypeException e) {
+        LOG.warn(e.getMessage(), e);
+      } catch (RuntimeException e) {
+        // The returned Future is never inspected, so log here or the failure is lost.
+        LOG.error("Failed to submit job " + job.getJobId(), e);
+      }
+    });
+    return job;
+  }
+
+  /**
+   * Build the next job id from the server start timestamp and a running counter.
+   */
+  private JobId generateJobId() {
+    return JobId.newInstance(SubmarineServer.getServerTimeStamp(), jobCounter.incrementAndGet());
+  }
+}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
index 3cbe0f6..86f174b 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
@@ -58,12 +58,18 @@ import java.io.IOException;
public class SubmarineServer extends ResourceConfig {
private static final Logger LOG =
LoggerFactory.getLogger(SubmarineServer.class);
+ private static long serverTimeStamp = System.currentTimeMillis();
+
public static Server jettyWebServer;
public static SubmarineRpcServer rpcServer;
public static ServiceLocator sharedServiceLocator;
private static SubmarineConfiguration conf =
SubmarineConfiguration.getInstance();
+ /**
+ * Get the timestamp stored in serverTimeStamp, which is initialised from
+ * System.currentTimeMillis() by the static field initializer.
+ * @return timestamp in milliseconds
+ */
+ public static long getServerTimeStamp() {
+ return serverTimeStamp;
+ }
+
public static void main(String[] args) throws InterruptedException,
IOException {
PropertyConfigurator.configure(ClassLoader.getSystemResource("log4j.properties"));
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
new file mode 100644
index 0000000..07069c8
--- /dev/null
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmitterManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server;
+
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.apache.submarine.server.api.JobSubmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Submitter Manager, load all the submitter plugin, configured by
+ * {@link SubmarineConfiguration.ConfVars#SUBMARINE_SUBMITTERS} and related
key.
+ */
+public class SubmitterManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(SubmitterManager.class);
+
+ private SubmarineConfiguration conf;
+ private ConcurrentMap<String, JobSubmitter> submitterMap = new
ConcurrentHashMap<>();
+
+ public SubmitterManager(SubmarineConfiguration conf) {
+ this.conf = conf;
+ loadSubmitters();
+ }
+
+ private void loadSubmitters() {
+ LOG.info("Start load submitter plugins...");
+ List<String> list = conf.listSubmitter();
+ for (String name : list) {
+ String clazzName = conf.getSubmitterEntry(name);
+ String classpath = conf.getSubmitterClassPath(name);
+ try {
+ ClassLoader classLoader = new
URLClassLoader(constructUrlsFromClasspath(classpath));
+ Class<?> clazz = Class.forName(clazzName, true, classLoader);
+ Class<? extends JobSubmitter> sClass =
clazz.asSubclass(JobSubmitter.class);
+ Constructor<? extends JobSubmitter> method =
sClass.getDeclaredConstructor();
+ JobSubmitter submitter = method.newInstance();
+ submitter.initialize();
+ submitterMap.put(submitter.getSubmitterType(), submitter);
+ } catch (Exception e) {
+ LOG.error(e.toString(), e);
+ }
+ }
+ LOG.info("Success loaded {} submitters.", submitterMap.size());
+ }
+
+ private URL[] constructUrlsFromClasspath(String classpath) throws
MalformedURLException {
+ List<URL> urls = new ArrayList<>();
+ for (String path : classpath.split(File.pathSeparator)) {
+ if (path.endsWith("/*")) {
+ path = path.substring(0, path.length() - 2);
+ }
+
+ File file = new File(path);
+ if (file.isDirectory()) {
+ String[] items = file.list();
+ if (items != null) {
+ for (String item : items) {
+ urls.add(new File(item).toURI().toURL());
+ }
+ }
+ } else {
+ urls.add(file.toURI().toURL());
+ }
+ }
+ return urls.toArray(new URL[0]);
+ }
+
+ /**
+ * Get the specified submitter by submitter type
+ * @return submitter
+ */
+ public JobSubmitter getSubmitterByType(String type) {
+ return submitterMap.get(type);
+ }
+}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
deleted file mode 100644
index 93b1279..0000000
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/Component.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.jobserver.dao;
-
-
-/**
- * One component contains role, count and resources.
- * The role name could be tensorflow ps, Pytorch master or tensorflow worker
- * The count is the count of the role instance
- * The resource is the memory/vcore/gpu resource strings in the format:
- * "memory=2048M,vcore=4,nvidia.com/gpu=1"
- */
-
-public class Component {
-
- public String getRole() {
- return role;
- }
-
- public void setRole(String role) {
- this.role = role;
- }
-
- public String getCount() {
- return count;
- }
-
- public void setCount(String count) {
- this.count = count;
- }
-
- public String getResources() {
- return resources;
- }
-
- public void setResources(String resources) {
- this.resources = resources;
- }
-
- String role;
- String count;
- String resources;
-
- public Component() {}
-
- public Component(String r, String ct, String res) {
- this.count = ct;
- this.role = r;
- this.resources = res;
- }
-
-}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
deleted file mode 100644
index d913a14..0000000
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/MLJobSpec.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.server.jobserver.dao;
-
-/**
- * The machine learning job spec the submarine job server can accept.
- * */
-public class MLJobSpec {
-
- String apiVersion;
- // The engine type the job will use, can be tensorflow or pytorch
- String type;
- // The engine version, not image version/tag. For instance, tensorflow v1.13
- String version;
- /**
- * Advanced property. The RM this job will submit to, k8s or yarn.
- * Could be the path of yarn’s config file or k8s kubeconfig file
- * This should be settled when deploy submarine job server.
- */
- String rmConfig;
-
- /**
- * Advanced property. The image should be inferred from type and version.
- * The normal user should not set this.
- * */
- String dockerImage;
-
- // The process aware environment variable
- EnvVaraible[] envVars;
-
- // The components this cluster will consists.
- Component[] components;
-
- // The user id who submit job
- String uid;
-
- /**
- * The queue this job will submitted to.
- * In YARN, we call it queue. In k8s, no such concept.
- * It could be namespace’s name.
- */
- String queue;
-
- /**
- * The user-specified job name for easy search
- * */
- String name;
-
- /**
- * This could be file/directory which contains multiple python scripts.
- * We should solve dependencies distribution in k8s or yarn.
- * Or we could build images for each projects before submitting the job
- * */
- String projects;
-
- /**
- * The command uses to start the job. This is very job-specific.
- * We assume the cmd is the same for all containers in a cluster
- * */
- String cmd;
-
- public MLJobSpec() {}
-
- public String getUid() {
- return uid;
- }
-
- public void setUid(String uid) {
- this.uid = uid;
- }
-
- public String getQueue() {
- return queue;
- }
-
- public void setQueue(String queue) {
- this.queue = queue;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getProjects() {
- return projects;
- }
-
- public void setProjects(String projects) {
- this.projects = projects;
- }
- public String getCmd() {
- return cmd;
- }
-
- public void setCmd(String cmd) {
- this.cmd = cmd;
- }
-
- public Component[] getComponents() {
- return components;
- }
-
- public void setComponents(
- Component[] components) {
- this.components = components;
- }
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getVersion() {
- return version;
- }
-
- public void setVersion(String version) {
- this.version = version;
- }
-
- public String getRmConfig() {
- return rmConfig;
- }
-
- public void setRmConfig(String rmConfig) {
- this.rmConfig = rmConfig;
- }
-
- public String getDockerImage() {
- return dockerImage;
- }
-
- public void setDockerImage(String dockerImage) {
- this.dockerImage = dockerImage;
- }
-
- public EnvVaraible[] getEnvVars() {
- return envVars;
- }
-
- public void setEnvVars(EnvVaraible[] envVars) {
- this.envVars = envVars;
- }
-
- public String getApiVersion() {
- return apiVersion;
- }
-
- public void setApiVersion(String apiVersion) {
- this.apiVersion = apiVersion;
- }
-
-
-}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
index 8121241..f2b3f41 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/rest/JobServerRestApi.java
@@ -19,7 +19,7 @@
package org.apache.submarine.server.jobserver.rest;
import org.apache.submarine.server.rest.RestConstants;
-import org.apache.submarine.server.jobserver.dao.MLJobSpec;
+import org.apache.submarine.server.api.spec.JobSpec;
import org.apache.submarine.server.response.JsonResponse;
import javax.ws.rs.Consumes;
@@ -61,9 +61,9 @@ public class JobServerRestApi {
@POST
@Consumes({RestConstants.MEDIA_TYPE_YAML, MediaType.APPLICATION_JSON})
- public Response submitJob(MLJobSpec jobSpec) {
+ public Response submitJob(JobSpec jobSpec) {
// Submit the job spec through submitter
- return new JsonResponse.Builder<MLJobSpec>(Response.Status.ACCEPTED)
+ return new JsonResponse.Builder<JobSpec>(Response.Status.ACCEPTED)
.success(true).result(jobSpec).build();
}
@@ -71,14 +71,14 @@ public class JobServerRestApi {
@Path("{" + RestConstants.JOB_ID + "}")
public Response listJob(@PathParam(RestConstants.JOB_ID) String id) {
// Query the job status though submitter
- return new JsonResponse.Builder<MLJobSpec>(Response.Status.OK)
+ return new JsonResponse.Builder<JobSpec>(Response.Status.OK)
.success(true).result(id).build();
}
@GET
public Response listAllJob() {
// Query all the job status though submitter
- return new JsonResponse.Builder<MLJobSpec>(Response.Status.OK)
+ return new JsonResponse.Builder<JobSpec>(Response.Status.OK)
.success(true).build();
}
@@ -86,7 +86,7 @@ public class JobServerRestApi {
@Path("{" + RestConstants.JOB_ID + "}")
public Response deleteJob(@PathParam(RestConstants.JOB_ID) String id) {
// Delete the job though submitter
- return new JsonResponse.Builder<MLJobSpec>(Response.Status.OK)
+ return new JsonResponse.Builder<JobSpec>(Response.Status.OK)
.success(true).result(id).build();
}
diff --git
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
index 448faa7..d580e9d 100644
---
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
+++
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/jobserver/JobServerRestApiTest.java
@@ -70,7 +70,7 @@ public class JobServerRestApiTest extends
AbstractSubmarineServerTest {
// Test job created with correct JSON input
@Test
public void testCreateJobWhenJsonInputIsCorrectThenResponseCodeAccepted()
throws IOException {
- String jobSpec = "{\"type\": \"tensorflow\", \"version\":\"v1.13\"}";
+ String jobSpec = "{\"name\": \"mnist\"}";
PostMethod response = httpPost("/api/" + RestConstants.V1 + "/" +
RestConstants.JOBS, jobSpec);
LOG.info(response.toString());
diff --git
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
index 4ab0d31..2347659 100644
---
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
+++
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
@@ -1,18 +1,20 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.submarine.server.rpc;
@@ -179,6 +181,7 @@ public class SubmarineRpcServer {
protected ApplicationId run(ClientContext clientContext, Parameter
parameter)
throws IOException, YarnException {
+ // TODO replaced with JobManager
JobSubmitter jobSubmitter =
clientContext.getRuntimeFactory().getJobSubmitterInstance();
ApplicationId applicationId = jobSubmitter.submitJob(parameter);
diff --git
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
index 3bfe393..4c90cf2 100644
---
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
+++
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
@@ -1,23 +1,24 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.submarine.server.rpc;
-import com.google.protobuf.LazyStringArrayList;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
diff --git a/submarine-server/server-submitter/submitter-k8s/pom.xml
b/submarine-server/server-submitter/submitter-k8s/pom.xml
index 4a2e3ed..96288b9 100644
--- a/submarine-server/server-submitter/submitter-k8s/pom.xml
+++ b/submarine-server/server-submitter/submitter-k8s/pom.xml
@@ -51,6 +51,12 @@
<version>${k8s.client-java.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>server-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- Unit Tests -->
<dependency>
<groupId>junit</groupId>
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
index 327a082..4e61010 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobRequest.java
@@ -22,7 +22,6 @@ package org.apache.submarine.server.submitter.k8s;
/**
* Job request for Kubernetes Submitter.
*/
-// TODO(jiwq): It should implement the JobRequest interface
public class K8sJobRequest {
private Path path;
private Object body;
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
index 0e6f0ad..de946ff 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
@@ -19,6 +19,11 @@
package org.apache.submarine.server.submitter.k8s;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import io.kubernetes.client.ApiClient;
@@ -29,29 +34,67 @@ import io.kubernetes.client.models.V1DeleteOptions;
import io.kubernetes.client.models.V1DeleteOptionsBuilder;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
+import org.apache.submarine.server.api.JobSubmitter;
+import org.apache.submarine.server.api.exception.UnsupportedJobTypeException;
+import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.spec.JobSpec;
import org.apache.submarine.server.submitter.k8s.model.CustomResourceJob;
import org.apache.submarine.server.submitter.k8s.model.CustomResourceJobList;
+import org.apache.submarine.server.submitter.k8s.model.MLJob;
+import org.apache.submarine.server.submitter.k8s.parser.JobSpecParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileReader;
-import java.io.IOException;
-
/**
* JobSubmitter for Kubernetes Cluster.
*/
-// TODO(jiwq): It should implement the JobSubmitter interface
-public class K8sJobSubmitter {
+public class K8sJobSubmitter implements JobSubmitter {
private final Logger LOG = LoggerFactory.getLogger(K8sJobSubmitter.class);
+ /**
+ * Key: kind of CRD, such as TFJob/PyTorchJob
+ * Value: the CRD api with version
+ */
+ private Map<String, String> supportedCRDMap;
+
public K8sJobSubmitter(String confPath) throws IOException {
ApiClient client =
ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new
FileReader(confPath))).build();
Configuration.setDefaultApiClient(client);
}
- public String submitJob(K8sJobRequest request) {
- return "job_id";
+  /**
+   * Builds the table of CRD kinds this submitter accepts. Each key is a CRD
+   * kind and each value is the API group/version registered for it.
+   */
+  @Override
+  public void initialize() {
+    Map<String, String> crdApiVersions = new HashMap<>();
+    crdApiVersions.put("TFJob", "kubeflow.org/v1");
+    supportedCRDMap = crdApiVersions;
+  }
+
+  /**
+   * Identifies this submitter implementation.
+   * @return the constant type name {@code k8s}
+   */
+  @Override
+  public String getSubmitterType() {
+    final String submitterType = "k8s";
+    return submitterType;
+  }
+
+  /**
+   * Submits the job described by the spec as a TFJob custom resource.
+   * @param jobSpec the job spec; the type of its submitter spec must be one of
+   *                the supported CRD kinds
+   * @return a job carrying the name taken from the spec
+   * @throws UnsupportedJobTypeException if the submitter spec type is not a
+   *         supported CRD kind
+   */
+  @Override
+  public Job submitJob(JobSpec jobSpec) throws UnsupportedJobTypeException {
+    final String crdKind = jobSpec.getSubmitterSpec().getType();
+    final boolean supported = supportedCRDMap.containsKey(crdKind);
+    if (!supported) {
+      throw new UnsupportedJobTypeException();
+    }
+
+    final Job submitted = new Job();
+    submitted.setName(jobSpec.getName());
+    final MLJob crdJob = JobSpecParser.parseTFJob(jobSpec);
+    createJob(crdJob);
+    return submitted;
+  }
+
+  /**
+   * Creates the custom resource for the given job in the namespace named by
+   * its metadata, through the Kubernetes custom objects API. An API failure is
+   * logged and not rethrown.
+   * @param job the CRD job to create
+   */
+  @VisibleForTesting
+  void createJob(MLJob job) {
+    // The fourth argument of createNamespacedCustomObject is the plural
+    // resource name of the CRD (e.g. "tfjobs"), not the name of the object
+    // being created; the object name is carried by the body's metadata.
+    // NOTE(review): assumes the plural is the lower-cased kind plus "s", which
+    // holds for TFJob -> tfjobs; confirm for any newly supported kind.
+    String plural = job.getKind().toLowerCase(java.util.Locale.ROOT) + "s";
+    try {
+      CustomObjectsApi api = new CustomObjectsApi();
+      api.createNamespacedCustomObject(job.getGroup(), job.getVersion(),
+          job.getMetadata().getNamespace(), plural, job, "true");
+    } catch (ApiException e) {
+      // Keep the exception text out of the format string so braces inside it
+      // are not read as placeholders, and fill the kind placeholder.
+      LOG.error("Create {} job: {}", job.getKind(), e.getMessage(), e);
+    }
}
@VisibleForTesting
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
new file mode 100644
index 0000000..04a3d53
--- /dev/null
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/MLJob.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.model;
+
+import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.models.V1ObjectMeta;
+
+/**
+ * The abstract machine learning job for the CRD job. It holds the fields every
+ * custom resource carries: apiVersion, kind and metadata.
+ */
+public abstract class MLJob {
+  @SerializedName("apiVersion")
+  private String apiVersion;
+
+  @SerializedName("kind")
+  private String kind;
+
+  @SerializedName("metadata")
+  private V1ObjectMeta metadata;
+
+  /**
+   * Get the api with version
+   * @return api with version, in the form {@code group/version}
+   */
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+  /**
+   * Set the api with version
+   * @param apiVersion api with version, in the form {@code group/version}
+   */
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+  /**
+   * Get the kind
+   * @return the CRD's name (for example TFJob), or null if it was never set
+   */
+  public String getKind() {
+    return kind;
+  }
+
+  /**
+   * Set the CRD's name, for example TFJob
+   * @param kind the CRD's name
+   */
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+  /**
+   * Get the metadata
+   * @return meta
+   */
+  public V1ObjectMeta getMetadata() {
+    return metadata;
+  }
+
+  /**
+   * Set metadata
+   * @param metadata meta
+   */
+  public void setMetadata(V1ObjectMeta metadata) {
+    this.metadata = metadata;
+  }
+
+  /**
+   * Get the resource's group name, i.e. the part of the apiVersion before the
+   * slash.
+   * @return group name, or an empty string when the apiVersion has no group
+   *         part (for example {@code v1})
+   * @throws NullPointerException if the apiVersion has not been set
+   */
+  public String getGroup() {
+    String[] parts = apiVersion.split("/");
+    // A bare version such as "v1" has no group; do not report the version as one.
+    return parts.length > 1 ? parts[0] : "";
+  }
+
+  /**
+   * Get the resource's version, i.e. the part of the apiVersion after the
+   * slash, or the whole value when there is no slash.
+   * @return version
+   * @throws NullPointerException if the apiVersion has not been set
+   */
+  public String getVersion() {
+    String[] parts = apiVersion.split("/");
+    if (parts.length > 1) {
+      return parts[1];
+    }
+    // split() yields an empty array for a lone "/", so guard the index.
+    return parts.length == 1 ? parts[0] : "";
+  }
+}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
similarity index 55%
copy from
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
copy to
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
index 7533907..b290885 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJob.java
@@ -17,37 +17,37 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.dao;
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
-// A process level environment variable.
-public class EnvVaraible {
+import com.google.gson.annotations.SerializedName;
+import org.apache.submarine.server.submitter.k8s.model.MLJob;
- public String getKey() {
- return key;
- }
+/**
+ * The TFJob custom resource model, the entry model handed to tf-operator.
+ * A new instance starts with apiVersion kubeflow.org/v1 and kind TFJob.
+ */
+public class TFJob extends MLJob {
- public void setKey(String key) {
- this.key = key;
- }
+ @SerializedName("spec")
+ private TFJobSpec spec;
- public String getValue() {
- return value;
+ public TFJob() {
+ setApiVersion("kubeflow.org/v1");
+ setKind("TFJob");
}
- public void setValue(String value) {
- this.value = value;
+ /**
+ * Get the job spec, serialized under the "spec" key of the TFJob.
+ * @return job spec, or null if none has been set
+ */
+ public TFJobSpec getSpec() {
+ return spec;
}
- String key;
- String value;
-
- public EnvVaraible() {}
-
- public EnvVaraible(String k, String v) {
- this.key = k;
- this.value = v;
+ /**
+ * Set the job spec of the TFJob.
+ * @param spec job spec
+ */
+ public void setSpec(TFJobSpec spec) {
+ this.spec = spec;
}
-
-
-
}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
similarity index 50%
copy from
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
copy to
submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
index 7533907..5dde0f3 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobSpec.java
@@ -17,37 +17,36 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.dao;
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
-// A process level environment variable.
-public class EnvVaraible {
+import com.google.gson.annotations.SerializedName;
- public String getKey() {
- return key;
- }
-
- public void setKey(String key) {
- this.key = key;
- }
+import java.util.Map;
- public String getValue() {
- return value;
+/**
+ * The entry spec of TFJob. It contains the {@link TFReplicaSpec} entries which
+ * describe the job's tasks.
+ */
+public class TFJobSpec {
+ /**
+ * The replica specs by replica type. Key: Chief, Ps, Worker, Evaluator
+ */
+ @SerializedName("tfReplicaSpecs")
+ private Map<String, TFReplicaSpec> tfReplicaSpecs;
+
+ /**
+ * Get the replica specs.
+ * @return map of replica specs, or null if none has been set
+ */
+ public Map<String, TFReplicaSpec> getTfReplicaSpecs() {
+ return tfReplicaSpecs;
}
- public void setValue(String value) {
- this.value = value;
+ /**
+ * Set the replica specs; the key's range is [Chief, Ps, Worker, Evaluator].
+ * @param tfReplicaSpecs map of replica specs
+ */
+ public void setTfReplicaSpecs(Map<String, TFReplicaSpec> tfReplicaSpecs) {
+ this.tfReplicaSpecs = tfReplicaSpecs;
}
-
- String key;
- String value;
-
- public EnvVaraible() {}
-
- public EnvVaraible(String k, String v) {
- this.key = k;
- this.value = v;
- }
-
-
-
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFReplicaSpec.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFReplicaSpec.java
new file mode 100644
index 0000000..1002ece
--- /dev/null
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFReplicaSpec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
+
+import com.google.gson.annotations.SerializedName;
+import io.kubernetes.client.models.V1PodTemplateSpec;
+
+/**
+ * One replica group of a TFJob: the number of pods, the pod template they are
+ * created from, and the restart policy applied to them.
+ */
+public class TFReplicaSpec {
+  @SerializedName("replicas")
+  private Integer replicas;
+
+  @SerializedName("template")
+  private V1PodTemplateSpec template;
+
+  /**
+   * One of: Always, OnFailure, ExitCode, Never
+   */
+  @SerializedName("restartPolicy")
+  private String restartPolicy = "OnFailure";
+
+  public TFReplicaSpec() {
+    // Nothing to set up; restartPolicy already starts as OnFailure.
+  }
+
+  /**
+   * Get the number of desired pods.
+   * @return pod count
+   */
+  public Integer getReplicas() {
+    return this.replicas;
+  }
+
+  /**
+   * Set the number of desired pods.
+   * @param replicas pod count
+   */
+  public void setReplicas(final Integer replicas) {
+    this.replicas = replicas;
+  }
+
+  /**
+   * Get the template describing the pods of this replica group.
+   * @return pod template spec
+   */
+  public V1PodTemplateSpec getTemplate() {
+    return this.template;
+  }
+
+  /**
+   * Set the template describing the pods of this replica group.
+   * @param template pod template
+   */
+  public void setTemplate(final V1PodTemplateSpec template) {
+    this.template = template;
+  }
+
+  /**
+   * Get the restart policy. It is OnFailure unless changed.
+   * Supported values: Always, OnFailure, ExitCode, Never
+   * @return policy name
+   */
+  public String getRestartPolicy() {
+    return this.restartPolicy;
+  }
+
+  /**
+   * Set the restart policy.
+   * @param restartPolicy policy name, one of [Always, OnFailure, ExitCode, Never]
+   */
+  public void setRestartPolicy(final String restartPolicy) {
+    this.restartPolicy = restartPolicy;
+  }
+}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/JobSpecParser.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/JobSpecParser.java
new file mode 100644
index 0000000..2b4f633
--- /dev/null
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/parser/JobSpecParser.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.submitter.k8s.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.models.V1Container;
+import io.kubernetes.client.models.V1EnvVar;
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.models.V1PodTemplateSpec;
+import io.kubernetes.client.models.V1ResourceRequirements;
+import org.apache.submarine.server.api.spec.JobLibrarySpec;
+import org.apache.submarine.server.api.spec.JobSpec;
+import org.apache.submarine.server.api.spec.JobTaskSpec;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJobSpec;
+import org.apache.submarine.server.submitter.k8s.model.tfjob.TFReplicaSpec;
+
+/**
+ * Converts a {@link JobSpec} into the Kubernetes custom resource models that
+ * the k8s submitter sends to the cluster.
+ */
+public class JobSpecParser {
+  /**
+   * Parse the job spec to {@link TFJob}
+   * @param jobSpec job spec; its submitter spec and library spec must be set
+   * @return the TFJob object
+   */
+  public static TFJob parseTFJob(JobSpec jobSpec) {
+    TFJob tfJob = new TFJob();
+    // Only override the apiVersion when the spec supplies one, so an omitted
+    // value does not wipe whatever a freshly constructed TFJob already carries.
+    String apiVersion = jobSpec.getSubmitterSpec().getApiVersion();
+    if (apiVersion != null) {
+      tfJob.setApiVersion(apiVersion);
+    }
+    tfJob.setMetadata(parseTFMetadata(jobSpec));
+    tfJob.setSpec(parseTFJobSpec(jobSpec));
+    return tfJob;
+  }
+
+  /** Builds the object metadata (namespace and name) from the job spec. */
+  private static V1ObjectMeta parseTFMetadata(JobSpec jobSpec) {
+    V1ObjectMeta meta = new V1ObjectMeta();
+    meta.setNamespace(jobSpec.getSubmitterSpec().getNamespace());
+    meta.setName(jobSpec.getName());
+    return meta;
+  }
+
+  /** Builds one replica spec per task spec, keyed by the task spec's name. */
+  private static TFJobSpec parseTFJobSpec(JobSpec jobSpec) {
+    TFJobSpec tfJobSpec = new TFJobSpec();
+    Map<String, TFReplicaSpec> replicaSpecMap = new HashMap<>();
+    Map<String, JobTaskSpec> taskSpecs = jobSpec.getTaskSpecs();
+    // A spec without tasks yields an empty replica map instead of failing.
+    if (taskSpecs != null) {
+      for (JobTaskSpec taskSpec : taskSpecs.values()) {
+        TFReplicaSpec spec = new TFReplicaSpec();
+        spec.setReplicas(taskSpec.getReplicas());
+        spec.setTemplate(parseTemplateSpec(taskSpec, jobSpec.getLibrarySpec()));
+        replicaSpecMap.put(taskSpec.getName(), spec);
+      }
+    }
+    tfJobSpec.setTfReplicaSpecs(replicaSpecMap);
+    return tfJobSpec;
+  }
+
+  /**
+   * Builds the pod template of a task: a single container named after the
+   * library, where image, command and env vars come from the task spec when it
+   * sets them and from the library spec otherwise.
+   */
+  private static V1PodTemplateSpec parseTemplateSpec(JobTaskSpec taskSpec,
+      JobLibrarySpec libSpec) {
+    V1PodTemplateSpec templateSpec = new V1PodTemplateSpec();
+    V1PodSpec podSpec = new V1PodSpec();
+    List<V1Container> containers = new ArrayList<>();
+    V1Container container = new V1Container();
+    // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
+    container.setName(libSpec.getName().toLowerCase(java.util.Locale.ROOT));
+    // image
+    if (taskSpec.getImage() != null) {
+      container.setImage(taskSpec.getImage());
+    } else {
+      container.setImage(libSpec.getImage());
+    }
+    // cmd
+    String cmd = taskSpec.getCmd() != null ? taskSpec.getCmd() : libSpec.getCmd();
+    if (cmd != null && !cmd.trim().isEmpty()) {
+      // Split on whitespace runs so repeated spaces do not produce empty arguments.
+      container.setCommand(Arrays.asList(cmd.trim().split("\\s+")));
+    }
+    // resources
+    V1ResourceRequirements resources = new V1ResourceRequirements();
+    resources.setLimits(parseResources(taskSpec));
+    container.setResources(resources);
+    container.setEnv(parseEnvVars(taskSpec, libSpec.getEnvVars()));
+    // Without this the pod spec would be submitted with no container at all.
+    containers.add(container);
+    podSpec.setContainers(containers);
+    templateSpec.setSpec(podSpec);
+    return templateSpec;
+  }
+
+  /** Uses the task's env vars when present, the defaults otherwise. */
+  private static List<V1EnvVar> parseEnvVars(JobTaskSpec spec,
+      Map<String, String> defaultEnvs) {
+    if (spec.getEnvVars() != null) {
+      return parseEnvVars(spec.getEnvVars());
+    }
+    return parseEnvVars(defaultEnvs);
+  }
+
+  /** Converts a name/value map to env vars; a null map gives an empty list. */
+  private static List<V1EnvVar> parseEnvVars(Map<String, String> envMap) {
+    List<V1EnvVar> envVars = new ArrayList<>();
+    if (envMap == null) {
+      return envVars;
+    }
+    for (Map.Entry<String, String> entry : envMap.entrySet()) {
+      V1EnvVar env = new V1EnvVar();
+      env.setName(entry.getKey());
+      env.setValue(entry.getValue());
+      envVars.add(env);
+    }
+    return envVars;
+  }
+
+  /** Collects the cpu, memory and gpu quantities that the task spec sets. */
+  private static Map<String, Quantity> parseResources(JobTaskSpec taskSpec) {
+    Map<String, Quantity> resources = new HashMap<>();
+    if (taskSpec.getCpu() != null) {
+      resources.put("cpu", new Quantity(taskSpec.getCpu()));
+    }
+    if (taskSpec.getMemory() != null) {
+      resources.put("memory", new Quantity(taskSpec.getMemory()));
+    }
+    if (taskSpec.getGpu() != null) {
+      resources.put("nvidia.com/gpu", new Quantity(taskSpec.getGpu()));
+    }
+    return resources;
+  }
+}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobTest.java
similarity index 58%
rename from
submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
rename to
submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobTest.java
index 7533907..f3489ce 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/jobserver/dao/EnvVaraible.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/test/java/org/apache/submarine/server/submitter/k8s/model/tfjob/TFJobTest.java
@@ -17,37 +17,22 @@
* under the License.
*/
-package org.apache.submarine.server.jobserver.dao;
-
-// A process level environment variable.
-public class EnvVaraible {
-
- public String getKey() {
- return key;
- }
-
- public void setKey(String key) {
- this.key = key;
- }
-
- public String getValue() {
- return value;
- }
-
- public void setValue(String value) {
- this.value = value;
+package org.apache.submarine.server.submitter.k8s.model.tfjob;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.net.URL;
+
+public class TFJobTest {
+  /**
+   * Checks that the sample TFJob JSON resource deserializes into a
+   * {@link TFJob} and prints the result for inspection.
+   */
+  @Test
+  public void testFromJson() throws Exception {
+    URL fileUrl = this.getClass().getResource("/tf_job_mnist.json");
+    Gson gson = new Gson();
+    TFJob tfJob;
+    // Close the reader once parsing is done instead of leaking the file handle.
+    try (FileReader reader = new FileReader(new File(fileUrl.toURI()))) {
+      tfJob = gson.fromJson(reader, TFJob.class);
+    }
+    // Fully qualified so no extra import is needed in this file.
+    org.junit.Assert.assertNotNull(tfJob);
+    System.out.println(new GsonBuilder().setPrettyPrinting().create().toJson(tfJob));
}
-
- String key;
- String value;
-
- public EnvVaraible() {}
-
- public EnvVaraible(String k, String v) {
- this.key = k;
- this.value = v;
- }
-
-
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]