/**
 * Checked exception used to report Submarine-specific failures.
 */
public class SubmarineException extends Exception {
  /**
   * Creates an exception that carries only a description of the failure.
   *
   * @param msg detail message, later available through {@link #getMessage()}
   */
  public SubmarineException(String msg) {
    super(msg);
  }

  /**
   * Creates an exception that keeps the triggering failure attached, so the
   * original stack trace is not lost when a lower-level error is translated
   * into a Submarine error.
   *
   * @param msg detail message, later available through {@link #getMessage()}
   * @param cause the underlying failure, later available through
   *              {@link #getCause()}; may be {@code null}
   */
  public SubmarineException(String msg, Throwable cause) {
    super(msg, cause);
  }
}
/**
 * Unchecked exception used to report Submarine-specific failures that callers
 * are not forced to declare or catch.
 */
public class SubmarineRuntimeException extends RuntimeException {
  /**
   * Creates an exception that wraps an underlying failure.
   *
   * @param message detail message
   * @param cause the failure that triggered this exception
   */
  public SubmarineRuntimeException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates an exception with a detail message and no cause.
   *
   * @param s detail message
   */
  public SubmarineRuntimeException(String s) {
    super(s);
  }
}
+ */ + +package org.apache.hadoop.yarn.submarine.common.fs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.submarine.client.cli.CliConstants; +import org.apache.hadoop.yarn.submarine.common.ClientContext; + +import java.io.IOException; + +/** + * Manages remote directories for staging, log, etc. + * TODO, need to properly handle permission / name validation, etc. + */ +public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager { + FileSystem fs; + + public DefaultRemoteDirectoryManager(ClientContext context) { + try { + this.fs = FileSystem.get(context.getYarnConfig()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Path getJobStagingArea(String jobName, boolean create) throws IOException { + Path staging = new Path(getJobRootFolder(jobName), "staging"); + if (create) { + createFolderIfNotExist(staging); + } + return staging; + } + + @Override + public Path getJobCheckpointDir(String jobName, boolean create) + throws IOException { + Path jobDir = new Path(getJobStagingArea(jobName, create), + CliConstants.CHECKPOINT_PATH); + if (create) { + createFolderIfNotExist(jobDir); + } + return jobDir; + } + + @Override + public Path getModelDir(String modelName, boolean create) throws IOException { + Path modelDir = new Path(new Path("submarine", "models"), modelName); + if (create) { + createFolderIfNotExist(modelDir); + } + return modelDir; + } + + @Override + public FileSystem getFileSystem() { + return fs; + } + + private Path getJobRootFolder(String jobName) throws IOException { + return new Path(new Path("submarine", "jobs"), jobName); + } + + private void createFolderIfNotExist(Path path) throws IOException { + if (!fs.exists(path)) { + if (!fs.mkdirs(path)) { + throw new IOException("Failed to create folder=" + path); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java new file mode 100644 index 0000000..132b314 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java @@ -0,0 +1,30 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */ + +package org.apache.hadoop.yarn.submarine.common.fs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +public interface RemoteDirectoryManager { + Path getJobStagingArea(String jobName, boolean create) throws IOException; + + Path getJobCheckpointDir(String jobName, boolean create) throws IOException; + + Path getModelDir(String modelName, boolean create) throws IOException; + + FileSystem getFileSystem() throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java new file mode 100644 index 0000000..9c164c6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/RuntimeFactory.java @@ -0,0 +1,106 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */ + +package org.apache.hadoop.yarn.submarine.runtimes; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.submarine.common.ClientContext; +import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration; +import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException; +import org.apache.hadoop.yarn.submarine.runtimes.common.FSBasedSubmarineStorageImpl; +import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor; +import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter; +import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage; +import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobMonitor; +import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobSubmitter; + +import java.lang.reflect.InvocationTargetException; + +public abstract class RuntimeFactory { + protected ClientContext clientContext; + private JobSubmitter jobSubmitter; + private JobMonitor jobMonitor; + private SubmarineStorage submarineStorage; + + public RuntimeFactory(ClientContext clientContext) { + this.clientContext = clientContext; + } + + public static RuntimeFactory getRuntimeFactory( + ClientContext clientContext) { + Configuration submarineConfiguration = + clientContext.getSubmarineConfig(); + String runtimeClass = submarineConfiguration.get( + SubmarineConfiguration.RUNTIME_CLASS, + SubmarineConfiguration.DEFAULT_RUNTIME_CLASS); + + try { + Class<?> runtimeClazz = Class.forName(runtimeClass); + if (RuntimeFactory.class.isAssignableFrom(runtimeClazz)) { + return (RuntimeFactory) runtimeClazz.getConstructor(ClientContext.class).newInstance(clientContext); + } else { + throw new SubmarineRuntimeException("Class: " + runtimeClass + + " not instance of " + RuntimeFactory.class.getCanonicalName()); + } + } catch (ClassNotFoundException | IllegalAccessException | + InstantiationException | 
NoSuchMethodException | + InvocationTargetException e) { + throw new SubmarineRuntimeException( + "Could not instantiate RuntimeFactory: " + runtimeClass, e); + } + } + + protected abstract JobSubmitter internalCreateJobSubmitter(); + + protected abstract JobMonitor internalCreateJobMonitor(); + + protected abstract SubmarineStorage internalCreateSubmarineStorage(); + + public synchronized JobSubmitter getJobSubmitterInstance() { + if (jobSubmitter == null) { + jobSubmitter = internalCreateJobSubmitter(); + } + return jobSubmitter; + } + + public synchronized JobMonitor getJobMonitorInstance() { + if (jobMonitor == null) { + jobMonitor = internalCreateJobMonitor(); + } + return jobMonitor; + } + + public synchronized SubmarineStorage getSubmarineStorage() { + if (submarineStorage == null) { + submarineStorage = internalCreateSubmarineStorage(); + } + return submarineStorage; + } + + @VisibleForTesting + public synchronized void setJobSubmitterInstance(JobSubmitter jobSubmitter) { + this.jobSubmitter = jobSubmitter; + } + + @VisibleForTesting + public synchronized void setJobMonitorInstance(JobMonitor jobMonitor) { + this.jobMonitor = jobMonitor; + } + + @VisibleForTesting + public synchronized void setSubmarineStorage(SubmarineStorage storage) { + this.submarineStorage = storage; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java new file mode 100644 
index 0000000..ebf9581 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java @@ -0,0 +1,106 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + + +package org.apache.hadoop.yarn.submarine.runtimes.common; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.submarine.common.ClientContext; +import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.util.Map; + +/** + * A super naive FS-based storage. 
+ */ +public class FSBasedSubmarineStorageImpl extends SubmarineStorage { + ClientContext clientContext; + RemoteDirectoryManager rdm; + + public FSBasedSubmarineStorageImpl(ClientContext clientContext) { + this.clientContext = clientContext; + rdm = clientContext.getRemoteDirectoryManager(); + } + + @Override + public void addNewJob(String jobName, Map<String, String> jobInfo) + throws IOException { + Path jobInfoPath = getJobInfoPath(jobName, true); + FSDataOutputStream fos = rdm.getFileSystem().create(jobInfoPath); + serializeMap(fos, jobInfo); + } + + @Override + public Map<String, String> getJobInfoByName(String jobName) + throws IOException { + Path jobInfoPath = getJobInfoPath(jobName, false); + FSDataInputStream fis = rdm.getFileSystem().open(jobInfoPath); + return deserializeMap(fis); + } + + @Override + public void addNewModel(String modelName, String version, + Map<String, String> modelInfo) throws IOException { + Path modelInfoPath = getModelInfoPath(modelName, version, true); + FSDataOutputStream fos = rdm.getFileSystem().create(modelInfoPath); + serializeMap(fos, modelInfo); + } + + @Override + public Map<String, String> getModelInfoByName(String modelName, + String version) throws IOException { + Path modelInfoPath = getModelInfoPath(modelName, version, false); + FSDataInputStream fis = rdm.getFileSystem().open(modelInfoPath); + return deserializeMap(fis); + } + + private Path getModelInfoPath(String modelName, String version, boolean create) + throws IOException { + Path modelDir = rdm.getModelDir(modelName, create); + Path modelInfo = new Path(modelDir, version + ".info"); + return modelInfo; + } + + private void serializeMap(FSDataOutputStream fos, Map<String, String> map) + throws IOException { + ObjectOutput oo = new ObjectOutputStream(fos); + oo.writeObject(map); + oo.close(); + } + + private Map<String, String> deserializeMap(FSDataInputStream fis) + throws IOException { + ObjectInput oi = new ObjectInputStream(fis); + Map<String, String> 
newMap = null; + try { + newMap = (Map<String, String>) oi.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + return newMap; + } + + private Path getJobInfoPath(String jobName, boolean create) throws IOException { + Path path = rdm.getJobStagingArea(jobName, create); + Path fileName = new Path(path, "job.info"); + return fileName; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java new file mode 100644 index 0000000..c81393b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java @@ -0,0 +1,84 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */ + +package org.apache.hadoop.yarn.submarine.runtimes.common; + +import org.apache.hadoop.yarn.submarine.common.ClientContext; +import org.apache.hadoop.yarn.submarine.common.api.JobState; +import org.apache.hadoop.yarn.submarine.common.api.JobStatus; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Monitor status of job(s) + */ +public abstract class JobMonitor { + private static final Logger LOG = + LoggerFactory.getLogger(JobMonitor.class); + protected ClientContext clientContext; + + public JobMonitor(ClientContext clientContext) { + this.clientContext = clientContext; + } + + /** + * Returns status of training job. + * + * @param jobName name of job + * @return job status + * @throws IOException anything else happens + * @throws YarnException anything related to YARN happens + */ + public abstract JobStatus getTrainingJobStatus(String jobName) + throws IOException, YarnException; + + /** + * Continue wait and print status if job goes to ready or final state. + * @param jobName + * @throws IOException + * @throws YarnException + * @throws SubmarineException + */ + public void waitTrainingFinal(String jobName) + throws IOException, YarnException, SubmarineException { + // Wait 5 sec between each fetch. 
+ int waitIntervalSec = 5; + JobStatus js; + while (true) { + js = getTrainingJobStatus(jobName); + JobState jobState = js.getState(); + js.nicePrint(System.err); + + if (JobState.isFinal(jobState)) { + if (jobState.equals(JobState.FAILED)) { + throw new SubmarineException("Job failed"); + } else if (jobState.equals(JobState.KILLED)) { + throw new SubmarineException("Job killed"); + } + LOG.info("Job exited with state=" + jobState); + break; + } + + try { + Thread.sleep(waitIntervalSec * 1000); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java new file mode 100644 index 0000000..1749390 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobSubmitter.java @@ -0,0 +1,36 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hadoop.yarn.submarine.runtimes.common; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; + +import java.io.IOException; + +/** + * Submit job to cluster master + */ +public interface JobSubmitter { + /** + * Submit job to cluster + * @param parameters run job parameters + * @return applicatioId when successfully submitted + * @throws YarnException for issues while contacting YARN daemons + * @throws IOException for other issues. + */ + ApplicationId submitJob(RunJobParameters parameters) + throws IOException, YarnException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java new file mode 100644 index 0000000..1fbbe7a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/StorageKeyConstants.java @@ -0,0 +1,24 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * String keys declared for use with the Submarine storage. Every constant's
 * value equals its own name.
 */
public class StorageKeyConstants {
  // Job identity and launch information.
  public static final String JOB_NAME = "JOB_NAME";
  public static final String APPLICATION_ID = "APPLICATION_ID";
  public static final String JOB_RUN_ARGS = "JOB_RUN_ARGS";

  // Locations associated with a job.
  public static final String INPUT_PATH = "INPUT_PATH";
  public static final String CHECKPOINT_PATH = "CHECKPOINT_PATH";
  public static final String SAVED_MODEL_PATH = "SAVED_MODEL_PATH";
}
/**
 * Persistent store for job and model records, each record being a map of
 * string keys to string values.
 */
public abstract class SubmarineStorage {
  /**
   * Stores the record of a new job under its name.
   *
   * @param jobName name of job.
   * @param jobInfo info of the job.
   * @throws IOException if the record cannot be stored
   */
  public abstract void addNewJob(String jobName, Map<String, String> jobInfo)
      throws IOException;

  /**
   * Looks up the record of a job by its name.
   *
   * @param jobName name of job
   * @return info of the job.
   * @throws IOException if the record cannot be read
   */
  public abstract Map<String, String> getJobInfoByName(String jobName)
      throws IOException;

  /**
   * Stores the record of a new model version.
   *
   * @param modelName name of model
   * @param version version of the model; when null is specified, it is
   *                documented to be "default"
   * @param modelInfo info of the model.
   * @throws IOException if the record cannot be stored
   */
  public abstract void addNewModel(String modelName, String version,
      Map<String, String> modelInfo) throws IOException;

  /**
   * Looks up the record of a model by name and version.
   *
   * @param modelName name of model.
   * @param version version of the model; NOTE(review): the original note on
   *                null handling was cut off, presumably null means "default"
   *                as for {@link #addNewModel}; confirm
   * @return info of the model.
   * @throws IOException if the record cannot be read
   */
  public abstract Map<String, String> getModelInfoByName(String modelName, String version)
      throws IOException;
}
+ */ + +package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.submarine.common.ClientContext; +import org.apache.hadoop.yarn.submarine.common.api.JobStatus; +import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder; +import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor; + +import java.io.IOException; + +public class YarnServiceJobMonitor extends JobMonitor { + private ServiceClient serviceClient = null; + + public YarnServiceJobMonitor(ClientContext clientContext) { + super(clientContext); + } + + @Override + public synchronized JobStatus getTrainingJobStatus(String jobName) + throws IOException, YarnException { + if (this.serviceClient == null) { + this.serviceClient = YarnServiceUtils.createServiceClient( + clientContext.getYarnConfig()); + } + + Service serviceSpec = this.serviceClient.getStatus(jobName); + JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec); + return jobStatus; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java new file mode 100644 index 0000000..3cd0d7e --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -0,0 +1,458 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.ServiceApiConstants; +import org.apache.hadoop.yarn.service.api.records.Artifact; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ConfigFile; +import org.apache.hadoop.yarn.service.api.records.Resource; +import org.apache.hadoop.yarn.service.api.records.ResourceInformation; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; +import org.apache.hadoop.yarn.submarine.common.ClientContext; +import org.apache.hadoop.yarn.submarine.common.Envs; +import org.apache.hadoop.yarn.submarine.common.api.TaskType; +import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs; +import 
org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +/** + * Submit a job to cluster + */ +public class YarnServiceJobSubmitter implements JobSubmitter { + private static final Logger LOG = + LoggerFactory.getLogger(YarnServiceJobSubmitter.class); + ClientContext clientContext; + Service serviceSpec; + private Set<Path> uploadedFiles = new HashSet<>(); + + public YarnServiceJobSubmitter(ClientContext clientContext) { + this.clientContext = clientContext; + } + + private Resource getServiceResourceFromYarnResource( + org.apache.hadoop.yarn.api.records.Resource yarnResource) { + Resource serviceResource = new Resource(); + serviceResource.setCpus(yarnResource.getVirtualCores()); + serviceResource.setMemory(String.valueOf(yarnResource.getMemorySize())); + + Map<String, ResourceInformation> riMap = new HashMap<>(); + for (org.apache.hadoop.yarn.api.records.ResourceInformation ri : yarnResource + .getAllResourcesListCopy()) { + ResourceInformation serviceRi = + new ResourceInformation(); + serviceRi.setValue(ri.getValue()); + serviceRi.setUnit(ri.getUnits()); + riMap.put(ri.getName(), serviceRi); + } + serviceResource.setResourceInformations(riMap); + + return serviceResource; + } + + private String getValueOfEnvionment(String envar) { + // extract value from "key=value" form + if (envar == null || !envar.contains("=")) { + return ""; + } else { + return envar.substring(envar.indexOf("=") + 1); + } + } + + private void addHdfsClassPathIfNeeded(RunJobParameters parameters, + FileWriter fw, Component comp) throws IOException { + // Find envs to use HDFS 
+ String hdfsHome = null; + String javaHome = null; + + boolean hadoopEnv = false; + + for (String envar : parameters.getEnvars()) { + if (envar.startsWith("DOCKER_HADOOP_HDFS_HOME=")) { + hdfsHome = getValueOfEnvionment(envar); + hadoopEnv = true; + } else if (envar.startsWith("DOCKER_JAVA_HOME=")) { + javaHome = getValueOfEnvionment(envar); + } + } + + boolean lackingEnvs = false; + + if ((parameters.getInputPath() != null && parameters.getInputPath() + .contains("hdfs://")) || (parameters.getCheckpointPath() != null + && parameters.getCheckpointPath().contains("hdfs://")) || ( + parameters.getSavedModelPath() != null && parameters.getSavedModelPath() + .contains("hdfs://")) || hadoopEnv) { + // HDFS is asked either in input or output, set LD_LIBRARY_PATH + // and classpath + + if (hdfsHome != null) { + // Unset HADOOP_HOME/HADOOP_YARN_HOME to make sure host machine's envs + // won't pollute docker's env. + fw.append("export HADOOP_HOME=\n"); + fw.append("export HADOOP_YARN_HOME=\n"); + fw.append("export HADOOP_HDFS_HOME=" + hdfsHome + "\n"); + } else{ + lackingEnvs = true; + } + + // hadoop confs will be uploaded to HDFS and localized to container's + // local folder, so here set $HADOOP_CONF_DIR to $WORK_DIR. + fw.append("export HADOOP_CONF_DIR=$WORK_DIR\n"); + if (javaHome != null) { + fw.append("export JAVA_HOME=" + javaHome + "\n"); + fw.append("export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:" + + "$JAVA_HOME/lib/amd64/server\n"); + } else { + lackingEnvs = true; + } + fw.append("export CLASSPATH=`$HADOOP_HDFS_HOME/bin/hadoop classpath --glob`\n"); + } + + if (lackingEnvs) { + LOG.error("When hdfs is being used to read/write models/data. Following" + + "envs are required: 1) DOCKER_HADOOP_HDFS_HOME=<HDFS_HOME inside" + + "docker container> 2) DOCKER_JAVA_HOME=<JAVA_HOME inside docker" + + "container>. 
You can use --env to pass these envars."); + throw new IOException("Failed to detect HDFS-related environments."); + } + + // Trying to upload core-site.xml and hdfs-site.xml + Path stagingDir = + clientContext.getRemoteDirectoryManager().getJobStagingArea( + parameters.getName(), true); + File coreSite = findFileOnClassPath("core-site.xml"); + File hdfsSite = findFileOnClassPath("hdfs-site.xml"); + if (coreSite == null || hdfsSite == null) { + LOG.error("hdfs is being used, however we couldn't locate core-site.xml/" + + "hdfs-site.xml from classpath, please double check you classpath" + + "setting and make sure they're included."); + throw new IOException( + "Failed to locate core-site.xml / hdfs-site.xml from class path"); + } + uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir, + coreSite.getAbsolutePath(), "core-site.xml", comp); + uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir, + hdfsSite.getAbsolutePath(), "hdfs-site.xml", comp); + + // DEBUG + if (SubmarineLogs.isVerbose()) { + fw.append("echo $CLASSPATH\n"); + fw.append("echo $JAVA_HOME\n"); + fw.append("echo $LD_LIBRARY_PATH\n"); + fw.append("echo $HADOOP_HDFS_HOME\n"); + } + } + + private void addCommonEnvironments(Component component, TaskType taskType) { + Map<String, String> envs = component.getConfiguration().getEnv(); + envs.put(Envs.TASK_INDEX_ENV, ServiceApiConstants.COMPONENT_ID); + envs.put(Envs.TASK_TYPE_ENV, taskType.name()); + } + + /* + * Generate a command launch script on local disk, returns patch to the script + */ + private String generateCommandLaunchScript(RunJobParameters parameters, + TaskType taskType, Component comp) throws IOException { + File file = File.createTempFile(taskType.name() + "-launch-script", ".sh"); + FileWriter fw = new FileWriter(file); + + fw.append("#!/bin/bash\n"); + + addHdfsClassPathIfNeeded(parameters, fw, comp); + + // For primary_worker + if (taskType == TaskType.PRIMARY_WORKER) { + // Do we need tensorboard? 
+ if (parameters.isTensorboardEnabled()) { + int tensorboardPort = 6006; + // Run tensorboard at the background + fw.append( + "tensorboard --port " + tensorboardPort + " --logdir " + parameters + .getCheckpointPath() + " &\n"); + } + } + + // When distributed training is required + if (parameters.isDistributed()) { + // Generated TF_CONFIG + String tfConfigEnv = YarnServiceUtils.getTFConfigEnv( + taskType.getComponentName(), parameters.getNumWorkers(), + parameters.getNumPS(), parameters.getName(), + System.getProperty("user.name"), + clientContext.getYarnConfig().get("hadoop.registry.dns.domain-name")); + fw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n"); + } + + // Print launch command + if (taskType.equals(TaskType.WORKER) || taskType.equals( + TaskType.PRIMARY_WORKER)) { + fw.append(parameters.getWorkerLaunchCmd() + '\n'); + + if (SubmarineLogs.isVerbose()) { + LOG.info("Worker command =[" + parameters.getWorkerLaunchCmd() + "]"); + } + } else if (taskType.equals(TaskType.PS)) { + fw.append(parameters.getPSLaunchCmd() + '\n'); + + if (SubmarineLogs.isVerbose()) { + LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]"); + } + } + + fw.close(); + return file.getAbsolutePath(); + } + + private String getScriptFileName(TaskType taskType) { + return "run-" + taskType.name() + ".sh"; + } + + private File findFileOnClassPath(final String fileName) { + final String classpath = System.getProperty("java.class.path"); + final String pathSeparator = System.getProperty("path.separator"); + final StringTokenizer tokenizer = new StringTokenizer(classpath, + pathSeparator); + + while (tokenizer.hasMoreTokens()) { + final String pathElement = tokenizer.nextToken(); + final File directoryOrJar = new File(pathElement); + final File absoluteDirectoryOrJar = directoryOrJar.getAbsoluteFile(); + if (absoluteDirectoryOrJar.isFile()) { + final File target = new File(absoluteDirectoryOrJar.getParent(), + fileName); + if (target.exists()) { + return target; + } + } else{ + 
final File target = new File(directoryOrJar, fileName); + if (target.exists()) { + return target; + } + } + } + + return null; + } + + private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir, + String fileToUpload, String destFilename, Component comp) + throws IOException { + FileSystem fs = FileSystem.get(clientContext.getYarnConfig()); + + // Upload to remote FS under staging area + File localFile = new File(fileToUpload); + if (!localFile.exists()) { + throw new FileNotFoundException( + "Trying to upload file=" + localFile.getAbsolutePath() + + " to remote, but couldn't find local file."); + } + String filename = new File(fileToUpload).getName(); + + Path uploadedFilePath = new Path(stagingDir, filename); + if (!uploadedFiles.contains(uploadedFilePath)) { + if (SubmarineLogs.isVerbose()) { + LOG.info("Copying local file=" + fileToUpload + " to remote=" + + uploadedFilePath); + } + fs.copyFromLocalFile(new Path(fileToUpload), uploadedFilePath); + uploadedFiles.add(uploadedFilePath); + } + + FileStatus fileStatus = fs.getFileStatus(uploadedFilePath); + LOG.info("Uploaded file path = " + fileStatus.getPath()); + + // Set it to component's files list + comp.getConfiguration().getFiles().add(new ConfigFile().srcFile( + fileStatus.getPath().toUri().toString()).destFile(destFilename) + .type(ConfigFile.TypeEnum.STATIC)); + } + + private void handleLaunchCommand(RunJobParameters parameters, + TaskType taskType, Component component) throws IOException { + // Get staging area directory + Path stagingDir = + clientContext.getRemoteDirectoryManager().getJobStagingArea( + parameters.getName(), true); + + // Generate script file in the local disk + String localScriptFile = generateCommandLaunchScript(parameters, taskType, + component); + String destScriptFileName = getScriptFileName(taskType); + uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir, localScriptFile, + destScriptFileName, component); + + component.setLaunchCommand("./" + 
destScriptFileName); + } + + private void addWorkerComponent(Service service, + RunJobParameters parameters, TaskType taskType) throws IOException { + Component workerComponent = new Component(); + addCommonEnvironments(workerComponent, taskType); + + workerComponent.setName(taskType.getComponentName()); + + if (taskType.equals(TaskType.PRIMARY_WORKER)) { + workerComponent.setNumberOfContainers(1L); + } else{ + workerComponent.setNumberOfContainers( + (long) parameters.getNumWorkers() - 1); + } + + if (parameters.getWorkerDockerImage() != null) { + workerComponent.setArtifact( + getDockerArtifact(parameters.getWorkerDockerImage())); + } + + workerComponent.setResource( + getServiceResourceFromYarnResource(parameters.getWorkerResource())); + handleLaunchCommand(parameters, taskType, workerComponent); + workerComponent.setRestartPolicy(Component.RestartPolicyEnum.NEVER); + service.addComponent(workerComponent); + } + + // Handle worker and primary_worker. + private void addWorkerComponents(Service service, RunJobParameters parameters) + throws IOException { + addWorkerComponent(service, parameters, TaskType.PRIMARY_WORKER); + + if (parameters.getNumWorkers() > 1) { + addWorkerComponent(service, parameters, TaskType.WORKER); + } + } + + private void appendToEnv(Service service, String key, String value, + String delim) { + Map<String, String> env = service.getConfiguration().getEnv(); + if (!env.containsKey(key)) { + env.put(key, value); + } else { + if (!value.isEmpty()) { + String existingValue = env.get(key); + if (!existingValue.endsWith(delim)) { + env.put(key, existingValue + delim + value); + } else { + env.put(key, existingValue + value); + } + } + } + } + + private void handleServiceEnvs(Service service, RunJobParameters parameters) { + if (parameters.getEnvars() != null) { + for (String envarPair : parameters.getEnvars()) { + String key, value; + if (envarPair.contains("=")) { + int idx = envarPair.indexOf('='); + key = envarPair.substring(0, idx); + value = 
envarPair.substring(idx + 1); + } else{ + // No "=" found so use the whole key + key = envarPair; + value = ""; + } + appendToEnv(service, key, value, ":"); + } + } + + // Append other configs like /etc/passwd, /etc/krb5.conf + appendToEnv(service, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS", + "/etc/passwd:/etc/passwd:ro", ","); + + String authenication = clientContext.getYarnConfig().get( + HADOOP_SECURITY_AUTHENTICATION); + if (authenication != null && authenication.equals("kerberos")) { + appendToEnv(service, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS", + "/etc/krb5.conf:/etc/krb5.conf:ro", ","); + } + } + + private Artifact getDockerArtifact(String dockerImageName) { + return new Artifact().type(Artifact.TypeEnum.DOCKER).id(dockerImageName); + } + + private Service createServiceByParameters(RunJobParameters parameters) + throws IOException { + Service service = new Service(); + service.setName(parameters.getName()); + service.setVersion(String.valueOf(System.currentTimeMillis())); + service.setArtifact(getDockerArtifact(parameters.getDockerImageName())); + + handleServiceEnvs(service, parameters); + + addWorkerComponents(service, parameters); + + if (parameters.getNumPS() > 0) { + Component psComponent = new Component(); + psComponent.setName(TaskType.PS.getComponentName()); + addCommonEnvironments(psComponent, TaskType.PS); + psComponent.setNumberOfContainers((long) parameters.getNumPS()); + psComponent.setRestartPolicy(Component.RestartPolicyEnum.NEVER); + psComponent.setResource( + getServiceResourceFromYarnResource(parameters.getPsResource())); + + // Override global docker image if needed. 
+ if (parameters.getPsDockerImage() != null) { + psComponent.setArtifact( + getDockerArtifact(parameters.getPsDockerImage())); + } + handleLaunchCommand(parameters, TaskType.PS, psComponent); + service.addComponent(psComponent); + } + return service; + } + + /** + * {@inheritDoc} + */ + @Override + public ApplicationId submitJob(RunJobParameters parameters) + throws IOException, YarnException { + Service service = createServiceByParameters(parameters); + ServiceClient serviceClient = YarnServiceUtils.createServiceClient( + clientContext.getYarnConfig()); + ApplicationId appid = serviceClient.actionCreate(service); + serviceClient.stop(); + this.serviceSpec = service; + return appid; + } + + @VisibleForTesting + public Service getServiceSpec() { + return serviceSpec; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java new file mode 100644 index 0000000..3489e49 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceRuntimeFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; + +import org.apache.hadoop.yarn.submarine.common.ClientContext; +import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory; +import org.apache.hadoop.yarn.submarine.runtimes.common.FSBasedSubmarineStorageImpl; +import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor; +import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter; +import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage; + +public class YarnServiceRuntimeFactory extends RuntimeFactory { + + public YarnServiceRuntimeFactory(ClientContext clientContext) { + super(clientContext); + } + + @Override + protected JobSubmitter internalCreateJobSubmitter() { + return new YarnServiceJobSubmitter(super.clientContext); + } + + @Override + protected JobMonitor internalCreateJobMonitor() { + return new YarnServiceJobMonitor(super.clientContext); + } + + @Override + protected SubmarineStorage internalCreateSubmarineStorage() { + return new FSBasedSubmarineStorageImpl(super.clientContext); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java new file mode 100644 index 0000000..f7ecc97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java @@ -0,0 +1,78 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.submarine.common.Envs; + +public class YarnServiceUtils { + // This will be true only in UT. 
+ private static ServiceClient stubServiceClient = null; + + public static ServiceClient createServiceClient( + Configuration yarnConfiguration) { + if (stubServiceClient != null) { + return stubServiceClient; + } + + ServiceClient serviceClient = new ServiceClient(); + serviceClient.init(yarnConfiguration); + serviceClient.start(); + return serviceClient; + } + + @VisibleForTesting + public static void setStubServiceClient(ServiceClient stubServiceClient) { + YarnServiceUtils.stubServiceClient = stubServiceClient; + } + + public static String getTFConfigEnv(String curCommponentName, int nWorkers, + int nPs, String serviceName, String userName, String domain) { + String commonEndpointSuffix = + "." + serviceName + "." + userName + "." + domain + ":8000"; + + String json = "{\\\"cluster\\\":{"; + + String master = getComponentArrayJson("master", 1, commonEndpointSuffix) + + ","; + String worker = getComponentArrayJson("worker", nWorkers - 1, + commonEndpointSuffix) + ","; + String ps = getComponentArrayJson("ps", nPs, commonEndpointSuffix) + "},"; + + String task = + "\\\"task\\\":{" + " \\\"type\\\":\\\"" + curCommponentName + "\\\"," + + " \\\"index\\\":" + '$' + Envs.TASK_INDEX_ENV + "},"; + String environment = "\\\"environment\\\":\\\"cloud\\\"}"; + + return json + master + worker + ps + task + environment; + } + + private static String getComponentArrayJson(String componentName, int count, + String endpointSuffix) { + String component = "\\\"" + componentName + "\\\":"; + String array = "["; + for (int i = 0; i < count; i++) { + array = array + "\\\"" + componentName + "-" + i + + endpointSuffix + "\\\""; + if (i != count - 1) { + array = array + ","; + } + } + array = array + "]"; + return component + array; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md new file mode 100644 index 0000000..ce26ea7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/DeveloperGuide.md @@ -0,0 +1,26 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Developer Guide + +(More details need to be added.) + +By default, submarine uses the YARN service framework as its runtime. If you want to add your own implementation, you can add a new `RuntimeFactory` implementation and configure the following option in `submarine.xml` (which should be placed under the same `$HADOOP_CONF_DIR`): + +``` +<property> + <name>submarine.runtime.class</name> + <value>... fully qualified class name for your runtime factory ... 
</value> +</property> +``` http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md new file mode 100644 index 0000000..b720b5a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/QuickStart.md @@ -0,0 +1,134 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Quick Start Guide + +## Prerequisite + +Must: +- Apache Hadoop 3.1.0, YARN service enabled. + +Optional: +- Enable YARN DNS. (When distributed training required.) +- Enable GPU on YARN support. (When GPU-based training required.) + +## Run jobs + +### Commandline options + +```$xslt +usage: job run + -checkpoint_path <arg> Training output directory of the job, could + be local or other FS directory. 
This + typically includes checkpoint files and + exported model + -docker_image <arg> Docker image name/tag + -env <arg> Common environment variable of worker/ps + -input_path <arg> Input of the job, could be local or other FS + directory + -name <arg> Name of the job + -num_ps <arg> Number of PS tasks of the job, by default + it's 0 + -num_workers <arg> Number of worker tasks of the job, by + default it's 1 + -ps_docker_image <arg> Specify docker image for PS, when this is + not specified, PS uses --docker_image as + default. + -ps_launch_cmd <arg> Commandline of PS, arguments will be + directly used to launch the PS + -ps_resources <arg> Resource of each PS, for example + memory-mb=2048,vcores=2,yarn.io/gpu=2 + -queue <arg> Name of queue to run the job, by default it + uses default queue + -saved_model_path <arg> Model exported path (savedmodel) of the job, + which is needed when exported model is not + placed under ${checkpoint_path}, could be + local or other FS directory. This will be + used to serve. + -tensorboard <arg> Should we run TensorBoard for this job? By + default it's true + -verbose Print verbose log for troubleshooting + -wait_job_finish Specified when user wants to wait for the job + to finish + -worker_docker_image <arg> Specify docker image for WORKER, when this + is not specified, WORKER uses --docker_image + as default. 
+ -worker_launch_cmd <arg> Commandline of worker, arguments will be + directly used to launch the worker + -worker_resources <arg> Resource of each worker, for example + memory-mb=2048,vcores=2,yarn.io/gpu=2 +``` + +### Launch Standalone Tensorflow Application: + +#### Commandline +``` +yarn jar path-to/hadoop-yarn-applications-submarine-3.2.0-SNAPSHOT.jar job run \ + --env DOCKER_JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/ \ + --env DOCKER_HADOOP_HDFS_HOME=/hadoop-3.1.0 --name tf-job-001 \ + --docker_image <your-docker-image> \ + --input_path hdfs://default/dataset/cifar-10-data \ + --checkpoint_path hdfs://default/tmp/cifar-10-jobdir \ + --worker_resources memory=4G,vcores=2,gpu=2 \ + --worker_launch_cmd "python ... (Your training application cmd)" +``` + +#### Notes: + +1) `DOCKER_JAVA_HOME` points to JAVA_HOME inside Docker image. +2) `DOCKER_HADOOP_HDFS_HOME` points to HADOOP_HDFS_HOME inside Docker image. +3) `--worker_resources` can include gpu when you need GPU to train your task. + +### Launch Distributed Tensorflow Application: + +#### Commandline + +``` +yarn jar hadoop-yarn-applications-submarine-<version>.jar job run \ + --name tf-job-001 --docker_image <your docker image> \ + --input_path hdfs://default/dataset/cifar-10-data \ + --checkpoint_path hdfs://default/tmp/cifar-10-jobdir \ + --env DOCKER_JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/ \ + --env DOCKER_HADOOP_HDFS_HOME=/hadoop-3.1.0 \ + --num_workers 2 \ + --worker_resources memory=8G,vcores=2,gpu=1 --worker_launch_cmd "cmd for worker ..." \ + --num_ps 2 \ + --ps_resources memory=4G,vcores=2,gpu=0 --ps_launch_cmd "cmd for ps" \ +``` + +#### Notes: + +1) Very similar to standalone TF application, but you need to specify #worker/#ps +2) Different resources can be specified for worker and PS. +3) `TF_CONFIG` environment will be auto generated and set before executing user's launch command. 
+ +## Run jobs + +### Get Job Status + +``` +yarn jar hadoop-yarn-applications-submarine-3.2.0-SNAPSHOT.jar job show --name tf-job-001 +``` + +Output looks like: +``` +Job Meta Info: + Application Id: application_1532131617202_0005 + Input Path: hdfs://default/dataset/cifar-10-data + Checkpoint Path: hdfs://default/tmp/cifar-10-jobdir + Run Parameters: --name tf-job-001 --docker_image wtan/tf-1.8.0-gpu:0.0.3 + (... all your commandline before run the job) +``` + +After that, you can run ```tensorboard --logdir=<checkpoint-path>``` to view Tensorboard of the job. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java new file mode 100644 index 0000000..295d6a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.hadoop.yarn.submarine.client.cli;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.common.MockClientContext;
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests the command-line parsing done by {@link RunJobCli} and the resource
 * string parsing done by {@link CliUtils}, using mocked runtime components.
 */
public class TestRunJobCliParsing {
  /** Switches verbose logging off so every test starts non-verbose. */
  @Before
  public void before() {
    SubmarineLogs.verboseOff();
  }

  /** Smoke test: printing the usage text must complete without throwing. */
  @Test
  public void testPrintHelp() {
    MockClientContext mockClientContext = new MockClientContext();
    JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
    JobMonitor mockJobMonitor = mock(JobMonitor.class);
    RunJobCli runJobCli = new RunJobCli(mockClientContext, mockJobSubmitter,
        mockJobMonitor);
    runJobCli.printUsages();
  }

  /**
   * Builds a client context whose {@link RuntimeFactory} is a mock handing out
   * a mocked submitter, monitor and storage.
   *
   * @return the context; its submitter answers every {@code submitJob} call
   *         with a fixed {@link ApplicationId}
   * @throws IOException declared by the stubbed {@code submitJob} signature
   * @throws YarnException declared by the stubbed {@code submitJob} signature
   */
  private MockClientContext getMockClientContext()
      throws IOException, YarnException {
    MockClientContext mockClientContext = new MockClientContext();

    JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
    when(mockJobSubmitter.submitJob(any(RunJobParameters.class))).thenReturn(
        ApplicationId.newInstance(1234L, 1));
    JobMonitor mockJobMonitor = mock(JobMonitor.class);
    SubmarineStorage storage = mock(SubmarineStorage.class);

    RuntimeFactory rtFactory = mock(RuntimeFactory.class);
    when(rtFactory.getJobSubmitterInstance()).thenReturn(mockJobSubmitter);
    when(rtFactory.getJobMonitorInstance()).thenReturn(mockJobMonitor);
    when(rtFactory.getSubmarineStorage()).thenReturn(storage);

    mockClientContext.setRuntimeFactory(rtFactory);
    return mockClientContext;
  }

  /**
   * Parses a worker + parameter-server command line and checks that every
   * option ends up in the resulting {@link RunJobParameters}.
   */
  @Test
  public void testBasicRunJobForDistributedTraining() throws Exception {
    RunJobCli runJobCli = new RunJobCli(getMockClientContext());

    Assert.assertFalse(SubmarineLogs.isVerbose());

    runJobCli.run(
        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
            "--ps_resources", "memory=4G,vcores=4", "--tensorboard", "true",
            "--ps_launch_cmd", "python run-ps.py", "--verbose" });

    RunJobParameters params = runJobCli.getRunJobParameters();

    // JUnit's contract is assertEquals(expected, actual); keeping that order
    // makes failure messages report the right value as "expected".
    Assert.assertEquals("hdfs://input", params.getInputPath());
    Assert.assertEquals("hdfs://output", params.getCheckpointPath());
    Assert.assertEquals(2, params.getNumPS());
    Assert.assertEquals("python run-ps.py", params.getPSLaunchCmd());
    Assert.assertEquals(Resources.createResource(4096, 4),
        params.getPsResource());
    Assert.assertEquals("python run-job.py", params.getWorkerLaunchCmd());
    Assert.assertEquals(Resources.createResource(2048, 2),
        params.getWorkerResource());
    Assert.assertEquals("tf-docker:1.1.0", params.getDockerImageName());
    // "--verbose" on the command line must have switched verbose logging on.
    Assert.assertTrue(SubmarineLogs.isVerbose());
  }

  /**
   * Parses a single-worker command line (no parameter server) including
   * {@code --wait_job_finish} and checks the parsed parameters.
   */
  @Test
  public void testBasicRunJobForSingleNodeTraining() throws Exception {
    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
    Assert.assertFalse(SubmarineLogs.isVerbose());

    runJobCli.run(
        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
            "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
            "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
            "true", "--verbose", "--wait_job_finish" });

    RunJobParameters params = runJobCli.getRunJobParameters();

    Assert.assertEquals("hdfs://input", params.getInputPath());
    Assert.assertEquals("hdfs://output", params.getCheckpointPath());
    Assert.assertEquals(1, params.getNumWorkers());
    Assert.assertEquals("python run-job.py", params.getWorkerLaunchCmd());
    Assert.assertEquals(Resources.createResource(4096, 2),
        params.getWorkerResource());
    Assert.assertTrue(SubmarineLogs.isVerbose());
    Assert.assertTrue(params.isWaitJobFinish());
  }

  /**
   * Checks that the {@code %input_path%}, {@code %checkpoint_path%} and
   * {@code %saved_model_path%} placeholders in the launch commands are
   * replaced; the expected strings show all of them resolving to the
   * {@code --input_path} / {@code --checkpoint_path} values.
   */
  @Test
  public void testLaunchCommandPatternReplace() throws Exception {
    RunJobCli runJobCli = new RunJobCli(getMockClientContext());
    Assert.assertFalse(SubmarineLogs.isVerbose());

    runJobCli.run(
        new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
            "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
            "python run-job.py --input=%input_path% "
                + "--model_dir=%checkpoint_path% "
                + "--export_dir=%saved_model_path%/savedmodel",
            "--worker_resources", "memory=2048,vcores=2", "--ps_resources",
            "memory=4096,vcores=4", "--tensorboard", "true", "--ps_launch_cmd",
            "python run-ps.py --input=%input_path% "
                + "--model_dir=%checkpoint_path%/model",
            "--verbose" });

    Assert.assertEquals(
        "python run-job.py --input=hdfs://input --model_dir=hdfs://output "
            + "--export_dir=hdfs://output/savedmodel",
        runJobCli.getRunJobParameters().getWorkerLaunchCmd());
    Assert.assertEquals(
        "python run-ps.py --input=hdfs://input --model_dir=hdfs://output/model",
        runJobCli.getRunJobParameters().getPSLaunchCmd());
  }

  /**
   * Exercises {@link CliUtils#createResourceFromString} with different memory
   * unit suffixes, both spellings of the memory key, and an extra GPU
   * resource type.
   */
  @Test
  public void testResourceUnitParsing() throws Exception {
    // Unit suffixes are accepted in either case: g/G and m/M.
    Resource res = CliUtils.createResourceFromString("memory=20g,vcores=3",
        ResourceUtils.getResourcesTypeInfo());
    Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);

    res = CliUtils.createResourceFromString("memory=20G,vcores=3",
        ResourceUtils.getResourcesTypeInfo());
    Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);

    res = CliUtils.createResourceFromString("memory=20M,vcores=3",
        ResourceUtils.getResourcesTypeInfo());
    Assert.assertEquals(Resources.createResource(20, 3), res);

    res = CliUtils.createResourceFromString("memory=20m,vcores=3",
        ResourceUtils.getResourcesTypeInfo());
    Assert.assertEquals(Resources.createResource(20, 3), res);

    // With the "memory-mb" key a bare number is already in MB.
    res = CliUtils.createResourceFromString("memory-mb=20,vcores=3",
        ResourceUtils.getResourcesTypeInfo());
    Assert.assertEquals(Resources.createResource(20, 3), res);

    res = CliUtils.createResourceFromString("memory-mb=20m,vcores=3",
        ResourceUtils.getResourcesTypeInfo());
    Assert.assertEquals(Resources.createResource(20, 3), res);

    res = CliUtils.createResourceFromString("memory-mb=20G,vcores=3",
        ResourceUtils.getResourcesTypeInfo());
    Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);

    // With the "memory" key a bare number is NOT in MB: 20 of the base unit
    // rounds down to 0 MB.
    // NOTE(review): the original comment called the base unit "bits"; it is
    // presumably bytes - confirm against CliUtils.
    res = CliUtils.createResourceFromString("memory=20,vcores=3",
        ResourceUtils.getResourcesTypeInfo());
    Assert.assertEquals(Resources.createResource(0, 3), res);

    // Test multiple resources: register GPU next to the default types.
    // NOTE(review): reinitializeResources changes state held by ResourceUtils
    // and nothing here restores it - confirm other tests do not depend on the
    // default resource types.
    List<ResourceTypeInfo> resTypes = new ArrayList<>(
        ResourceUtils.getResourcesTypeInfo());
    resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI, ""));
    ResourceUtils.reinitializeResources(resTypes);

    // The short name "gpu" is accepted for the GPU resource.
    res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=0",
        resTypes);
    Assert.assertEquals(2 * 1024, res.getMemorySize());
    Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));

    res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=3",
        resTypes);
    Assert.assertEquals(2 * 1024, res.getMemorySize());
    Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));

    // Leaving GPU out of the string yields a GPU value of 0.
    res = CliUtils.createResourceFromString("memory=2G,vcores=3",
        resTypes);
    Assert.assertEquals(2 * 1024, res.getMemorySize());
    Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));

    // The fully qualified name "yarn.io/gpu" is accepted as well.
    res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=0",
        resTypes);
    Assert.assertEquals(2 * 1024, res.getMemorySize());
    Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));

    res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=3",
        resTypes);
    Assert.assertEquals(2 * 1024, res.getMemorySize());
    Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));

    // TODO, add more negative tests.
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java new file mode 100644 index 0000000..9c0d872 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestShowJobCliParsing.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.hadoop.yarn.submarine.client.cli;

import org.apache.commons.cli.ParseException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.param.ShowJobParameters;
import org.apache.hadoop.yarn.submarine.common.MockClientContext;
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
import org.apache.hadoop.yarn.submarine.common.exception.SubmarineException;
import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
import org.apache.hadoop.yarn.submarine.runtimes.common.MemorySubmarineStorage;
import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests the command-line parsing done by {@link ShowJobCli} and its lookup of
 * job information in a {@link SubmarineStorage}.
 */
public class TestShowJobCliParsing {
  /** Switches verbose logging off before every test. */
  @Before
  public void before() {
    SubmarineLogs.verboseOff();
  }

  /** Smoke test: printing the usage text must complete without throwing. */
  @Test
  public void testPrintHelp() {
    MockClientContext mockClientContext = new MockClientContext();
    ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
    showJobCli.printUsages();
  }

  /**
   * Checks that {@code --name} is parsed into {@link ShowJobParameters}.
   * The storage lookup is stubbed out so only the parsing is exercised.
   */
  @Test
  public void testShowJob()
      throws InterruptedException, SubmarineException, YarnException,
      ParseException, IOException {
    MockClientContext mockClientContext = new MockClientContext();
    ShowJobCli showJobCli = new ShowJobCli(mockClientContext) {
      @Override
      protected void getAndPrintJobInfo() {
        // do nothing
      }
    };
    showJobCli.run(new String[] { "--name", "my-job" });

    ShowJobParameters parameters = showJobCli.getParameters();
    // JUnit's contract is assertEquals(expected, actual).
    Assert.assertEquals("my-job", parameters.getName());
  }

  /**
   * Builds a fixed job-info map as stored for a submitted job.
   *
   * @param jobName job name, used to derive the stored input path
   * @return map holding an application id, run arguments and an input path
   */
  private Map<String, String> getMockJobInfo(String jobName) {
    Map<String, String> map = new HashMap<>();
    map.put(StorageKeyConstants.APPLICATION_ID,
        ApplicationId.newInstance(1234L, 1).toString());
    map.put(StorageKeyConstants.JOB_RUN_ARGS, "job run 123456");
    map.put(StorageKeyConstants.INPUT_PATH, "hdfs://" + jobName);
    return map;
  }

  /**
   * Shows a job against an in-memory storage: first while the job is unknown
   * (expected to fail with {@link IOException}), then after it was stored.
   */
  @Test
  public void testSimpleShowJob()
      throws InterruptedException, SubmarineException, YarnException,
      ParseException, IOException {
    SubmarineStorage storage = new MemorySubmarineStorage();
    MockClientContext mockClientContext = new MockClientContext();
    RuntimeFactory runtimeFactory = mock(RuntimeFactory.class);
    when(runtimeFactory.getSubmarineStorage()).thenReturn(storage);
    mockClientContext.setRuntimeFactory(runtimeFactory);

    ShowJobCli showJobCli = new ShowJobCli(mockClientContext);

    try {
      showJobCli.run(new String[] { "--name", "my-job" });
      // Without this the test would silently pass when no exception is
      // raised. Assert.fail throws AssertionError, which the catch below
      // does not intercept.
      Assert.fail("Expected an IOException for a job that was never stored");
    } catch (IOException expected) {
      // expected: "my-job" has not been added to the storage yet
    }

    // Once the job is stored, showing it must succeed.
    storage.addNewJob("my-job", getMockJobInfo("my-job"));
    showJobCli.run(new String[] { "--name", "my-job" });
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cadbc8b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java new file mode 100644 index 0000000..e1756b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one +
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.submarine.client.cli.yarnservice; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli; +import org.apache.hadoop.yarn.submarine.common.MockClientContext; +import org.apache.hadoop.yarn.submarine.common.api.TaskType; +import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs; +import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter; +import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants; +import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage; +import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobSubmitter; +import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import 
static org.mockito.Mockito.when; + +public class TestYarnServiceRunJobCli { + @Before + public void before() throws IOException, YarnException { + SubmarineLogs.verboseOff(); + ServiceClient serviceClient = mock(ServiceClient.class); + when(serviceClient.actionCreate(any(Service.class))).thenReturn( + ApplicationId.newInstance(1234L, 1)); + YarnServiceUtils.setStubServiceClient(serviceClient); + } + + @Test + public void testPrintHelp() { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + runJobCli.printUsages(); + } + + private Service getServiceSpecFromJobSubmitter(JobSubmitter jobSubmitter) { + return ((YarnServiceJobSubmitter) jobSubmitter).getServiceSpec(); + } + + @Test + public void testBasicRunJobForDistributedTraining() throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", + "s3://output", "--num_workers", "3", "--num_ps", "2", + "--worker_launch_cmd", "python run-job.py", "--worker_resources", + "memory=2048M,vcores=2", "--ps_resources", "memory=4096M,vcores=4", + "--tensorboard", "true", "--ps_docker_image", "ps.image", + "--worker_docker_image", "worker.image", + "--ps_launch_cmd", "python run-ps.py", "--verbose" }); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(3, serviceSpec.getComponents().size()); + Assert.assertTrue( + serviceSpec.getComponent(TaskType.WORKER.getComponentName()) != null); + Assert.assertTrue( + serviceSpec.getComponent(TaskType.PRIMARY_WORKER.getComponentName()) + != null); + Assert.assertTrue( + serviceSpec.getComponent(TaskType.PS.getComponentName()) != null); + 
Component primaryWorkerComp = serviceSpec.getComponent( + TaskType.PRIMARY_WORKER.getComponentName()); + Assert.assertEquals(2048, primaryWorkerComp.getResource().calcMemoryMB()); + Assert.assertEquals(2, + primaryWorkerComp.getResource().getCpus().intValue()); + + Component workerComp = serviceSpec.getComponent( + TaskType.WORKER.getComponentName()); + Assert.assertEquals(2048, workerComp.getResource().calcMemoryMB()); + Assert.assertEquals(2, workerComp.getResource().getCpus().intValue()); + + Component psComp = serviceSpec.getComponent(TaskType.PS.getComponentName()); + Assert.assertEquals(4096, psComp.getResource().calcMemoryMB()); + Assert.assertEquals(4, psComp.getResource().getCpus().intValue()); + + Assert.assertEquals("worker.image", workerComp.getArtifact().getId()); + Assert.assertEquals("ps.image", psComp.getArtifact().getId()); + + Assert.assertTrue(SubmarineLogs.isVerbose()); + + // TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC + } + + @Test + public void testBasicRunJobForSingleNodeTraining() throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", + "s3://output", "--num_workers", "1", "--worker_launch_cmd", + "python run-job.py", "--worker_resources", "memory=2G,vcores=2", + "--tensorboard", "true", "--verbose" }); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(1, serviceSpec.getComponents().size()); + Assert.assertTrue( + serviceSpec.getComponent(TaskType.PRIMARY_WORKER.getComponentName()) + != null); + Component primaryWorkerComp = serviceSpec.getComponent( + TaskType.PRIMARY_WORKER.getComponentName()); + Assert.assertEquals(2048, 
primaryWorkerComp.getResource().calcMemoryMB()); + Assert.assertEquals(2, + primaryWorkerComp.getResource().getCpus().intValue()); + + Assert.assertTrue(SubmarineLogs.isVerbose()); + + // TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC + } + + @Test + public void testParameterStorageForTrainingJob() throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", + "s3://output", "--num_workers", "1", "--worker_launch_cmd", + "python run-job.py", "--worker_resources", "memory=2G,vcores=2", + "--tensorboard", "true", "--verbose" }); + SubmarineStorage storage = + mockClientContext.getRuntimeFactory().getSubmarineStorage(); + Map<String, String> jobInfo = storage.getJobInfoByName("my-job"); + Assert.assertTrue(jobInfo.size() > 0); + Assert.assertEquals(jobInfo.get(StorageKeyConstants.INPUT_PATH), + "s3://input"); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org