Repository: incubator-reef Updated Branches: refs/heads/master 13cf52250 -> c8db8f8ff
[REEF-55] Delayed creation and upload of the global.jar file until the first Evaluator submission on YARN. * Added the `GlobalJarUploader` class which performs the creation and upload. It is a `Callable` such that we can move this to a thread in a future version * Moved the actual upload logic from `EvaluatorSetupHelper` to `UploaderToJobFolder` JIRA: [REEF-55](https://issues.apache.org/jira/browse/REEF-55) Pull Request: Closes #28 Author: Markus Weimer <wei...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c8db8f8f Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c8db8f8f Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c8db8f8f Branch: refs/heads/master Commit: c8db8f8ff72ae5e285938f40e1a6ff6feb0866a6 Parents: 13cf522 Author: Markus Weimer <wei...@apache.org> Authored: Mon Jan 12 14:42:28 2015 +0900 Committer: Brian Cho <chobr...@apache.org> Committed: Mon Jan 12 14:45:25 2015 +0900 ---------------------------------------------------------------------- .../yarn/driver/EvaluatorSetupHelper.java | 98 +++++--------------- .../runtime/yarn/driver/GlobalJarUploader.java | 92 ++++++++++++++++++ .../yarn/driver/UploaderToJobfolder.java | 94 +++++++++++++++++++ 3 files changed, 209 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java ---------------------------------------------------------------------- diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java index 55a7855..3a2ff5c 100644 --- a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java 
+++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java @@ -18,16 +18,8 @@ */ package org.apache.reef.runtime.yarn.driver; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.io.TempFileCreator; import org.apache.reef.io.WorkingDirectoryTempFileCreator; @@ -35,7 +27,6 @@ import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.runtime.common.files.JobJarMaker; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.parameters.DeleteTempFiles; -import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Tang; import org.apache.reef.tang.annotations.Parameter; @@ -58,49 +49,41 @@ final class EvaluatorSetupHelper { private static final Logger LOG = Logger.getLogger(EvaluatorSetupHelper.class.getName()); - private final String jobSubmissionDirectory; - private final Map<String, LocalResource> globalResources; private final REEFFileNames fileNames; private final ConfigurationSerializer configurationSerializer; - private final FileSystem fileSystem; private final TempFileCreator tempFileCreator; + private final UploaderToJobFolder uploader; + private final GlobalJarUploader globalJarUploader; private final boolean deleteTempFiles; @Inject EvaluatorSetupHelper( - final @Parameter(JobSubmissionDirectory.class) String jobSubmissionDirectory, - final YarnConfiguration
yarnConfiguration, final REEFFileNames fileNames, final ConfigurationSerializer configurationSerializer, final TempFileCreator tempFileCreator, - final @Parameter(DeleteTempFiles.class) boolean deleteTempFiles) throws IOException { + final @Parameter(DeleteTempFiles.class) boolean deleteTempFiles, + final UploaderToJobFolder uploader, + final GlobalJarUploader globalJarUploader) throws IOException { this.tempFileCreator = tempFileCreator; this.deleteTempFiles = deleteTempFiles; + this.globalJarUploader = globalJarUploader; - this.fileSystem = FileSystem.get(yarnConfiguration); - this.jobSubmissionDirectory = jobSubmissionDirectory; this.fileNames = fileNames; this.configurationSerializer = configurationSerializer; - this.globalResources = this.setup(); + this.uploader = uploader; } - public Map<String, LocalResource> getGlobalResources() { - return this.globalResources; + /** + * @return the map to be used in formulating the evaluator launch submission. The global JAR is built and uploaded by {@link GlobalJarUploader#call()} on the first successful invocation and the result is cached afterwards; an IOException from that step is rethrown as a RuntimeException. + */ + Map<String, LocalResource> getGlobalResources() { + try { + return this.globalJarUploader.call(); + } catch (IOException e) { + throw new RuntimeException("Unable to upload the global JAR file to the job folder.", e); + } } - private Map<String, LocalResource> setup() throws IOException { - final Map<String, LocalResource> result = new HashMap<>(1); - final Path pathToGlobalJar = this.uploadToJobFolder(makeGlobalJar()); - result.put(this.fileNames.getGlobalFolderPath(), makeLocalResourceForJarFile(pathToGlobalJar)); - return result; - } - - private File makeGlobalJar() throws IOException { - final File jarFile = new File( - this.fileNames.getGlobalFolderName() + this.fileNames.getJarFileSuffix()); - new JARFileMaker(jarFile).addChildren(this.fileNames.getGlobalFolder()).close(); - return jarFile; - } /** * Sets up the LocalResources for a new Evaluator.
@@ -109,7 +92,7 @@ final class EvaluatorSetupHelper { * @return * @throws IOException */ - public Map<String, LocalResource> getResources( + Map<String, LocalResource> getResources( final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) throws IOException { @@ -119,10 +102,8 @@ final class EvaluatorSetupHelper { final File localStagingFolder = this.tempFileCreator.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix()); // Write the configuration - final File configurationFile = new File( - localStagingFolder, this.fileNames.getEvaluatorConfigurationName()); - this.configurationSerializer.toFile( - makeEvaluatorConfiguration(resourceLaunchProto), configurationFile); + final File configurationFile = new File(localStagingFolder, this.fileNames.getEvaluatorConfigurationName()); + this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchProto), configurationFile); // Copy files to the staging folder JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder); @@ -133,8 +114,8 @@ final class EvaluatorSetupHelper { new JARFileMaker(localFile).addChildren(localStagingFolder).close(); // Upload the JAR to the job folder - final Path pathToEvaluatorJar = uploadToJobFolder(localFile); - result.put(this.fileNames.getLocalFolderPath(), makeLocalResourceForJarFile(pathToEvaluatorJar)); + final Path pathToEvaluatorJar = this.uploader.uploadToJobFolder(localFile); + result.put(this.fileNames.getLocalFolderPath(), this.uploader.makeLocalResourceForJarFile(pathToEvaluatorJar)); if (this.deleteTempFiles) { LOG.log(Level.FINE, "Marking [{0}] for deletion at the exit of this JVM and deleting [{1}]", @@ -156,44 +137,11 @@ final class EvaluatorSetupHelper { * @throws IOException */ - private Configuration makeEvaluatorConfiguration( - final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) throws IOException { + private Configuration makeEvaluatorConfiguration(final DriverRuntimeProtocol.ResourceLaunchProto
resourceLaunchProto) + throws IOException { return Tang.Factory.getTang() .newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf())) .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class) .build(); } - - /** - * Uploads the given file to the job folder on (H)DFS. - * - * @param file - * @return - * @throws IOException - */ - private Path uploadToJobFolder(final File file) throws IOException { - final Path source = new Path(file.getAbsolutePath()); - final Path destination = new Path(this.jobSubmissionDirectory + "/" + file.getName()); - LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination}); - this.fileSystem.copyFromLocalFile(false, true, source, destination); - return destination; - } - - /** - * Creates a LocalResource instance for the JAR file referenced by the given Path - * - * @param path - * @return - * @throws IOException - */ - private LocalResource makeLocalResourceForJarFile(final Path path) throws IOException { - final LocalResource localResource = Records.newRecord(LocalResource.class); - final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(path); - localResource.setType(LocalResourceType.ARCHIVE); - localResource.setVisibility(LocalResourceVisibility.APPLICATION); - localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath())); - localResource.setTimestamp(status.getModificationTime()); - localResource.setSize(status.getLen()); - return localResource; - } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java ---------------------------------------------------------------------- diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java new file mode 100644
index 0000000..c6ee91e --- /dev/null +++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.driver; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.util.JARFileMaker; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * Utility class that creates the JAR file with the global files on the driver and then uploads it to the job folder on + * (H)DFS. {@link #call()} is synchronized and caches its result, so the JAR is built and uploaded until one call succeeds; a failed attempt is retried on the next call. + */ +final class GlobalJarUploader implements Callable<Map<String, LocalResource>> { + + /** + * Used for the file system constants (global folder and its path, global folder name, JAR file suffix). + */ + private final REEFFileNames fileNames; + /** + * This will hold the actual map to be used as the "global" resources when submitting Evaluators. + */ + private final Map<String, LocalResource> globalResources = new HashMap<>(1); + /** + * Utility used to perform the actual upload and to describe the uploaded JAR as a LocalResource.
+ */ + private final UploaderToJobFolder uploader; + /** + * True once a call to call() has succeeded and globalResources holds the cached result; stays false if an attempt throws. + */ + private boolean isDone; + + @Inject + GlobalJarUploader(final REEFFileNames fileNames, + final UploaderToJobFolder uploader) { + this.fileNames = fileNames; + this.uploader = uploader; + } + + /** + * Creates the JAR file with the global files on the driver and then uploads it to the job folder on + * (H)DFS. + * + * @return the map to be used as the "global" resources when submitting Evaluators. NOTE(review): this is the internal mutable map itself, not a copy; consider returning an unmodifiable view. + * @throws IOException if the creation of the JAR or the upload fails; the next call will then retry both steps + */ + @Override + public synchronized Map<String, LocalResource> call() throws IOException { + if (!this.isDone) { + final Path pathToGlobalJar = this.uploader.uploadToJobFolder(makeGlobalJar()); + globalResources.put(this.fileNames.getGlobalFolderPath(), + this.uploader.makeLocalResourceForJarFile(pathToGlobalJar)); + this.isDone = true; + } + return this.globalResources; + } + + /** + * Creates the JAR file for upload from the children of the global folder.
+ * + * @return the JAR file; its path is relative, so it resolves against the driver's current working directory + * @throws IOException if the JAR cannot be written. NOTE(review): if addChildren() throws, the JARFileMaker is never closed. + */ + private File makeGlobalJar() throws IOException { + final File jarFile = new File(this.fileNames.getGlobalFolderName() + this.fileNames.getJarFileSuffix()); + new JARFileMaker(jarFile).addChildren(this.fileNames.getGlobalFolder()).close(); + return jarFile; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c8db8f8f/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java ---------------------------------------------------------------------- diff --git a/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java new file mode 100644 index 0000000..afc1d9c --- /dev/null +++ b/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.reef.runtime.yarn.driver; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Uploads files to the current job folder and builds YARN LocalResource descriptions for them. NOTE(review): this class is declared in UploaderToJobfolder.java (lower-case 'f'); javac accepts the mismatch only because the class is not public. Consider renaming the file to UploaderToJobFolder.java. + */ +final class UploaderToJobFolder { + private static final Logger LOG = Logger.getLogger(UploaderToJobFolder.class.getName()); + + /** + * The path on (H)DFS which is used as the job's folder. + */ + private final String jobSubmissionDirectory; + /** + * The FileSystem instance to use for fs operations. + */ + private final FileSystem fileSystem; + + @Inject + UploaderToJobFolder(final @Parameter(JobSubmissionDirectory.class) String jobSubmissionDirectory, + final YarnConfiguration yarnConfiguration) throws IOException { + this.jobSubmissionDirectory = jobSubmissionDirectory; + this.fileSystem = FileSystem.get(yarnConfiguration); + } + + /** + * Uploads the given file to the job folder on (H)DFS. Assuming Hadoop's copyFromLocalFile(delSrc, overwrite, src, dst) parameter order, the local file is kept and an existing destination is overwritten (TODO confirm against the Hadoop version in use).
+ * + * @param file the local file to upload; it is stored under its own name directly in the job folder + * @return the destination path: the job submission directory joined with the file's name + * @throws java.io.IOException if the copy to the job folder fails + */ + Path uploadToJobFolder(final File file) throws IOException { + final Path source = new Path(file.getAbsolutePath()); + final Path destination = new Path(this.jobSubmissionDirectory + "/" + file.getName()); + LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination}); + this.fileSystem.copyFromLocalFile(false, true, source, destination); + return destination; + } + + /** + * Creates a LocalResource instance for the JAR file referenced by the given Path. The file must already exist, since its status is read from the file system. + * + * @param path the path of the JAR file on the file system used for uploads + * @return an ARCHIVE resource with APPLICATION visibility whose URL, timestamp and size come from the file's status + * @throws IOException if the file status cannot be read + */ + LocalResource makeLocalResourceForJarFile(final Path path) throws IOException { + final LocalResource localResource = Records.newRecord(LocalResource.class); + final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(path); + localResource.setType(LocalResourceType.ARCHIVE); + localResource.setVisibility(LocalResourceVisibility.APPLICATION); + localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath())); + localResource.setTimestamp(status.getModificationTime()); + localResource.setSize(status.getLen()); + return localResource; + } +}