Repository: reef Updated Branches: refs/heads/master 20369d4d4 -> 1137cdee0
http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java index c79fe41..c77caf7 100644 --- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java +++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java @@ -18,7 +18,9 @@ */ package org.apache.reef.bridge.client; +import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters; import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters; +import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters; import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters; import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; @@ -45,30 +47,42 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { private static final String STRING_REP = "HelloREEF"; private static final String STRING_REP_QUOTED = "\"" + STRING_REP + "\""; private static final long NUMBER_REP = 12345; - private static final String AVRO_YARN_PARAMETERS_SERIALIZED_STRING = + private static final String AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING = "{" + "\"sharedJobSubmissionParameters\":" + "{" + "\"jobId\":" + STRING_REP_QUOTED + "," + - "\"tcpBeginPort\":" + NUMBER_REP + "," + - "\"tcpRangeCount\":" + NUMBER_REP + "," + - "\"tcpTryCount\":" + NUMBER_REP + "," + "\"jobSubmissionFolder\":" + STRING_REP_QUOTED + "}," + - "\"driverMemory\":" + NUMBER_REP + "," + - "\"driverRecoveryTimeout\":" + NUMBER_REP + "," + "\"dfsJobSubmissionFolder\":\"" + STRING_REP + "\"," + "\"jobSubmissionDirectoryPrefix\":" + STRING_REP_QUOTED + "}"; - private static final String AVRO_YARN_CLUSTER_PARAMETERS_SERIALIZED_STRING = + private static final String AVRO_YARN_CLUSTER_JOB_PARAMETERS_SERIALIZED_STRING = "{" + - "\"yarnJobSubmissionParameters\":" + AVRO_YARN_PARAMETERS_SERIALIZED_STRING + "," + - "\"maxApplicationSubmissions\":" + NUMBER_REP + "," + + "\"yarnJobSubmissionParameters\":" + AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING + "," + "\"securityTokenKind\":\"" + NULL_REP + "\"," + "\"securityTokenService\":\"" + NULL_REP + "\"" + "}"; + private static final String AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING = + "{" + + "\"sharedAppSubmissionParameters\":" + + "{" + + "\"tcpBeginPort\":" + NUMBER_REP + "," + + "\"tcpRangeCount\":" + NUMBER_REP + "," + + "\"tcpTryCount\":" + NUMBER_REP + + "}," + + "\"driverMemory\":" + NUMBER_REP + "," + + "\"driverRecoveryTimeout\":" + NUMBER_REP + + "}"; + + private static final String AVRO_YARN_CLUSTER_APP_PARAMETERS_SERIALIZED_STRING = + "{" + + "\"yarnAppSubmissionParameters\":" + AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING + "," + + "\"maxApplicationSubmissions\":" + NUMBER_REP + + "}"; + /** * Tests deserialization of the Avro parameters for submission from the cluster from C#. * @throws IOException @@ -84,7 +98,8 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { assert yarnClusterSubmissionFromCS.getTokenKind().equals(NULL_REP); assert yarnClusterSubmissionFromCS.getTokenService().equals(NULL_REP); - verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters()); + verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters(), + yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters()); } /** @@ -93,7 +108,7 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { */ @Test public void testAvroYarnParametersDeserialization() throws IOException { - verifyYarnJobSubmissionParams(createAvroYarnJobSubmissionParameters()); + verifyYarnJobSubmissionParams(createAvroYarnJobSubmissionParameters(), createAvroYarnAppSubmissionParameters()); } /** @@ -102,13 +117,24 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { */ @Test public void testAvroYarnParametersSerialization() throws IOException { - try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - YarnJobSubmissionParametersFileGenerator.writeAvroYarnJobSubmissionParametersToOutputStream( - createYarnClusterSubmissionFromCS(), STRING_REP, outputStream); - final byte[] content = outputStream.toByteArray(); - try (final InputStream stream = new ByteArrayInputStream(content)) { - verifyYarnJobSubmissionParams( - YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(stream)); + try (final ByteArrayOutputStream appOutputStream = new ByteArrayOutputStream()) { + try (final ByteArrayOutputStream jobOutputStream = new ByteArrayOutputStream()) { + final YarnClusterSubmissionFromCS clusterSubmissionFromCS = createYarnClusterSubmissionFromCS(); + YarnSubmissionParametersFileGenerator.writeAvroYarnAppSubmissionParametersToOutputStream( + clusterSubmissionFromCS, appOutputStream); + YarnSubmissionParametersFileGenerator.writeAvroYarnJobSubmissionParametersToOutputStream( + clusterSubmissionFromCS, STRING_REP, jobOutputStream); + + final byte[] jobContent = jobOutputStream.toByteArray(); + final byte[] appContent = appOutputStream.toByteArray(); + + try (final InputStream appStream = new ByteArrayInputStream(appContent)) { + try (final InputStream jobStream = new ByteArrayInputStream(jobContent)) { + verifyYarnJobSubmissionParams( + YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(jobStream), + YarnBootstrapDriverConfigGenerator.readYarnAppSubmissionParametersFromInputStream(appStream)); + } + } } } } @@ -121,7 +147,8 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { @Test public void testYarnBootstrapDriverConfigGenerator() throws IOException, InjectionException { final Configuration yarnBootstrapDriverConfig = - YarnBootstrapDriverConfigGenerator.getYarnDriverConfiguration(createAvroYarnJobSubmissionParameters()); + YarnBootstrapDriverConfigGenerator.getYarnDriverConfiguration( + createAvroYarnJobSubmissionParameters(), createAvroYarnAppSubmissionParameters()); final Injector injector = Tang.Factory.getTang().newInjector(yarnBootstrapDriverConfig); assert injector.getNamedInstance(JobSubmissionDirectory.class).equals(STRING_REP); @@ -132,29 +159,45 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { assert injector.getNamedInstance(TcpPortRangeTryCount.class) == NUMBER_REP; } + private static AvroYarnAppSubmissionParameters createAvroYarnAppSubmissionParameters() throws IOException { + try (final InputStream stream = + new ByteArrayInputStream(AVRO_YARN_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { + return YarnBootstrapDriverConfigGenerator.readYarnAppSubmissionParametersFromInputStream(stream); + } + } + private static AvroYarnJobSubmissionParameters createAvroYarnJobSubmissionParameters() throws IOException { try (final InputStream stream = - new ByteArrayInputStream(AVRO_YARN_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { + new ByteArrayInputStream(AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { return YarnBootstrapDriverConfigGenerator.readYarnJobSubmissionParametersFromInputStream(stream); } } private static YarnClusterSubmissionFromCS createYarnClusterSubmissionFromCS() throws IOException { - try (final InputStream stream = + try (final InputStream appStream = new ByteArrayInputStream( - AVRO_YARN_CLUSTER_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { - return YarnClusterSubmissionFromCS.readYarnClusterSubmissionFromCSFromInputStream(stream); + AVRO_YARN_CLUSTER_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { + try (final InputStream jobStream = + new ByteArrayInputStream( + AVRO_YARN_CLUSTER_JOB_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { + return YarnClusterSubmissionFromCS.readYarnClusterSubmissionFromCSFromInputStream(appStream, jobStream); + } } } - private static void verifyYarnJobSubmissionParams(final AvroYarnJobSubmissionParameters jobSubmissionParameters) { + private static void verifyYarnJobSubmissionParams(final AvroYarnJobSubmissionParameters jobSubmissionParameters, + final AvroYarnAppSubmissionParameters appSubmissionParameters) { + final AvroAppSubmissionParameters sharedAppSubmissionParams = + appSubmissionParameters.getSharedAppSubmissionParameters(); + final AvroJobSubmissionParameters sharedJobSubmissionParams = jobSubmissionParameters.getSharedJobSubmissionParameters(); + + assert sharedAppSubmissionParams.getTcpBeginPort() == NUMBER_REP; + assert sharedAppSubmissionParams.getTcpRangeCount() == NUMBER_REP; + assert sharedAppSubmissionParams.getTcpTryCount() == NUMBER_REP; assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP); assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP); - assert sharedJobSubmissionParams.getTcpBeginPort() == NUMBER_REP; - assert sharedJobSubmissionParams.getTcpRangeCount() == NUMBER_REP; - assert sharedJobSubmissionParams.getTcpTryCount() == NUMBER_REP; assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP); assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP); } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java index 2faff66..1219503 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/CLRProcess.java @@ -20,6 +20,7 @@ package org.apache.reef.driver.evaluator; import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder; +import java.util.Collections; import java.util.List; /** @@ -60,7 +61,7 @@ public final class CLRProcess implements EvaluatorProcess { @Override public CLRProcess setConfigurationFileName(final String configurationFileName) { - commandBuilder.setConfigurationFileName(configurationFileName); + commandBuilder.setConfigurationFilePaths(Collections.singletonList(configurationFileName)); return this; } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java index 98f4d14..9b4a599 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/JVMProcess.java @@ -22,6 +22,7 @@ import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.RuntimePathProvider; import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; +import java.util.Collections; import java.util.List; /** @@ -72,7 +73,7 @@ public final class JVMProcess implements EvaluatorProcess { @Override public JVMProcess setConfigurationFileName(final String configurationFileName) { - commandBuilder.setConfigurationFileName(configurationFileName); + commandBuilder.setConfigurationFilePaths(Collections.singletonList(configurationFileName)); return this; } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java index 5de19c4..a51ad1c 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java @@ -51,7 +51,8 @@ public final class REEFFileNames { private static final String BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe"; private static final String SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId"; private static final String SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd"; - private static final String YARN_BOOTSTRAP_PARAM_FILE = "yarnparameters.json"; + private static final String YARN_BOOTSTRAP_APP_PARAM_FILE = "yarn-app-parameters.json"; + private static final String YARN_BOOTSTRAP_JOB_PARAM_FILE = "yarn-job-parameters.json"; @Inject public REEFFileNames() { @@ -234,14 +235,32 @@ public final class REEFFileNames { } /** - * @return File name the contains the bootstrap parameters for YARN job submission + * @return File name the contains the bootstrap application parameters for YARN job submission * without Java dependency. */ - public String getYarnBootstrapParamFile() { - return YARN_BOOTSTRAP_PARAM_FILE; + public String getYarnBootstrapAppParamFile() { + return YARN_BOOTSTRAP_APP_PARAM_FILE; } - public String getYarnBootstrapParamFilePath() { - return LOCAL_FOLDER_PATH + '/' + getYarnBootstrapParamFile(); + /** + * @return File name the contains the bootstrap job parameters for YARN job submission + * without Java dependency. + */ + public String getYarnBootstrapJobParamFile() { + return YARN_BOOTSTRAP_JOB_PARAM_FILE; + } + + /** + * @return Path to the bootstrap application parameters file for YARN job submission without Java dependency. + */ + public String getYarnBootstrapAppParamFilePath() { + return LOCAL_FOLDER_PATH + '/' + getYarnBootstrapAppParamFile(); + } + + /** + * @return Path to the bootstrap job parameters file for YARN job submission without Java dependency. + */ + public String getYarnBootstrapJobParamFilePath() { + return getYarnBootstrapJobParamFile(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java index 09c6646..9ad9c60 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java @@ -19,6 +19,7 @@ package org.apache.reef.runtime.common.launch; import org.apache.commons.lang.StringUtils; +import org.apache.reef.util.Optional; import java.io.File; import java.util.LinkedList; @@ -33,11 +34,10 @@ public class CLRLaunchCommandBuilder implements LaunchCommandBuilder { private static final Logger LOG = Logger.getLogger(CLRLaunchCommandBuilder.class.getName()); private static final String EVALUATOR_PATH = "reef/global/Org.Apache.Reef.Evaluator.exe"; - private String standardErrPath = null; private String standardOutPath = null; private int megaBytes = 0; - private String evaluatorConfigurationPath = null; + private Optional<List<String>> evaluatorConfigurationPaths = Optional.empty(); @Override public List<String> build() { @@ -47,7 +47,11 @@ public class CLRLaunchCommandBuilder implements LaunchCommandBuilder { LOG.log(Level.WARNING, "file can NOT be found: {0}", f.getAbsolutePath()); } result.add(f.getPath()); - result.add(evaluatorConfigurationPath); + if (evaluatorConfigurationPaths.isPresent()) { + for (final String evaluatorConfigurationPath : evaluatorConfigurationPaths.get()) { + result.add(evaluatorConfigurationPath); + } + } if (null != this.standardOutPath && !standardOutPath.isEmpty()) { result.add(">" + this.standardOutPath); } @@ -66,8 +70,8 @@ public class CLRLaunchCommandBuilder implements LaunchCommandBuilder { } @Override - public CLRLaunchCommandBuilder setConfigurationFileName(final String configurationFileName) { - this.evaluatorConfigurationPath = configurationFileName; + public CLRLaunchCommandBuilder setConfigurationFilePaths(final List<String> configurationFilePaths) { + this.evaluatorConfigurationPaths = Optional.of(configurationFilePaths); return this; } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java index e6f74ab..bf77ee4 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java @@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.launch; import org.apache.commons.lang.StringUtils; import org.apache.reef.runtime.common.REEFLauncher; import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.util.Optional; import java.io.File; import java.util.ArrayList; @@ -42,7 +43,7 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder { private static final String[] DEFAULT_OPTIONS = {"-XX:PermSize=128m", "-XX:MaxPermSize=128m"}; private String stderrPath = null; private String stdoutPath = null; - private String evaluatorConfigurationPath = null; + private Optional<List<String>> evaluatorConfigurationPaths = Optional.empty(); private String javaPath = null; private String classPath = null; private Boolean assertionsEnabled = null; @@ -112,7 +113,11 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder { "java.util.logging.config.file", "java.util.logging.config.class"); add(launcherClass.getName()); - add(evaluatorConfigurationPath); + if (evaluatorConfigurationPaths.isPresent()) { + for (final String configurationPath : evaluatorConfigurationPaths.get()) { + add(configurationPath); + } + } if (stdoutPath != null && !stdoutPath.isEmpty()) { add("1>"); @@ -133,8 +138,8 @@ public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder { } @Override - public JavaLaunchCommandBuilder setConfigurationFileName(final String configurationFileName) { - this.evaluatorConfigurationPath = configurationFileName; + public JavaLaunchCommandBuilder setConfigurationFilePaths(final List<String> configurationPaths) { + this.evaluatorConfigurationPaths = Optional.of(configurationPaths); return this; } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java index 0768298..396c9c8 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java @@ -42,10 +42,10 @@ public interface LaunchCommandBuilder { * Set the name of the configuration file for the Launcher. This file is assumed to exist in the working directory of * the process launched with this command line. * - * @param configurationFileName + * @param configurationFilePaths * @return this */ - LaunchCommandBuilder setConfigurationFileName(final String configurationFileName); + LaunchCommandBuilder setConfigurationFilePaths(final List<String> configurationFilePaths); /** * Names a file to which stdout will be redirected. http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java index cfe7872..fbcf820 100644 --- a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java +++ b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilderTest.java @@ -20,6 +20,7 @@ package org.apache.reef.runtime.common.launch; import org.junit.Test; +import java.util.Collections; import java.util.List; import static org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder.JVMOption; @@ -167,6 +168,7 @@ public final class JavaLaunchCommandBuilderTest { } private static JavaLaunchCommandBuilder newBuilder() { - return new JavaLaunchCommandBuilder().setConfigurationFileName("mockConfigurationFileName"); + return new JavaLaunchCommandBuilder() + .setConfigurationFilePaths(Collections.singletonList("mockConfigurationFileName")); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java index dfab9c2..8fd0594 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java @@ -37,6 +37,7 @@ import org.apache.reef.tang.annotations.Parameter; import javax.inject.Inject; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -149,7 +150,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler return new JavaLaunchCommandBuilder() .setJavaPath("%JAVA_HOME%/bin/java") - .setConfigurationFileName(this.filenames.getDriverConfigurationPath()) + .setConfigurationFilePaths(Collections.singletonList(this.filenames.getDriverConfigurationPath())) .setClassPath(this.classpath.getDriverClasspath()) .setMemory(jobSubmissionEvent.getDriverMemory().get()) .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName()) http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java index c53dc7e..e324cea 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/PreparedDriverFolderLauncher.java @@ -28,6 +28,7 @@ import org.apache.reef.tang.annotations.Parameter; import javax.inject.Inject; import java.io.File; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.logging.Level; @@ -91,7 +92,7 @@ public class PreparedDriverFolderLauncher { private List<String> makeLaunchCommand(final String jobId, final String clientRemoteId) { final List<String> command = new JavaLaunchCommandBuilder(commandPrefixList) - .setConfigurationFileName(this.fileNames.getDriverConfigurationPath()) + .setConfigurationFilePaths(Collections.singletonList(this.fileNames.getDriverConfigurationPath())) .setClassPath(this.classpath.getDriverClasspath()) .setMemory(DRIVER_MEMORY) .build(); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java index 5c0a3f8..880d30c 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java @@ -40,6 +40,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collections; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -133,7 +134,7 @@ final class MesosJobSubmissionHandler implements JobSubmissionHandler { this.configurationSerializer.toFile(driverConfiguration, runtimeConfigurationFile); final List<String> launchCommand = new JavaLaunchCommandBuilder() - .setConfigurationFileName(this.fileNames.getDriverConfigurationPath()) + .setConfigurationFilePaths(Collections.singletonList(this.fileNames.getDriverConfigurationPath())) .setClassPath(this.classpath.getDriverClasspath()) .setMemory(jobSubmissionEvent.getDriverMemory().get()) .build(); http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java index f006e83..faf8e1f 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java @@ -20,6 +20,7 @@ package org.apache.reef.runtime.yarn.client; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.reef.annotations.audience.ClientSide; @@ -104,7 +105,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { : this.uploader.createJobFolder(submissionHelper.getApplicationId()); final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, jobFolderOnDfs.getPath()); final File jobSubmissionFile = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration); - final LocalResource driverJarOnDfs = jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile); + final LocalResource driverJarOnDfs = + jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile, LocalResourceType.ARCHIVE); submissionHelper .addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs) http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java index c424fbc..0847f4d 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java @@ -34,9 +34,7 @@ import org.apache.reef.runtime.yarn.util.YarnTypes; import java.io.Closeable; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -57,7 +55,7 @@ public final class YarnSubmissionHelper implements Closeable{ private final SecurityTokenProvider tokenProvider; private final List<String> commandPrefixList; private Class launcherClazz; - private String confFileName; + private List<String> configurationFilePaths; public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, final REEFFileNames fileNames, @@ -81,7 +79,7 @@ public final class YarnSubmissionHelper implements Closeable{ this.tokenProvider = tokenProvider; this.commandPrefixList = commandPrefixList; this.launcherClazz = REEFLauncher.class; - this.confFileName = this.fileNames.getDriverConfigurationPath(); + this.configurationFilePaths = Collections.singletonList(this.fileNames.getDriverConfigurationPath()); LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId); } @@ -92,12 +90,10 @@ public final class YarnSubmissionHelper implements Closeable{ this(yarnConfiguration, fileNames, classpath, tokenProvider, null); } - - - /** - * - * @return the application ID assigned by YARN. - */ + /** + * + * @return the application ID assigned by YARN. + */ public int getApplicationId() { return this.applicationId.getId(); } @@ -214,21 +210,22 @@ public final class YarnSubmissionHelper implements Closeable{ /** * Sets the configuration file for the job. - * Note that this does not have to be the Driver TANG configuration. In the bootstrap - * launch case, this can be the Avro file that supports the generation of a driver + * Note that this does not have to be Driver TANG configuration. In the bootstrap + * launch case, this can be the set of the Avro files that supports the generation of a driver * configuration file natively at the Launcher. - * @param configurationFileName + * @param configurationFilePaths * @return */ - public YarnSubmissionHelper setConfigurationFileName(final String configurationFileName) { - this.confFileName = configurationFileName; + public YarnSubmissionHelper setConfigurationFilePaths(final List<String> configurationFilePaths) { + this.configurationFilePaths = configurationFilePaths; return this; } public void submit() throws IOException, YarnException { + // SET EXEC COMMAND final List<String> launchCommand = new JavaLaunchCommandBuilder(launcherClazz, commandPrefixList) - .setConfigurationFileName(confFileName) + .setConfigurationFilePaths(configurationFilePaths) .setClassPath(this.classpath.getDriverClasspath()) .setMemory(this.applicationSubmissionContext.getResource().getMemory()) .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName()) http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java index a8b2c05..6de0561 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java @@ -88,18 +88,18 @@ public final class JobFolder { * @return * @throws IOException */ - public LocalResource uploadAsLocalResource(final File localFile) throws IOException { + public LocalResource uploadAsLocalResource(final File localFile, final LocalResourceType type) throws IOException { final Path p = upload(localFile); - return getLocalResourceForPath(p); + return getLocalResourceForPath(p, type); } /** * Creates a LocalResource instance for the JAR file referenced by the given Path. */ - public LocalResource getLocalResourceForPath(final Path jarPath) throws IOException { + public LocalResource getLocalResourceForPath(final Path jarPath, final LocalResourceType type) throws IOException { final LocalResource localResource = Records.newRecord(LocalResource.class); final FileStatus status = FileContext.getFileContext(fileSystem.getUri()).getFileStatus(jarPath); - localResource.setType(LocalResourceType.ARCHIVE); + localResource.setType(type); localResource.setVisibility(LocalResourceVisibility.APPLICATION); localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath())); localResource.setTimestamp(status.getModificationTime());
