YARN-8757. [Submarine] Add Tensorboard component when --tensorboard is specified. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1824d5d1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1824d5d1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1824d5d1 Branch: refs/heads/YARN-7402 Commit: 1824d5d1c49c16db6341141fa204d4a4c02d0944 Parents: 56e0d63 Author: Sunil G <[email protected]> Authored: Wed Sep 19 22:18:55 2018 +0530 Committer: Sunil G <[email protected]> Committed: Wed Sep 19 22:18:55 2018 +0530 ---------------------------------------------------------------------- .../yarn/submarine/client/cli/CliConstants.java | 6 + .../yarn/submarine/client/cli/CliUtils.java | 10 +- .../yarn/submarine/client/cli/RunJobCli.java | 43 ++- .../client/cli/param/RunJobParameters.java | 52 +++- .../fs/DefaultRemoteDirectoryManager.java | 21 +- .../common/fs/RemoteDirectoryManager.java | 4 +- .../common/FSBasedSubmarineStorageImpl.java | 4 +- .../yarnservice/YarnServiceJobSubmitter.java | 126 ++++++--- .../runtimes/yarnservice/YarnServiceUtils.java | 26 +- .../yarnservice/TestYarnServiceRunJobCli.java | 268 ++++++++++++++++--- .../common/fs/MockRemoteDirectoryManager.java | 7 +- 11 files changed, 463 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java index d0958a8..d51ffc7 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java @@ -35,6 +35,10 @@ public class CliConstants { public static final String DOCKER_IMAGE = "docker_image"; public static final String QUEUE = "queue"; public static final String TENSORBOARD = "tensorboard"; + public static final String TENSORBOARD_RESOURCES = "tensorboard_resources"; + public static final String TENSORBOARD_DEFAULT_RESOURCES = + "memory=4G,vcores=1"; + public static final String WORKER_LAUNCH_CMD = "worker_launch_cmd"; public static final String SERVING_LAUNCH_CMD = "serving_launch_cmd"; public static final String PS_LAUNCH_CMD = "ps_launch_cmd"; @@ -45,4 +49,6 @@ public class CliConstants { public static final String WAIT_JOB_FINISH = "wait_job_finish"; public static final String PS_DOCKER_IMAGE = "ps_docker_image"; public static final String WORKER_DOCKER_IMAGE = "worker_docker_image"; + public static final String TENSORBOARD_DOCKER_IMAGE = + "tensorboard_docker_image"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java index 6dd3e4d..546c6eb 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java @@ -39,17 +39,9 @@ public class CliUtils { public static String replacePatternsInLaunchCommand(String specifiedCli, RunJobParameters jobRunParameters, RemoteDirectoryManager directoryManager) throws IOException { - String jobDir = jobRunParameters.getCheckpointPath(); - if (null == jobDir) { - jobDir = directoryManager.getJobCheckpointDir(jobRunParameters.getName(), - true).toString(); - } - String input = jobRunParameters.getInputPath(); + String jobDir = jobRunParameters.getCheckpointPath(); String savedModelDir = jobRunParameters.getSavedModelPath(); - if (null == savedModelDir) { - savedModelDir = jobDir; - } Map<String, String> replacePattern = new HashMap<>(); if (jobDir != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java index d7dfc0d..faa22d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java @@ -89,8 +89,16 @@ public class RunJobCli 
extends AbstractCli { options.addOption(CliConstants.DOCKER_IMAGE, true, "Docker image name/tag"); options.addOption(CliConstants.QUEUE, true, "Name of queue to run the job, by default it uses default queue"); - options.addOption(CliConstants.TENSORBOARD, true, - "Should we run TensorBoard" + " for this job? By default it's true"); + options.addOption(CliConstants.TENSORBOARD, false, + "Should we run TensorBoard" + + " for this job? By default it's disabled"); + options.addOption(CliConstants.TENSORBOARD_RESOURCES, true, + "Specify resources of Tensorboard, by default it is " + + CliConstants.TENSORBOARD_DEFAULT_RESOURCES); + options.addOption(CliConstants.TENSORBOARD_DOCKER_IMAGE, true, + "Specify Tensorboard docker image. when this is not " + + "specified, Tensorboard " + "uses --" + CliConstants.DOCKER_IMAGE + + " as default."); options.addOption(CliConstants.WORKER_LAUNCH_CMD, true, "Commandline of worker, arguments will be " + "directly used to launch the worker"); @@ -144,10 +152,39 @@ public class RunJobCli extends AbstractCli { throw e; } + // Set default job dir / saved model dir, etc. + setDefaultDirs(); + // replace patterns replacePatternsInParameters(); } + private void setDefaultDirs() throws IOException { + // Create directories if needed + String jobDir = parameters.getCheckpointPath(); + if (null == jobDir) { + if (parameters.getNumWorkers() > 0) { + jobDir = clientContext.getRemoteDirectoryManager().getJobCheckpointDir( + parameters.getName(), true).toString(); + } else { + // when #workers == 0, it means we only launch TB. In that case, + // point job dir to root dir so all job's metrics will be shown. 
+ jobDir = clientContext.getRemoteDirectoryManager().getUserRootFolder() + .toString(); + } + parameters.setCheckpointPath(jobDir); + } + + if (parameters.getNumWorkers() > 0) { + // Only do this when #worker > 0 + String savedModelDir = parameters.getSavedModelPath(); + if (null == savedModelDir) { + savedModelDir = jobDir; + parameters.setSavedModelPath(savedModelDir); + } + } + } + private void storeJobInformation(String jobName, ApplicationId applicationId, String[] args) throws IOException { Map<String, String> jobInfo = new HashMap<>(); @@ -198,7 +235,7 @@ public class RunJobCli extends AbstractCli { } @VisibleForTesting - RunJobParameters getRunJobParameters() { + public RunJobParameters getRunJobParameters() { return parameters; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java index 6cab9e3..4558f6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java @@ -37,6 +37,8 @@ public class RunJobParameters extends RunParameters { private Resource workerResource; private Resource psResource; private boolean tensorboardEnabled; + private Resource tensorboardResource; + private String 
tensorboardDockerImage; private String workerLaunchCmd; private String psLaunchCmd; @@ -69,19 +71,23 @@ public class RunJobParameters extends RunParameters { // When distributed training is required if (nWorkers >= 2 && nPS > 0) { distributed = true; - } else if (nWorkers == 1 && nPS > 0) { + } else if (nWorkers <= 1 && nPS > 0) { throw new ParseException("Only specified one worker but non-zero PS, " + "please double check."); } - String workerResourceStr = parsedCommandLine.getOptionValue( - CliConstants.WORKER_RES); - if (workerResourceStr == null) { - throw new ParseException("--" + CliConstants.WORKER_RES + " is absent."); + workerResource = null; + if (nWorkers > 0) { + String workerResourceStr = parsedCommandLine.getOptionValue( + CliConstants.WORKER_RES); + if (workerResourceStr == null) { + throw new ParseException( + "--" + CliConstants.WORKER_RES + " is absent."); + } + workerResource = CliUtils.createResourceFromString( + workerResourceStr, + clientContext.getOrCreateYarnClient().getResourceTypeInfo()); } - Resource workerResource = CliUtils.createResourceFromString( - workerResourceStr, - clientContext.getOrCreateYarnClient().getResourceTypeInfo()); Resource psResource = null; if (nPS > 0) { @@ -94,9 +100,19 @@ public class RunJobParameters extends RunParameters { } boolean tensorboard = false; - if (parsedCommandLine.getOptionValue(CliConstants.TENSORBOARD) != null) { - tensorboard = Boolean.parseBoolean( - parsedCommandLine.getOptionValue(CliConstants.TENSORBOARD)); + if (parsedCommandLine.hasOption(CliConstants.TENSORBOARD)) { + tensorboard = true; + String tensorboardResourceStr = parsedCommandLine.getOptionValue( + CliConstants.TENSORBOARD_RESOURCES); + if (tensorboardResourceStr == null || tensorboardResourceStr.isEmpty()) { + tensorboardResourceStr = CliConstants.TENSORBOARD_DEFAULT_RESOURCES; + } + tensorboardResource = CliUtils.createResourceFromString( + tensorboardResourceStr, + clientContext.getOrCreateYarnClient().getResourceTypeInfo()); + 
tensorboardDockerImage = parsedCommandLine.getOptionValue( + CliConstants.TENSORBOARD_DOCKER_IMAGE); + this.setTensorboardResource(tensorboardResource); } if (parsedCommandLine.hasOption(CliConstants.WAIT_JOB_FINISH)) { @@ -115,7 +131,7 @@ public class RunJobParameters extends RunParameters { this.setInputPath(input).setCheckpointPath(jobDir).setNumPS(nPS).setNumWorkers(nWorkers) .setPSLaunchCmd(psLaunchCommand).setWorkerLaunchCmd(workerLaunchCmd) - .setPsResource(psResource).setWorkerResource(workerResource) + .setPsResource(psResource) .setTensorboardEnabled(tensorboard); super.updateParametersByParsedCommandline(parsedCommandLine, @@ -219,4 +235,16 @@ public class RunJobParameters extends RunParameters { public boolean isDistributed() { return distributed; } + + public Resource getTensorboardResource() { + return tensorboardResource; + } + + public void setTensorboardResource(Resource tensorboardResource) { + this.tensorboardResource = tensorboardResource; + } + + public String getTensorboardDockerImage() { + return tensorboardDockerImage; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java index fe8956a..b2e2b41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java @@ -14,6 +14,7 @@ package org.apache.hadoop.yarn.submarine.common.fs; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.submarine.client.cli.CliConstants; @@ -42,7 +43,10 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager { if (create) { createFolderIfNotExist(staging); } - return staging; + + // Get a file status to make sure it is an absolute path. + FileStatus fStatus = fs.getFileStatus(staging); + return fStatus.getPath(); } @Override @@ -70,8 +74,21 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager { return fs; } + @Override + public Path getUserRootFolder() throws IOException { + Path rootPath = new Path("submarine", "jobs"); + createFolderIfNotExist(rootPath); + // Get a file status to make sure it is an absolute path. + FileStatus fStatus = fs.getFileStatus(rootPath); + return fStatus.getPath(); + } + private Path getJobRootFolder(String jobName) throws IOException { - return new Path(new Path("submarine", "jobs"), jobName); + Path jobRootPath = getUserRootFolder(); + createFolderIfNotExist(jobRootPath); + // Get a file status to make sure it is an absolute path.
+ FileStatus fStatus = fs.getFileStatus(jobRootPath); + return fStatus.getPath(); } private void createFolderIfNotExist(Path path) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java index 132b314..ad0d428 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java @@ -27,4 +27,6 @@ public interface RemoteDirectoryManager { Path getModelDir(String modelName, boolean create) throws IOException; FileSystem getFileSystem() throws IOException; -} + + Path getUserRootFolder() throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java index ebf9581..767fe78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java @@ -32,11 +32,9 @@ import java.util.Map; * A super naive FS-based storage. */ public class FSBasedSubmarineStorageImpl extends SubmarineStorage { - ClientContext clientContext; RemoteDirectoryManager rdm; public FSBasedSubmarineStorageImpl(ClientContext clientContext) { - this.clientContext = clientContext; rdm = clientContext.getRemoteDirectoryManager(); } @@ -89,7 +87,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage { private Map<String, String> deserializeMap(FSDataInputStream fis) throws IOException { ObjectInput oi = new ObjectInputStream(fis); - Map<String, String> newMap = null; + Map<String, String> newMap; try { newMap = (Map<String, String>) oi.readObject(); } catch (ClassNotFoundException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index 
a2a2067..8fb213f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -15,6 +15,7 @@ package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -59,6 +60,10 @@ public class YarnServiceJobSubmitter implements JobSubmitter { Service serviceSpec; private Set<Path> uploadedFiles = new HashSet<>(); + // Used by testing + private Map<String, String> componentToLocalLaunchScriptPath = + new HashMap<>(); + public YarnServiceJobSubmitter(ClientContext clientContext) { this.clientContext = clientContext; } @@ -186,6 +191,14 @@ public class YarnServiceJobSubmitter implements JobSubmitter { envs.put(Envs.TASK_TYPE_ENV, taskType.name()); } + private String getUserName() { + return System.getProperty("user.name"); + } + + private String getDNSDomain() { + return clientContext.getYarnConfig().get("hadoop.registry.dns.domain-name"); + } + /* * Generate a command launch script on local disk, returns patch to the script */ @@ -194,50 +207,48 @@ public class YarnServiceJobSubmitter implements JobSubmitter { File file = File.createTempFile(taskType.name() + "-launch-script", ".sh"); FileWriter fw = new FileWriter(file); - fw.append("#!/bin/bash\n"); - - addHdfsClassPathIfNeeded(parameters, fw, comp); - - // For primary_worker - if (taskType == TaskType.PRIMARY_WORKER) { - // Do we need tensorboard? 
- if (parameters.isTensorboardEnabled()) { - int tensorboardPort = 6006; - // Run tensorboard at the background - fw.append( - "tensorboard --port " + tensorboardPort + " --logdir " + parameters - .getCheckpointPath() + " &\n"); - } - } - - // When distributed training is required - if (parameters.isDistributed()) { - // Generated TF_CONFIG - String tfConfigEnv = YarnServiceUtils.getTFConfigEnv( - taskType.getComponentName(), parameters.getNumWorkers(), - parameters.getNumPS(), parameters.getName(), - System.getProperty("user.name"), - clientContext.getYarnConfig().get("hadoop.registry.dns.domain-name")); - fw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n"); - } + try { + fw.append("#!/bin/bash\n"); - // Print launch command - if (taskType.equals(TaskType.WORKER) || taskType.equals( - TaskType.PRIMARY_WORKER)) { - fw.append(parameters.getWorkerLaunchCmd() + '\n'); + addHdfsClassPathIfNeeded(parameters, fw, comp); - if (SubmarineLogs.isVerbose()) { - LOG.info("Worker command =[" + parameters.getWorkerLaunchCmd() + "]"); - } - } else if (taskType.equals(TaskType.PS)) { - fw.append(parameters.getPSLaunchCmd() + '\n'); + if (taskType.equals(TaskType.TENSORBOARD)) { + String tbCommand = + "export LC_ALL=C && tensorboard --logdir=" + parameters + .getCheckpointPath(); + fw.append(tbCommand + "\n"); + LOG.info("Tensorboard command=" + tbCommand); + } else{ + // When distributed training is required + if (parameters.isDistributed()) { + // Generated TF_CONFIG + String tfConfigEnv = YarnServiceUtils.getTFConfigEnv( + taskType.getComponentName(), parameters.getNumWorkers(), + parameters.getNumPS(), parameters.getName(), getUserName(), + getDNSDomain()); + fw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n"); + } - if (SubmarineLogs.isVerbose()) { - LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]"); + // Print launch command + if (taskType.equals(TaskType.WORKER) || taskType.equals( + TaskType.PRIMARY_WORKER)) { + fw.append(parameters.getWorkerLaunchCmd() 
+ '\n'); + + if (SubmarineLogs.isVerbose()) { + LOG.info( + "Worker command =[" + parameters.getWorkerLaunchCmd() + "]"); + } + } else if (taskType.equals(TaskType.PS)) { + fw.append(parameters.getPSLaunchCmd() + '\n'); + + if (SubmarineLogs.isVerbose()) { + LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]"); + } + } } + } finally { + fw.close(); } - - fw.close(); return file.getAbsolutePath(); } @@ -320,6 +331,8 @@ public class YarnServiceJobSubmitter implements JobSubmitter { destScriptFileName, component); component.setLaunchCommand("./" + destScriptFileName); + componentToLocalLaunchScriptPath.put(taskType.getComponentName(), + localScriptFile); } private void addWorkerComponent(Service service, @@ -410,6 +423,7 @@ public class YarnServiceJobSubmitter implements JobSubmitter { private Service createServiceByParameters(RunJobParameters parameters) throws IOException { + componentToLocalLaunchScriptPath.clear(); Service service = new Service(); service.setName(parameters.getName()); service.setVersion(String.valueOf(System.currentTimeMillis())); @@ -417,7 +431,9 @@ public class YarnServiceJobSubmitter implements JobSubmitter { handleServiceEnvs(service, parameters); - addWorkerComponents(service, parameters); + if (parameters.getNumWorkers() > 0) { + addWorkerComponents(service, parameters); + } if (parameters.getNumPS() > 0) { Component psComponent = new Component(); @@ -436,6 +452,31 @@ public class YarnServiceJobSubmitter implements JobSubmitter { handleLaunchCommand(parameters, TaskType.PS, psComponent); service.addComponent(psComponent); } + + if (parameters.isTensorboardEnabled()) { + Component tbComponent = new Component(); + tbComponent.setName(TaskType.TENSORBOARD.getComponentName()); + addCommonEnvironments(tbComponent, TaskType.TENSORBOARD); + tbComponent.setNumberOfContainers(1L); + tbComponent.setRestartPolicy(Component.RestartPolicyEnum.NEVER); + tbComponent.setResource(getServiceResourceFromYarnResource( + 
parameters.getTensorboardResource())); + if (parameters.getTensorboardDockerImage() != null) { + tbComponent.setArtifact( + getDockerArtifact(parameters.getTensorboardDockerImage())); + } + + handleLaunchCommand(parameters, TaskType.TENSORBOARD, tbComponent); + + // Add tensorboard to quicklink + String tensorboardLink = "http://" + YarnServiceUtils.getDNSName( + parameters.getName(), TaskType.TENSORBOARD.getComponentName(), 0, + getUserName(), getDNSDomain(), 6006); + LOG.info("Link to tensorboard:" + tensorboardLink); + service.addComponent(tbComponent); + service.setQuicklinks(ImmutableMap.of("Tensorboard", tensorboardLink)); + } + return service; } @@ -458,4 +499,9 @@ public class YarnServiceJobSubmitter implements JobSubmitter { public Service getServiceSpec() { return serviceSpec; } + + @VisibleForTesting + public Map<String, String> getComponentToLocalLaunchScriptPath() { + return componentToLocalLaunchScriptPath; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java index f7ecc97..9238a67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java @@ -40,10 +40,23 @@ 
public class YarnServiceUtils { YarnServiceUtils.stubServiceClient = stubServiceClient; } + public static String getDNSName(String serviceName, String componentName, + int index, String userName, String domain, int port) { + return componentName + "-" + index + getDNSNameCommonSuffix(serviceName, + userName, domain, port); + } + + private static String getDNSNameCommonSuffix(String serviceName, + String userName, String domain, int port) { + String commonEndpointSuffix = + "." + serviceName + "." + userName + "." + domain + ":" + port; + return commonEndpointSuffix; + } + public static String getTFConfigEnv(String curCommponentName, int nWorkers, int nPs, String serviceName, String userName, String domain) { - String commonEndpointSuffix = - "." + serviceName + "." + userName + "." + domain + ":8000"; + String commonEndpointSuffix = getDNSNameCommonSuffix(serviceName, userName, + domain, 8000); String json = "{\\\"cluster\\\":{"; @@ -58,7 +71,14 @@ public class YarnServiceUtils { + " \\\"index\\\":" + '$' + Envs.TASK_INDEX_ENV + "},"; String environment = "\\\"environment\\\":\\\"cloud\\\"}"; - return json + master + worker + ps + task + environment; + StringBuilder sb = new StringBuilder(); + sb.append(json); + sb.append(master); + sb.append(worker); + sb.append(ps); + sb.append(task); + sb.append(environment); + return sb.toString(); } private static String getComponentArrayJson(String componentName, int count, http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java index e1756b8..a88d673 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.submarine.client.cli.yarnservice; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; @@ -32,11 +33,15 @@ import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants; import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage; import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobSubmitter; import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Map; import static org.mockito.Matchers.any; @@ -65,25 +70,8 @@ public class TestYarnServiceRunJobCli { return ((YarnServiceJobSubmitter) jobSubmitter).getServiceSpec(); } - @Test - public void testBasicRunJobForDistributedTraining() throws Exception { - MockClientContext mockClientContext = - YarnServiceCliTestUtils.getMockClientContext(); - RunJobCli runJobCli = new RunJobCli(mockClientContext); - 
Assert.assertFalse(SubmarineLogs.isVerbose()); - - runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", - "--input_path", "s3://input", "--checkpoint_path", - "s3://output", "--num_workers", "3", "--num_ps", "2", - "--worker_launch_cmd", "python run-job.py", "--worker_resources", - "memory=2048M,vcores=2", "--ps_resources", "memory=4096M,vcores=4", - "--tensorboard", "true", "--ps_docker_image", "ps.image", - "--worker_docker_image", "worker.image", - "--ps_launch_cmd", "python run-ps.py", "--verbose" }); - Service serviceSpec = getServiceSpecFromJobSubmitter( - runJobCli.getJobSubmitter()); - Assert.assertEquals(3, serviceSpec.getComponents().size()); + private void commonVerifyDistributedTrainingSpec(Service serviceSpec) + throws Exception { Assert.assertTrue( serviceSpec.getComponent(TaskType.WORKER.getComponentName()) != null); Assert.assertTrue( @@ -98,7 +86,7 @@ public class TestYarnServiceRunJobCli { primaryWorkerComp.getResource().getCpus().intValue()); Component workerComp = serviceSpec.getComponent( - TaskType.WORKER.getComponentName()); + TaskType.WORKER.getComponentName()); Assert.assertEquals(2048, workerComp.getResource().calcMemoryMB()); Assert.assertEquals(2, workerComp.getResource().getCpus().intValue()); @@ -110,8 +98,55 @@ public class TestYarnServiceRunJobCli { Assert.assertEquals("ps.image", psComp.getArtifact().getId()); Assert.assertTrue(SubmarineLogs.isVerbose()); + } + + @Test + public void testBasicRunJobForDistributedTraining() throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); - // TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "3", "--num_ps", "2", 
"--worker_launch_cmd", + "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", + "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image", + "ps.image", "--worker_docker_image", "worker.image", + "--ps_launch_cmd", "python run-ps.py", "--verbose" }); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(3, serviceSpec.getComponents().size()); + + commonVerifyDistributedTrainingSpec(serviceSpec); + } + + @Test + public void testBasicRunJobForDistributedTrainingWithTensorboard() + throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", + "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", + "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image", + "ps.image", "--worker_docker_image", "worker.image", + "--tensorboard", "--ps_launch_cmd", "python run-ps.py", + "--verbose" }); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(4, serviceSpec.getComponents().size()); + + commonVerifyDistributedTrainingSpec(serviceSpec); + + verifyTensorboardComponent(runJobCli, serviceSpec, + Resources.createResource(4096, 1)); } @Test @@ -123,13 +158,84 @@ public class TestYarnServiceRunJobCli { runJobCli.run( new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", - "--input_path", "s3://input", "--checkpoint_path", - "s3://output", "--num_workers", "1", "--worker_launch_cmd", - "python run-job.py", "--worker_resources", "memory=2G,vcores=2", - "--tensorboard", "true", "--verbose" }); + "--input_path", "s3://input", "--checkpoint_path", 
"s3://output", + "--num_workers", "1", "--worker_launch_cmd", "python run-job.py", + "--worker_resources", "memory=2G,vcores=2", "--verbose" }); + + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(1, serviceSpec.getComponents().size()); + + commonTestSingleNodeTraining(serviceSpec); + } + + @Test + public void testTensorboardOnlyService() throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "0", "--tensorboard", "--verbose" }); + + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(1, serviceSpec.getComponents().size()); + + verifyTensorboardComponent(runJobCli, serviceSpec, + Resources.createResource(4096, 1)); + } + + @Test + public void testTensorboardOnlyServiceWithCustomizedDockerImageAndResourceCkptPath() + throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "0", "--tensorboard", "--verbose", + "--tensorboard_resources", "memory=2G,vcores=2", + "--tensorboard_docker_image", "tb_docker_image:001" }); + + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(1, serviceSpec.getComponents().size()); + + verifyTensorboardComponent(runJobCli, serviceSpec, + Resources.createResource(2048, 2)); + } + + @Test + public void 
testTensorboardOnlyServiceWithCustomizedDockerImageAndResource() + throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--num_workers", "0", "--tensorboard", "--verbose", + "--tensorboard_resources", "memory=2G,vcores=2", + "--tensorboard_docker_image", "tb_docker_image:001" }); + Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); Assert.assertEquals(1, serviceSpec.getComponents().size()); + + verifyTensorboardComponent(runJobCli, serviceSpec, + Resources.createResource(2048, 2)); + } + + private void commonTestSingleNodeTraining(Service serviceSpec) + throws Exception { Assert.assertTrue( serviceSpec.getComponent(TaskType.PRIMARY_WORKER.getComponentName()) != null); @@ -140,8 +246,110 @@ public class TestYarnServiceRunJobCli { primaryWorkerComp.getResource().getCpus().intValue()); Assert.assertTrue(SubmarineLogs.isVerbose()); + } + + private void verifyTensorboardComponent(RunJobCli runJobCli, + Service serviceSpec, Resource resource) throws Exception { + Assert.assertTrue( + serviceSpec.getComponent(TaskType.TENSORBOARD.getComponentName()) + != null); + Component tensorboardComp = serviceSpec.getComponent( + TaskType.TENSORBOARD.getComponentName()); + Assert.assertEquals(1, tensorboardComp.getNumberOfContainers().intValue()); + Assert.assertEquals(resource.getMemorySize(), + tensorboardComp.getResource().calcMemoryMB()); + Assert.assertEquals(resource.getVirtualCores(), + tensorboardComp.getResource().getCpus().intValue()); + + Assert.assertEquals("./run-TENSORBOARD.sh", + tensorboardComp.getLaunchCommand()); + + // Check docker image + if (runJobCli.getRunJobParameters().getTensorboardDockerImage() != null) { + Assert.assertEquals( + 
runJobCli.getRunJobParameters().getTensorboardDockerImage(), + tensorboardComp.getArtifact().getId()); + } else{ + Assert.assertNull(tensorboardComp.getArtifact()); + } + + YarnServiceJobSubmitter yarnServiceJobSubmitter = + (YarnServiceJobSubmitter) runJobCli.getJobSubmitter(); + + String expectedLaunchScript = + "#!/bin/bash\n" + "echo \"CLASSPATH:$CLASSPATH\"\n" + + "echo \"HADOOP_CONF_DIR:$HADOOP_CONF_DIR\"\n" + + "echo \"HADOOP_TOKEN_FILE_LOCATION:$HADOOP_TOKEN_FILE_LOCATION\"\n" + + "echo \"JAVA_HOME:$JAVA_HOME\"\n" + + "echo \"LD_LIBRARY_PATH:$LD_LIBRARY_PATH\"\n" + + "echo \"HADOOP_HDFS_HOME:$HADOOP_HDFS_HOME\"\n" + + "export LC_ALL=C && tensorboard --logdir=" + runJobCli + .getRunJobParameters().getCheckpointPath() + "\n"; + + verifyLaunchScriptForComponet(yarnServiceJobSubmitter, serviceSpec, + TaskType.TENSORBOARD, expectedLaunchScript); + } + + private void verifyLaunchScriptForComponet( + YarnServiceJobSubmitter yarnServiceJobSubmitter, Service serviceSpec, + TaskType taskType, String expectedLaunchScriptContent) throws Exception { + Map<String, String> componentToLocalLaunchScriptMap = + yarnServiceJobSubmitter.getComponentToLocalLaunchScriptPath(); + + String path = componentToLocalLaunchScriptMap.get( + taskType.getComponentName()); + + byte[] encoded = Files.readAllBytes(Paths.get(path)); + String scriptContent = new String(encoded, Charset.defaultCharset()); + + Assert.assertEquals(expectedLaunchScriptContent, scriptContent); + } + + @Test + public void testBasicRunJobForSingleNodeTrainingWithTensorboard() + throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "1", "--worker_launch_cmd", "python run-job.py", + 
"--worker_resources", "memory=2G,vcores=2", "--tensorboard", + "--verbose" }); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + + Assert.assertEquals(2, serviceSpec.getComponents().size()); + + commonTestSingleNodeTraining(serviceSpec); + verifyTensorboardComponent(runJobCli, serviceSpec, + Resources.createResource(4096, 1)); + } + + @Test + public void testBasicRunJobForSingleNodeTrainingWithGeneratedCheckpoint() + throws Exception { + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + runJobCli.run( + new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--num_workers", "1", + "--worker_launch_cmd", "python run-job.py", "--worker_resources", + "memory=2G,vcores=2", "--tensorboard", "--verbose" }); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + + Assert.assertEquals(2, serviceSpec.getComponents().size()); - // TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC + commonTestSingleNodeTraining(serviceSpec); + verifyTensorboardComponent(runJobCli, serviceSpec, + Resources.createResource(4096, 1)); } @Test @@ -153,10 +361,10 @@ public class TestYarnServiceRunJobCli { runJobCli.run( new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", - "--input_path", "s3://input", "--checkpoint_path", - "s3://output", "--num_workers", "1", "--worker_launch_cmd", - "python run-job.py", "--worker_resources", "memory=2G,vcores=2", - "--tensorboard", "true", "--verbose" }); + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "1", "--worker_launch_cmd", "python run-job.py", + "--worker_resources", "memory=2G,vcores=2", "--tensorboard", "true", + "--verbose" }); SubmarineStorage storage = mockClientContext.getRuntimeFactory().getSubmarineStorage(); 
Map<String, String> jobInfo = storage.getJobInfoByName("my-job"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1824d5d1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java index a195b59..b637036 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java @@ -53,7 +53,7 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager { @Override public Path getJobCheckpointDir(String jobName, boolean create) throws IOException { - return null; + return new Path("s3://generated_checkpoint_dir"); } @Override @@ -80,4 +80,9 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager { public FileSystem getFileSystem() throws IOException { return FileSystem.getLocal(new Configuration()); } + + @Override + public Path getUserRootFolder() throws IOException { + return new Path("s3://generated_root_dir"); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
