Repository: incubator-reef Updated Branches: refs/heads/master 26209180f -> 8b31ee0e2
[REEF-765] Add validation and logging on the Java side of the C# client For YARN, this introduces the new class `YarnSubmissionFromCS`. It is used by `YarnJobSubmissionClient` to parse, log and validate the command line parameters passed from C#. For local, this introduces the new class `LocalSubmissionFromCS`. It is used by `LocalClient` to parse, log and validate the command line parameters passed from C#. JIRA: [REEF-765](https://issues.apache.org/jira/browse/REEF-765) Pull Request: This closes #506 Author: Markus Weimer <wei...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8b31ee0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8b31ee0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8b31ee0e Branch: refs/heads/master Commit: 8b31ee0e2343ce5377440aefde177e18c011c5ce Parents: 2620918 Author: Markus Weimer <wei...@apache.org> Authored: Thu Sep 17 13:50:51 2015 -0700 Committer: Julia Wang <juw...@microsoft.com> Committed: Thu Sep 17 17:55:06 2015 -0700 ---------------------------------------------------------------------- .../Functional/ReefFunctionalTest.cs | 4 +- .../apache/reef/bridge/client/LocalClient.java | 91 ++------- .../bridge/client/LocalSubmissionFromCS.java | 150 +++++++++++++++ .../bridge/client/YarnJobSubmissionClient.java | 97 ++-------- .../bridge/client/YarnSubmissionFromCS.java | 192 +++++++++++++++++++ 5 files changed, 382 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index 38a773d..e19f53f 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -168,12 +168,12 @@ namespace Org.Apache.REEF.Tests.Functional if (string.IsNullOrWhiteSpace(driverContainerDirectory)) { - throw new InvalidOperationException("Cannot find driver container directory"); + throw new InvalidOperationException("Cannot find driver container directory: " + driverContainerDirectory); } string logFile = Path.Combine(driverContainerDirectory, logFileName); if (!File.Exists(logFile)) { - throw new InvalidOperationException("Driver stdout file not found"); + throw new InvalidOperationException("Driver stdout file not found: " + logFile); } return logFile; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java index c66c6cd..19a4055 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java @@ -19,32 +19,28 @@ package org.apache.reef.bridge.client; import org.apache.reef.client.parameters.DriverConfigurationProviders; -import org.apache.reef.io.TcpPortConfigurationProvider; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.files.REEFFileNames; -import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; import org.apache.reef.runtime.local.client.DriverConfigurationProvider; -import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher; import org.apache.reef.tang.*; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.tang.formats.AvroConfigurationSerializer; -import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; -import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; -import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; import javax.inject.Inject; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Submits a folder containing a Driver to the local runtime. */ -public class LocalClient { +public final class LocalClient { + private static final Logger LOG = Logger.getLogger(LocalClient.class.getName()); private static final String CLIENT_REMOTE_ID = ClientRemoteIdentifier.NONE; private final AvroConfigurationSerializer configurationSerializer; private final PreparedDriverFolderLauncher launcher; @@ -53,12 +49,12 @@ public class LocalClient { private final Set<ConfigurationProvider> configurationProviders; @Inject - public LocalClient(final AvroConfigurationSerializer configurationSerializer, - final PreparedDriverFolderLauncher launcher, - final REEFFileNames fileNames, - final DriverConfigurationProvider driverConfigurationProvider, - @Parameter(DriverConfigurationProviders.class) - final Set<ConfigurationProvider> configurationProviders) { + private LocalClient(final AvroConfigurationSerializer configurationSerializer, + final PreparedDriverFolderLauncher launcher, + final REEFFileNames fileNames, + final DriverConfigurationProvider driverConfigurationProvider, + @Parameter(DriverConfigurationProviders.class) + final Set<ConfigurationProvider> configurationProviders) { this.configurationSerializer = configurationSerializer; this.launcher = launcher; this.fileNames = fileNames; @@ -66,84 +62,39 @@ public class LocalClient { this.configurationProviders = configurationProviders; } - public void submit(final File jobFolder, final String jobId) throws IOException { - if (!jobFolder.exists()) { - throw new IOException("The Job folder" + jobFolder.getAbsolutePath() + "doesn't exist."); - } - - final File driverFolder = new File(jobFolder, PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME); + private void submit(final LocalSubmissionFromCS localSubmissionFromCS) throws IOException { + final File driverFolder = new File(localSubmissionFromCS.getJobFolder(), + PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME); if (!driverFolder.exists()) { throw new IOException("The Driver folder " + driverFolder.getAbsolutePath() + " doesn't exist."); } final Configuration driverConfiguration1 = driverConfigurationProvider - .getDriverConfiguration(jobFolder, CLIENT_REMOTE_ID, jobId, - Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER); + .getDriverConfiguration(localSubmissionFromCS.getJobFolder(), CLIENT_REMOTE_ID, + localSubmissionFromCS.getJobId(), Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER); final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(); for (final ConfigurationProvider configurationProvider : this.configurationProviders) { configurationBuilder.addConfiguration(configurationProvider.getConfiguration()); } - final Configuration providedConfigurations = configurationBuilder.build(); + final Configuration providedConfigurations = configurationBuilder.build(); final Configuration driverConfiguration = Configurations.merge( driverConfiguration1, providedConfigurations); final File driverConfigurationFile = new File(driverFolder, fileNames.getDriverConfigurationPath()); configurationSerializer.toFile(driverConfiguration, driverConfigurationFile); - launcher.launch(driverFolder, jobId, CLIENT_REMOTE_ID); + launcher.launch(driverFolder, localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID); } - public static void main(final String[] args) throws InjectionException, IOException { - // TODO: Make the parameters of the local runtime command line arguments of this tool. - - // We assume the given path to be the one of the driver. The job folder is one level up from there. - final File jobFolder = new File(args[0]).getParentFile(); - final String runtimeRootFolder = jobFolder.getParentFile().getAbsolutePath(); - final String jobId = args[1]; - // The number of evaluators the local runtime can create - final int numberOfEvaluators = Integer.valueOf(args[2]); - final int tcpBeginPort = Integer.valueOf(args[3]); - final int tcpRangeCount = Integer.valueOf(args[4]); - final int tcpTryCount = Integer.valueOf(args[5]); - - - final Configuration runtimeConfiguration = getRuntimeConfiguration(new File(args[0]), numberOfEvaluators, - runtimeRootFolder, tcpBeginPort, tcpRangeCount, tcpTryCount); + final LocalSubmissionFromCS localSubmissionFromCS = LocalSubmissionFromCS.fromCommandLine(args); + LOG.log(Level.INFO, "Local job submission received from C#: {0}", localSubmissionFromCS); + final Configuration runtimeConfiguration = localSubmissionFromCS.getRuntimeConfiguration(); final LocalClient client = Tang.Factory.getTang() .newInjector(runtimeConfiguration) .getInstance(LocalClient.class); - client.submit(jobFolder, jobId); - } - - private static Configuration getRuntimeConfiguration( - final File jobFolder, - final int numberOfEvaluators, - final String runtimeRootFolder, - final int tcpBeginPort, - final int tcpRangeCount, - final int tcpTryCount) { - final Configuration runtimeConfiguration = getRuntimeConfiguration(numberOfEvaluators, runtimeRootFolder); - ArrayList<String> driverLaunchCommandPrefixList = new ArrayList<String>(); - String path = new File(jobFolder, new REEFFileNames().getDriverLauncherExeFile().toString()).toString(); - - driverLaunchCommandPrefixList.add(path); - final Configuration userproviderConfiguration = Tang.Factory.getTang().newConfigurationBuilder() - .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) - .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort)) - .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount)) - .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount)) - .bindList(DriverLaunchCommandPrefix.class, driverLaunchCommandPrefixList) - .build(); - return Configurations.merge(runtimeConfiguration, userproviderConfiguration); - } - - private static Configuration getRuntimeConfiguration(final int numberOfEvaluators, final String runtimeRootFolder) { - return LocalRuntimeConfiguration.CONF - .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, Integer.toString(numberOfEvaluators)) - .set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, runtimeRootFolder) - .build(); + client.submit(localSubmissionFromCS); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java new file mode 100644 index 0000000..58c5fea --- /dev/null +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.client; + +import org.apache.commons.lang.Validate; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.io.TcpPortConfigurationProvider; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Tang; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; + +import java.io.File; +import java.util.ArrayList; + +/** + * Represents a job submission from the CS code. + * <p/> + * This class exists mostly to parse and validate the command line parameters provided by the C# class + * `Org.Apache.REEF.Client.Local.LocalClient` + */ +final class LocalSubmissionFromCS { + private final File driverFolder; + private final File jobFolder; + private final File runtimeRootFolder; + private final String jobId; + private final int numberOfEvaluators; + private final int tcpBeginPort; + private final int tcpRangeCount; + private final int tcpTryCount; + + private LocalSubmissionFromCS(final File driverFolder, + final String jobId, + final int numberOfEvaluators, + final int tcpBeginPort, + final int tcpRangeCount, + final int tcpTryCount) { + Validate.isTrue(driverFolder.exists(), "The driver folder does not exist."); + Validate.notEmpty(jobId, "The job is is null or empty."); + Validate.isTrue(numberOfEvaluators >= 0, "The number of evaluators is < 0."); + Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0."); + Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0."); + Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0."); + // We assume the given path to be the one of the driver. The job folder is one level up from there. + this.driverFolder = driverFolder; + this.jobFolder = driverFolder.getParentFile(); + this.runtimeRootFolder = jobFolder.getParentFile(); + this.jobId = jobId; + this.numberOfEvaluators = numberOfEvaluators; + this.tcpBeginPort = tcpBeginPort; + this.tcpRangeCount = tcpRangeCount; + this.tcpTryCount = tcpTryCount; + } + + /** + * @return the runtime configuration, based on the parameters passed from C#. + */ + Configuration getRuntimeConfiguration() { + final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF + .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, Integer.toString(numberOfEvaluators)) + .set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, runtimeRootFolder.getAbsolutePath()) + .build(); + + final ArrayList<String> driverLaunchCommandPrefixList = new ArrayList<>(); + driverLaunchCommandPrefixList.add( + new File(driverFolder, + new REEFFileNames().getDriverLauncherExeFile().toString() + ).toString()); + + final Configuration userProviderConfiguration = Tang.Factory.getTang().newConfigurationBuilder() + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) + .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort)) + .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount)) + .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount)) + .bindList(DriverLaunchCommandPrefix.class, driverLaunchCommandPrefixList) + .build(); + + return Configurations.merge(runtimeConfiguration, userProviderConfiguration); + } + + @Override + public String toString() { + return "LocalSubmissionFromCS{" + + "driverFolder=" + driverFolder + + ", jobFolder=" + jobFolder + + ", runtimeRootFolder=" + runtimeRootFolder + + ", jobId='" + jobId + '\'' + + ", numberOfEvaluators=" + numberOfEvaluators + + ", tcpBeginPort=" + tcpBeginPort + + ", tcpRangeCount=" + tcpRangeCount + + ", tcpTryCount=" + tcpTryCount + + '}'; + } + + /** + * @return The folder in which the job is staged. + */ + File getJobFolder() { + return jobFolder; + } + + /** + * @return The id of this job. + */ + String getJobId() { + return jobId; + } + + /** + * Gets parameters from C#: + * <p/> + * args[0]: Driver folder. + * args[1]: Job ID. + * args[2]: Number of Evaluators. + * args[3]: First port to open. + * args[4]: Port range size. + * args[5]: Port open trial count. + */ + static LocalSubmissionFromCS fromCommandLine(final String[] args) { + final File driverFolder = new File(args[0]); + final String jobId = args[1]; + final int numberOfEvaluators = Integer.valueOf(args[2]); + final int tcpBeginPort = Integer.valueOf(args[3]); + final int tcpRangeCount = Integer.valueOf(args[4]); + final int tcpTryCount = Integer.valueOf(args[5]); + + return new LocalSubmissionFromCS(driverFolder, jobId, numberOfEvaluators, tcpBeginPort, tcpRangeCount, tcpTryCount); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index 4ebda31..4a17a31 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -21,39 +21,35 @@ package org.apache.reef.bridge.client; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.reef.client.parameters.DriverConfigurationProviders; import org.apache.reef.client.DriverRestartConfiguration; import org.apache.reef.driver.parameters.MaxApplicationSubmissions; import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators; -import org.apache.reef.io.TcpPortConfigurationProvider; import org.apache.reef.javabridge.generic.JobDriver; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; -import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper; import org.apache.reef.runtime.yarn.client.uploader.JobFolder; import org.apache.reef.runtime.yarn.client.uploader.JobUploader; import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration; -import org.apache.reef.tang.*; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.tang.formats.ConfigurationSerializer; import org.apache.reef.util.JARFileMaker; -import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; -import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; -import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; import javax.inject.Inject; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -160,26 +156,7 @@ public final class YarnJobSubmissionClient { return jarFile; } - /** - * @param driverFolder the folder on the local filesystem that contains the driver's working directory to be - * submitted. - * @param jobId the ID of the job - * @param priority the priority associated with this Driver - * @param queue the queue to submit the driver to - * @param driverMemory in MB - * @throws IOException - * @throws YarnException - */ - private void launch(final File driverFolder, - final String jobId, - final int priority, - final String queue, - final int driverMemory) - throws IOException, YarnException { - if (!driverFolder.exists()) { - throw new IOException("The Driver folder" + driverFolder.getAbsolutePath() + "doesn't exist."); - } - + private void launch(final YarnSubmissionFromCS yarnSubmission) throws IOException, YarnException { // ------------------------------------------------------------------------ // Get an application ID try (final YarnSubmissionHelper submissionHelper = @@ -190,8 +167,9 @@ public final class YarnJobSubmissionClient { // Prepare the JAR final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId()); final Configuration jobSubmissionConfiguration = - this.addYarnDriverConfiguration(driverFolder, jobId, jobFolderOnDFS.getPath().toString()); - final File jarFile = makeJar(driverFolder); + this.addYarnDriverConfiguration(yarnSubmission.getDriverFolder(), yarnSubmission.getJobId(), + jobFolderOnDFS.getPath().toString()); + final File jarFile = makeJar(yarnSubmission.getDriverFolder()); LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile); @@ -201,17 +179,17 @@ public final class YarnJobSubmissionClient { final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile); LOG.info("Uploaded job submission JAR"); - final Injector jobParamsInjector = Tang.Factory.getTang().newInjector(jobSubmissionConfiguration); + final Injector jobParamsInjector = Tang.Factory.getTang().newInjector(jobSubmissionConfiguration); // ------------------------------------------------------------------------ // Submit try { submissionHelper .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS) - .setApplicationName(jobId) - .setDriverMemory(driverMemory) - .setPriority(priority) - .setQueue(queue) + .setApplicationName(yarnSubmission.getJobId()) + .setDriverMemory(yarnSubmission.getDriverMemory()) + .setPriority(yarnSubmission.getPriority()) + .setQueue(yarnSubmission.getQueue()) .setMaxApplicationAttempts(this.maxApplicationSubmissions) .setPreserveEvaluators(jobParamsInjector.getNamedInstance(ResourceManagerPreserveEvaluators.class)) .submit(); @@ -221,34 +199,6 @@ public final class YarnJobSubmissionClient { } } - private static Configuration getRuntimeConfiguration(final int tcpBeginPort, - final int tcpRangeCount, - final int tcpTryCount, - final int driverRecoveryTimeout, - final int maxApplicationSubmissions) { - final Configuration yarnClientConfig = YarnClientConfiguration.CONF - .build(); - - final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder() - .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) - .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort)) - .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount)) - .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount)) - .build(); - - ArrayList<String> driverLaunchCommandPrefixList = new ArrayList<String>(); - driverLaunchCommandPrefixList.add(new REEFFileNames().getDriverLauncherExeFile().toString()); - - final Configuration yarnJobSubmissionClientParamsConfig = Tang.Factory.getTang().newConfigurationBuilder() - .bindNamedParameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class, - Integer.toString(driverRecoveryTimeout)) - .bindNamedParameter(MaxApplicationSubmissions.class, Integer.toString(maxApplicationSubmissions)) - .bindList(DriverLaunchCommandPrefix.class, driverLaunchCommandPrefixList) - .build(); - - return Configurations.merge(yarnClientConfig, providerConfig, yarnJobSubmissionClientParamsConfig); - } - /** * Takes 5 parameters from the C# side: * [0]: String. Driver folder. @@ -259,26 +209,13 @@ public final class YarnJobSubmissionClient { * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart is enabled. */ public static void main(final String[] args) throws InjectionException, IOException, YarnException { - final File driverFolder = new File(args[0]); - final String jobId = args[1]; - final int driverMemory = Integer.valueOf(args[2]); - final int tcpBeginPort = Integer.valueOf(args[3]); - final int tcpRangeCount = Integer.valueOf(args[4]); - final int tcpTryCount = Integer.valueOf(args[5]); - final int maxApplicationSubmissions = Integer.valueOf(args[6]); - final int driverRecoveryTimeout = Integer.valueOf(args[7]); - - // Static for now - final int priority = 1; - final String queue = "default"; - - final Configuration yarnConfiguration = getRuntimeConfiguration( - tcpBeginPort, tcpRangeCount, tcpTryCount, driverRecoveryTimeout, maxApplicationSubmissions); + final YarnSubmissionFromCS yarnSubmission = YarnSubmissionFromCS.fromCommandLine(args); + LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission); + final Configuration yarnConfiguration = yarnSubmission.getRuntimeConfiguration(); final YarnJobSubmissionClient client = Tang.Factory.getTang() .newInjector(yarnConfiguration) .getInstance(YarnJobSubmissionClient.class); - - client.launch(driverFolder, jobId, priority, queue, driverMemory); + client.launch(yarnSubmission); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java new file mode 100644 index 0000000..c399d94 --- /dev/null +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.client; + +import org.apache.commons.lang.Validate; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.driver.parameters.MaxApplicationSubmissions; +import org.apache.reef.io.TcpPortConfigurationProvider; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix; +import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Tang; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * Represents a job submission from the CS code. + * <p/> + * This class exists mostly to parse and validate the command line parameters provided by the C# class + * `Org.Apache.REEF.Client.YARN.YARNClient` + */ +final class YarnSubmissionFromCS { + private final File driverFolder; + private final String jobId; + private final int driverMemory; + private final int tcpBeginPort; + private final int tcpRangeCount; + private final int tcpTryCount; + private final int maxApplicationSubmissions; + private final int driverRecoveryTimeout; + // Static for now + private final int priority; + private final String queue; + + private YarnSubmissionFromCS(final File driverFolder, + final String jobId, + final int driverMemory, + final int tcpBeginPort, + final int tcpRangeCount, + final int tcpTryCount, + final int maxApplicationSubmissions, + final int driverRecoveryTimeout, + final int priority, + final String queue) { + + Validate.isTrue(driverFolder.exists(), "The driver folder given does not exist."); + Validate.notEmpty(jobId, "The job id is null or empty"); + Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0."); + Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0."); + Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0."); + Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0."); + Validate.isTrue(maxApplicationSubmissions > 0, "The maximum number of app submissions given is <= 0."); + Validate.notEmpty(queue, "The queue is null or empty"); + + this.driverFolder = driverFolder; + this.jobId = jobId; + this.driverMemory = driverMemory; + this.tcpBeginPort = tcpBeginPort; + this.tcpRangeCount = tcpRangeCount; + this.tcpTryCount = tcpTryCount; + this.maxApplicationSubmissions = maxApplicationSubmissions; + this.driverRecoveryTimeout = driverRecoveryTimeout; + this.priority = priority; + this.queue = queue; + } + + @Override + public String toString() { + return "YarnSubmissionFromCS{" + + "driverFolder=" + driverFolder + + ", jobId='" + jobId + '\'' + + ", driverMemory=" + driverMemory + + ", tcpBeginPort=" + tcpBeginPort + + ", tcpRangeCount=" + tcpRangeCount + + ", tcpTryCount=" + tcpTryCount + + ", maxApplicationSubmissions=" + maxApplicationSubmissions + + ", driverRecoveryTimeout=" + driverRecoveryTimeout + + ", priority=" + priority + + ", queue='" + queue + '\'' + + '}'; + } + + /** + * Produces the YARN Runtime Configuration based on the parameters passed from C#. + * + * @return the YARN Runtime Configuration based on the parameters passed from C#. + */ + Configuration getRuntimeConfiguration() { + final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) + .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort)) + .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount)) + .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount)) + .build(); + + final List<String> driverLaunchCommandPrefixList = new ArrayList<>(); + driverLaunchCommandPrefixList.add(new REEFFileNames().getDriverLauncherExeFile().toString()); + + final Configuration yarnJobSubmissionClientParamsConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class, + Integer.toString(driverRecoveryTimeout)) + .bindNamedParameter(MaxApplicationSubmissions.class, Integer.toString(maxApplicationSubmissions)) + .bindList(DriverLaunchCommandPrefix.class, driverLaunchCommandPrefixList) + .build(); + + return Configurations.merge(YarnClientConfiguration.CONF.build(), providerConfig, + yarnJobSubmissionClientParamsConfig); + } + + /** + * @return The local folder where the driver is staged. + */ + File getDriverFolder() { + return driverFolder; + } + + /** + * @return the id of the job to be submitted. + */ + String getJobId() { + return jobId; + } + + /** + * @return the amount of memory to allocate for the Driver, in MB. + */ + int getDriverMemory() { + return driverMemory; + } + + /** + * @return The priority of the job submission + */ + int getPriority() { + return priority; + } + + /** + * @return The queue the driver will be submitted to. + */ + String getQueue() { + return queue; + } + + /** + * Takes 5 parameters from the C# side: + * [0]: String. Driver folder. + * [1]: String. Driver identifier. + * [2]: int. Driver memory. + * [3~5]: int. TCP configurations. + * [6]: int. Max application submissions. + * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart is enabled. + */ + static YarnSubmissionFromCS fromCommandLine(final String[] args) { + final File driverFolder = new File(args[0]); + final String jobId = args[1]; + final int driverMemory = Integer.valueOf(args[2]); + final int tcpBeginPort = Integer.valueOf(args[3]); + final int tcpRangeCount = Integer.valueOf(args[4]); + final int tcpTryCount = Integer.valueOf(args[5]); + final int maxApplicationSubmissions = Integer.valueOf(args[6]); + final int driverRecoveryTimeout = Integer.valueOf(args[7]); + // Static for now + final int priority = 1; + final String queue = "default"; + return new YarnSubmissionFromCS(driverFolder, jobId, driverMemory, tcpBeginPort, tcpRangeCount, tcpTryCount, + maxApplicationSubmissions, driverRecoveryTimeout, priority, queue); + } +}