Repository: incubator-reef
Updated Branches:
  refs/heads/master 26209180f -> 8b31ee0e2


[REEF-765] Add validation and logging on the Java side of the C# client

For YARN, this introduces the new class `YarnSubmissionFromCS`. It is used by
`YarnJobSubmissionClient` to parse, log and validate the command line parameters
passed from C#.

For local, this introduces the new class `LocalSubmissionFromCS`. It is used by
`LocalClient` to parse, log and validate the command line parameters passed from
C#.

JIRA:
  [REEF-765](https://issues.apache.org/jira/browse/REEF-765)

Pull Request:
  This closes #506

Author:    Markus Weimer <wei...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8b31ee0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8b31ee0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8b31ee0e

Branch: refs/heads/master
Commit: 8b31ee0e2343ce5377440aefde177e18c011c5ce
Parents: 2620918
Author: Markus Weimer <wei...@apache.org>
Authored: Thu Sep 17 13:50:51 2015 -0700
Committer: Julia Wang <juw...@microsoft.com>
Committed: Thu Sep 17 17:55:06 2015 -0700

----------------------------------------------------------------------
 .../Functional/ReefFunctionalTest.cs            |   4 +-
 .../apache/reef/bridge/client/LocalClient.java  |  91 ++-------
 .../bridge/client/LocalSubmissionFromCS.java    | 150 +++++++++++++++
 .../bridge/client/YarnJobSubmissionClient.java  |  97 ++--------
 .../bridge/client/YarnSubmissionFromCS.java     | 192 +++++++++++++++++++
 5 files changed, 382 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index 38a773d..e19f53f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -168,12 +168,12 @@ namespace Org.Apache.REEF.Tests.Functional
 
             if (string.IsNullOrWhiteSpace(driverContainerDirectory))
             {
-                throw new InvalidOperationException("Cannot find driver 
container directory");
+                throw new InvalidOperationException("Cannot find driver 
container directory: " + driverContainerDirectory);
             }
             string logFile = Path.Combine(driverContainerDirectory, 
logFileName);
             if (!File.Exists(logFile))
             {
-                throw new InvalidOperationException("Driver stdout file not 
found");
+                throw new InvalidOperationException("Driver stdout file not 
found: " + logFile);
             }
             return logFile;
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
index c66c6cd..19a4055 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
@@ -19,32 +19,28 @@
 package org.apache.reef.bridge.client;
 
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
-import org.apache.reef.io.TcpPortConfigurationProvider;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.files.REEFFileNames;
-import 
org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
 import org.apache.reef.runtime.local.client.DriverConfigurationProvider;
-import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
 import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher;
 import org.apache.reef.tang.*;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.AvroConfigurationSerializer;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
 
 import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Submits a folder containing a Driver to the local runtime.
  */
-public class LocalClient {
+public final class LocalClient {
 
+  private static final Logger LOG = 
Logger.getLogger(LocalClient.class.getName());
   private static final String CLIENT_REMOTE_ID = ClientRemoteIdentifier.NONE;
   private final AvroConfigurationSerializer configurationSerializer;
   private final PreparedDriverFolderLauncher launcher;
@@ -53,12 +49,12 @@ public class LocalClient {
   private final Set<ConfigurationProvider> configurationProviders;
 
   @Inject
-  public LocalClient(final AvroConfigurationSerializer configurationSerializer,
-                     final PreparedDriverFolderLauncher launcher,
-                     final REEFFileNames fileNames,
-                     final DriverConfigurationProvider 
driverConfigurationProvider,
-                     @Parameter(DriverConfigurationProviders.class)
-                     final Set<ConfigurationProvider> configurationProviders)  
{
+  private LocalClient(final AvroConfigurationSerializer 
configurationSerializer,
+                      final PreparedDriverFolderLauncher launcher,
+                      final REEFFileNames fileNames,
+                      final DriverConfigurationProvider 
driverConfigurationProvider,
+                      @Parameter(DriverConfigurationProviders.class)
+                      final Set<ConfigurationProvider> configurationProviders) 
{
     this.configurationSerializer = configurationSerializer;
     this.launcher = launcher;
     this.fileNames = fileNames;
@@ -66,84 +62,39 @@ public class LocalClient {
     this.configurationProviders = configurationProviders;
   }
 
-  public void submit(final File jobFolder, final String jobId) throws 
IOException {
-    if (!jobFolder.exists()) {
-      throw new IOException("The Job folder" + jobFolder.getAbsolutePath() + 
"doesn't exist.");
-    }
-
-    final File driverFolder = new File(jobFolder, 
PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME);
+  private void submit(final LocalSubmissionFromCS localSubmissionFromCS) 
throws IOException {
+    final File driverFolder = new File(localSubmissionFromCS.getJobFolder(),
+        PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME);
     if (!driverFolder.exists()) {
       throw new IOException("The Driver folder " + 
driverFolder.getAbsolutePath() + " doesn't exist.");
     }
 
     final Configuration driverConfiguration1 = driverConfigurationProvider
-        .getDriverConfiguration(jobFolder, CLIENT_REMOTE_ID, jobId,
-            Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER);
+        .getDriverConfiguration(localSubmissionFromCS.getJobFolder(), 
CLIENT_REMOTE_ID,
+            localSubmissionFromCS.getJobId(), 
Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER);
     final ConfigurationBuilder configurationBuilder = 
Tang.Factory.getTang().newConfigurationBuilder();
     for (final ConfigurationProvider configurationProvider : 
this.configurationProviders) {
       
configurationBuilder.addConfiguration(configurationProvider.getConfiguration());
     }
-    final Configuration providedConfigurations =  configurationBuilder.build();
+    final Configuration providedConfigurations = configurationBuilder.build();
     final Configuration driverConfiguration = Configurations.merge(
         driverConfiguration1,
         providedConfigurations);
 
     final File driverConfigurationFile = new File(driverFolder, 
fileNames.getDriverConfigurationPath());
     configurationSerializer.toFile(driverConfiguration, 
driverConfigurationFile);
-    launcher.launch(driverFolder, jobId, CLIENT_REMOTE_ID);
+    launcher.launch(driverFolder, localSubmissionFromCS.getJobId(), 
CLIENT_REMOTE_ID);
   }
 
-
   public static void main(final String[] args) throws InjectionException, 
IOException {
-    // TODO: Make the parameters of the local runtime command line arguments 
of this tool.
-
-    // We assume the given path to be the one of the driver. The job folder is 
one level up from there.
-    final File jobFolder = new File(args[0]).getParentFile();
-    final String runtimeRootFolder = 
jobFolder.getParentFile().getAbsolutePath();
-    final String jobId = args[1];
-    // The number of evaluators the local runtime can create
-    final int numberOfEvaluators = Integer.valueOf(args[2]);
-    final int tcpBeginPort = Integer.valueOf(args[3]);
-    final int tcpRangeCount = Integer.valueOf(args[4]);
-    final int tcpTryCount = Integer.valueOf(args[5]);
-
-
-    final Configuration runtimeConfiguration = getRuntimeConfiguration(new 
File(args[0]), numberOfEvaluators,
-        runtimeRootFolder, tcpBeginPort, tcpRangeCount, tcpTryCount);
+    final LocalSubmissionFromCS localSubmissionFromCS = 
LocalSubmissionFromCS.fromCommandLine(args);
+    LOG.log(Level.INFO, "Local job submission received from C#: {0}", 
localSubmissionFromCS);
+    final Configuration runtimeConfiguration = 
localSubmissionFromCS.getRuntimeConfiguration();
 
     final LocalClient client = Tang.Factory.getTang()
         .newInjector(runtimeConfiguration)
         .getInstance(LocalClient.class);
 
-    client.submit(jobFolder, jobId);
-  }
-
-  private static Configuration getRuntimeConfiguration(
-      final File jobFolder,
-      final int numberOfEvaluators,
-      final String runtimeRootFolder,
-      final int tcpBeginPort,
-      final int tcpRangeCount,
-      final int tcpTryCount) {
-    final Configuration runtimeConfiguration = 
getRuntimeConfiguration(numberOfEvaluators, runtimeRootFolder);
-    ArrayList<String> driverLaunchCommandPrefixList = new ArrayList<String>();
-    String path = new File(jobFolder, new 
REEFFileNames().getDriverLauncherExeFile().toString()).toString();
-
-    driverLaunchCommandPrefixList.add(path);
-    final Configuration userproviderConfiguration = 
Tang.Factory.getTang().newConfigurationBuilder()
-        .bindSetEntry(DriverConfigurationProviders.class, 
TcpPortConfigurationProvider.class)
-        .bindNamedParameter(TcpPortRangeBegin.class, 
Integer.toString(tcpBeginPort))
-        .bindNamedParameter(TcpPortRangeCount.class, 
Integer.toString(tcpRangeCount))
-        .bindNamedParameter(TcpPortRangeTryCount.class, 
Integer.toString(tcpTryCount))
-        .bindList(DriverLaunchCommandPrefix.class, 
driverLaunchCommandPrefixList)
-        .build();
-    return Configurations.merge(runtimeConfiguration, 
userproviderConfiguration);
-  }
-
-  private static Configuration getRuntimeConfiguration(final int 
numberOfEvaluators, final String runtimeRootFolder) {
-    return LocalRuntimeConfiguration.CONF
-        .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 
Integer.toString(numberOfEvaluators))
-        .set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, runtimeRootFolder)
-        .build();
+    client.submit(localSubmissionFromCS);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
new file mode 100644
index 0000000..58c5fea
--- /dev/null
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.commons.lang.Validate;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import 
org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+
+import java.io.File;
+import java.util.ArrayList;
+
+/**
+ * Represents a job submission from the C# code.
+ * <p>
+ * This class exists mostly to parse and validate the command line parameters 
provided by the C# class
+ * {@code Org.Apache.REEF.Client.Local.LocalClient}.
+ */
+final class LocalSubmissionFromCS {
+  private final File driverFolder;
+  private final File jobFolder;
+  private final File runtimeRootFolder;
+  private final String jobId;
+  private final int numberOfEvaluators;
+  private final int tcpBeginPort;
+  private final int tcpRangeCount;
+  private final int tcpTryCount;
+
+  private LocalSubmissionFromCS(final File driverFolder,
+                                final String jobId,
+                                final int numberOfEvaluators,
+                                final int tcpBeginPort,
+                                final int tcpRangeCount,
+                                final int tcpTryCount) {
+    Validate.isTrue(driverFolder.exists(), "The driver folder does not 
exist.");
+    Validate.notEmpty(jobId, "The job id is null or empty.");
+    Validate.isTrue(numberOfEvaluators >= 0, "The number of evaluators is < 
0.");
+    Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
+    Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0.");
+    Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0.");
+    // We assume the given path to be the one of the driver. The job folder is 
one level up from there.
+    this.driverFolder = driverFolder;
+    this.jobFolder = driverFolder.getParentFile();
+    this.runtimeRootFolder = jobFolder.getParentFile();
+    this.jobId = jobId;
+    this.numberOfEvaluators = numberOfEvaluators;
+    this.tcpBeginPort = tcpBeginPort;
+    this.tcpRangeCount = tcpRangeCount;
+    this.tcpTryCount = tcpTryCount;
+  }
+
+  /**
+   * @return the runtime configuration, based on the parameters passed from C#.
+   */
+  Configuration getRuntimeConfiguration() {
+    final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 
Integer.toString(numberOfEvaluators))
+        .set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, 
runtimeRootFolder.getAbsolutePath())
+        .build();
+
+    final ArrayList<String> driverLaunchCommandPrefixList = new ArrayList<>();
+    driverLaunchCommandPrefixList.add(
+        new File(driverFolder,
+            new REEFFileNames().getDriverLauncherExeFile().toString()
+        ).toString());
+
+    final Configuration userProviderConfiguration = 
Tang.Factory.getTang().newConfigurationBuilder()
+        .bindSetEntry(DriverConfigurationProviders.class, 
TcpPortConfigurationProvider.class)
+        .bindNamedParameter(TcpPortRangeBegin.class, 
Integer.toString(tcpBeginPort))
+        .bindNamedParameter(TcpPortRangeCount.class, 
Integer.toString(tcpRangeCount))
+        .bindNamedParameter(TcpPortRangeTryCount.class, 
Integer.toString(tcpTryCount))
+        .bindList(DriverLaunchCommandPrefix.class, 
driverLaunchCommandPrefixList)
+        .build();
+
+    return Configurations.merge(runtimeConfiguration, 
userProviderConfiguration);
+  }
+
+  @Override
+  public String toString() {
+    return "LocalSubmissionFromCS{" +
+        "driverFolder=" + driverFolder +
+        ", jobFolder=" + jobFolder +
+        ", runtimeRootFolder=" + runtimeRootFolder +
+        ", jobId='" + jobId + '\'' +
+        ", numberOfEvaluators=" + numberOfEvaluators +
+        ", tcpBeginPort=" + tcpBeginPort +
+        ", tcpRangeCount=" + tcpRangeCount +
+        ", tcpTryCount=" + tcpTryCount +
+        '}';
+  }
+
+  /**
+   * @return The folder in which the job is staged.
+   */
+  File getJobFolder() {
+    return jobFolder;
+  }
+
+  /**
+   * @return The id of this job.
+   */
+  String getJobId() {
+    return jobId;
+  }
+
+  /**
+   * Gets parameters from C#:
+   * <p>
+   * args[0]: Driver folder.
+   * args[1]: Job ID.
+   * args[2]: Number of Evaluators.
+   * args[3]: First port to open.
+   * args[4]: Port range size.
+   * args[5]: Port open trial count.
+   */
+  static LocalSubmissionFromCS fromCommandLine(final String[] args) {
+    final File driverFolder = new File(args[0]);
+    final String jobId = args[1];
+    final int numberOfEvaluators = Integer.valueOf(args[2]);
+    final int tcpBeginPort = Integer.valueOf(args[3]);
+    final int tcpRangeCount = Integer.valueOf(args[4]);
+    final int tcpTryCount = Integer.valueOf(args[5]);
+
+    return new LocalSubmissionFromCS(driverFolder, jobId, numberOfEvaluators, 
tcpBeginPort, tcpRangeCount, tcpTryCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 4ebda31..4a17a31 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -21,39 +21,35 @@ package org.apache.reef.bridge.client;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.client.DriverRestartConfiguration;
 import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
 import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
-import org.apache.reef.io.TcpPortConfigurationProvider;
 import org.apache.reef.javabridge.generic.JobDriver;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import 
org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
 import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
 import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
 import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
 import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
 import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
 import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
-import org.apache.reef.tang.*;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.util.JARFileMaker;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
 
 import javax.inject.Inject;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -160,26 +156,7 @@ public final class YarnJobSubmissionClient {
     return jarFile;
   }
 
-  /**
-   * @param driverFolder the folder on the local filesystem that contains the 
driver's working directory to be
-   *                     submitted.
-   * @param jobId        the ID of the job
-   * @param priority     the priority associated with this Driver
-   * @param queue        the queue to submit the driver to
-   * @param driverMemory in MB
-   * @throws IOException
-   * @throws YarnException
-   */
-  private void launch(final File driverFolder,
-                      final String jobId,
-                      final int priority,
-                      final String queue,
-                      final int driverMemory)
-      throws IOException, YarnException {
-    if (!driverFolder.exists()) {
-      throw new IOException("The Driver folder" + 
driverFolder.getAbsolutePath() + "doesn't exist.");
-    }
-
+  private void launch(final YarnSubmissionFromCS yarnSubmission) throws 
IOException, YarnException {
     // ------------------------------------------------------------------------
     // Get an application ID
     try (final YarnSubmissionHelper submissionHelper =
@@ -190,8 +167,9 @@ public final class YarnJobSubmissionClient {
       // Prepare the JAR
       final JobFolder jobFolderOnDFS = 
this.uploader.createJobFolder(submissionHelper.getApplicationId());
       final Configuration jobSubmissionConfiguration =
-          this.addYarnDriverConfiguration(driverFolder, jobId, 
jobFolderOnDFS.getPath().toString());
-      final File jarFile = makeJar(driverFolder);
+          this.addYarnDriverConfiguration(yarnSubmission.getDriverFolder(), 
yarnSubmission.getJobId(),
+              jobFolderOnDFS.getPath().toString());
+      final File jarFile = makeJar(yarnSubmission.getDriverFolder());
       LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
 
 
@@ -201,17 +179,17 @@ public final class YarnJobSubmissionClient {
       final LocalResource jarFileOnDFS = 
jobFolderOnDFS.uploadAsLocalResource(jarFile);
       LOG.info("Uploaded job submission JAR");
 
-      final Injector jobParamsInjector  = 
Tang.Factory.getTang().newInjector(jobSubmissionConfiguration);
+      final Injector jobParamsInjector = 
Tang.Factory.getTang().newInjector(jobSubmissionConfiguration);
 
       // 
------------------------------------------------------------------------
       // Submit
       try {
         submissionHelper
             .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
-            .setApplicationName(jobId)
-            .setDriverMemory(driverMemory)
-            .setPriority(priority)
-            .setQueue(queue)
+            .setApplicationName(yarnSubmission.getJobId())
+            .setDriverMemory(yarnSubmission.getDriverMemory())
+            .setPriority(yarnSubmission.getPriority())
+            .setQueue(yarnSubmission.getQueue())
             .setMaxApplicationAttempts(this.maxApplicationSubmissions)
             
.setPreserveEvaluators(jobParamsInjector.getNamedInstance(ResourceManagerPreserveEvaluators.class))
             .submit();
@@ -221,34 +199,6 @@ public final class YarnJobSubmissionClient {
     }
   }
 
-  private static Configuration getRuntimeConfiguration(final int tcpBeginPort,
-                                                       final int tcpRangeCount,
-                                                       final int tcpTryCount,
-                                                       final int 
driverRecoveryTimeout,
-                                                       final int 
maxApplicationSubmissions) {
-    final Configuration yarnClientConfig = YarnClientConfiguration.CONF
-        .build();
-
-    final Configuration providerConfig = 
Tang.Factory.getTang().newConfigurationBuilder()
-        .bindSetEntry(DriverConfigurationProviders.class, 
TcpPortConfigurationProvider.class)
-        .bindNamedParameter(TcpPortRangeBegin.class, 
Integer.toString(tcpBeginPort))
-        .bindNamedParameter(TcpPortRangeCount.class, 
Integer.toString(tcpRangeCount))
-        .bindNamedParameter(TcpPortRangeTryCount.class, 
Integer.toString(tcpTryCount))
-        .build();
-
-    ArrayList<String> driverLaunchCommandPrefixList = new ArrayList<String>();
-    driverLaunchCommandPrefixList.add(new 
REEFFileNames().getDriverLauncherExeFile().toString());
-
-    final Configuration yarnJobSubmissionClientParamsConfig = 
Tang.Factory.getTang().newConfigurationBuilder()
-        
.bindNamedParameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class,
-            Integer.toString(driverRecoveryTimeout))
-        .bindNamedParameter(MaxApplicationSubmissions.class, 
Integer.toString(maxApplicationSubmissions))
-        .bindList(DriverLaunchCommandPrefix.class, 
driverLaunchCommandPrefixList)
-        .build();
-
-    return Configurations.merge(yarnClientConfig, providerConfig, 
yarnJobSubmissionClientParamsConfig);
-  }
-
   /**
    * Takes 8 parameters from the C# side:
    * [0]: String. Driver folder.
@@ -259,26 +209,13 @@ public final class YarnJobSubmissionClient {
    * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart 
is enabled.
    */
   public static void main(final String[] args) throws InjectionException, 
IOException, YarnException {
-    final File driverFolder = new File(args[0]);
-    final String jobId = args[1];
-    final int driverMemory = Integer.valueOf(args[2]);
-    final int tcpBeginPort = Integer.valueOf(args[3]);
-    final int tcpRangeCount = Integer.valueOf(args[4]);
-    final int tcpTryCount = Integer.valueOf(args[5]);
-    final int maxApplicationSubmissions = Integer.valueOf(args[6]);
-    final int driverRecoveryTimeout = Integer.valueOf(args[7]);
-
-    // Static for now
-    final int priority = 1;
-    final String queue = "default";
-
-    final Configuration yarnConfiguration = getRuntimeConfiguration(
-        tcpBeginPort, tcpRangeCount, tcpTryCount, driverRecoveryTimeout, 
maxApplicationSubmissions);
+    final YarnSubmissionFromCS yarnSubmission = 
YarnSubmissionFromCS.fromCommandLine(args);
+    LOG.log(Level.INFO, "YARN job submission received from C#: {0}", 
yarnSubmission);
+    final Configuration yarnConfiguration = 
yarnSubmission.getRuntimeConfiguration();
     final YarnJobSubmissionClient client = Tang.Factory.getTang()
         .newInjector(yarnConfiguration)
         .getInstance(YarnJobSubmissionClient.class);
-
-    client.launch(driverFolder, jobId, priority, queue, driverMemory);
+    client.launch(yarnSubmission);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8b31ee0e/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
new file mode 100644
index 0000000..c399d94
--- /dev/null
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.commons.lang.Validate;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
+import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import 
org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a job submission from the C# code.
+ * <p>
+ * This class exists mostly to parse and validate the command line parameters 
provided by the C# class
+ * {@code Org.Apache.REEF.Client.YARN.YARNClient}.
+ */
+final class YarnSubmissionFromCS {
+  private final File driverFolder;
+  private final String jobId;
+  private final int driverMemory;
+  private final int tcpBeginPort;
+  private final int tcpRangeCount;
+  private final int tcpTryCount;
+  private final int maxApplicationSubmissions;
+  private final int driverRecoveryTimeout;
+  // Static for now
+  private final int priority;
+  private final String queue;
+
+  private YarnSubmissionFromCS(final File driverFolder,
+                               final String jobId,
+                               final int driverMemory,
+                               final int tcpBeginPort,
+                               final int tcpRangeCount,
+                               final int tcpTryCount,
+                               final int maxApplicationSubmissions,
+                               final int driverRecoveryTimeout,
+                               final int priority,
+                               final String queue) {
+
+    Validate.isTrue(driverFolder.exists(), "The driver folder given does not 
exist.");
+    Validate.notEmpty(jobId, "The job id is null or empty");
+    Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 
0.");
+    Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
+    Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0.");
+    Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0.");
+    Validate.isTrue(maxApplicationSubmissions > 0, "The maximum number of app 
submissions given is <= 0.");
+    Validate.notEmpty(queue, "The queue is null or empty");
+
+    this.driverFolder = driverFolder;
+    this.jobId = jobId;
+    this.driverMemory = driverMemory;
+    this.tcpBeginPort = tcpBeginPort;
+    this.tcpRangeCount = tcpRangeCount;
+    this.tcpTryCount = tcpTryCount;
+    this.maxApplicationSubmissions = maxApplicationSubmissions;
+    this.driverRecoveryTimeout = driverRecoveryTimeout;
+    this.priority = priority;
+    this.queue = queue;
+  }
+
+  @Override
+  public String toString() {
+    return "YarnSubmissionFromCS{" +
+        "driverFolder=" + driverFolder +
+        ", jobId='" + jobId + '\'' +
+        ", driverMemory=" + driverMemory +
+        ", tcpBeginPort=" + tcpBeginPort +
+        ", tcpRangeCount=" + tcpRangeCount +
+        ", tcpTryCount=" + tcpTryCount +
+        ", maxApplicationSubmissions=" + maxApplicationSubmissions +
+        ", driverRecoveryTimeout=" + driverRecoveryTimeout +
+        ", priority=" + priority +
+        ", queue='" + queue + '\'' +
+        '}';
+  }
+
+  /**
+   * Assembles the YARN runtime Configuration from the parameters passed from C#.
+   * The result merges {@code YarnClientConfiguration.CONF} with two additional configurations:
+   * one carrying the TCP port settings and one carrying the job submission parameters
+   * (driver restart recovery timeout, maximum application submissions and the driver launch command prefix).
+   *
+   * @return the merged YARN runtime Configuration.
+   */
+  Configuration getRuntimeConfiguration() {
+    // The driver launch command prefix consists of a single entry: the driver launcher executable.
+    final String launcherExe = new REEFFileNames().getDriverLauncherExeFile().toString();
+    final List<String> launchPrefix = new ArrayList<>();
+    launchPrefix.add(launcherExe);
+
+    // Registers the TcpPortConfigurationProvider as a driver configuration provider and binds the port settings.
+    final Configuration tcpPortConfig = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class)
+        .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort))
+        .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount))
+        .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount))
+        .build();
+
+    // Parameters consumed by the job submission client.
+    final Configuration submissionParamsConfig = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(SubmissionDriverRestartEvaluatorRecoverySeconds.class,
+            Integer.toString(driverRecoveryTimeout))
+        .bindNamedParameter(MaxApplicationSubmissions.class, Integer.toString(maxApplicationSubmissions))
+        .bindList(DriverLaunchCommandPrefix.class, launchPrefix)
+        .build();
+
+    return Configurations.merge(YarnClientConfiguration.CONF.build(), tcpPortConfig, submissionParamsConfig);
+  }
+
+  /**
+   * Accessor for the driver folder.
+   *
+   * @return the local folder in which the driver is staged.
+   */
+  File getDriverFolder() {
+    return this.driverFolder;
+  }
+
+  /**
+   * Accessor for the job id.
+   *
+   * @return the identifier of the job that is to be submitted.
+   */
+  String getJobId() {
+    return this.jobId;
+  }
+
+  /**
+   * Accessor for the driver memory.
+   *
+   * @return the amount of memory, in MB, to allocate for the Driver.
+   */
+  int getDriverMemory() {
+    return this.driverMemory;
+  }
+
+  /**
+   * Accessor for the submission priority.
+   *
+   * @return the priority with which the job is submitted.
+   */
+  int getPriority() {
+    return this.priority;
+  }
+
+  /**
+   * Accessor for the queue.
+   *
+   * @return the queue to which the driver will be submitted.
+   */
+  String getQueue() {
+    return this.queue;
+  }
+
+  /**
+   * Creates a YarnSubmissionFromCS from the command line arguments passed from the C# side.
+   * Takes 8 parameters; any additional arguments are ignored:
+   * [0]: String. Driver folder.
+   * [1]: String. Driver identifier.
+   * [2]: int. Driver memory.
+   * [3]: int. TCP port range begin.
+   * [4]: int. TCP port range count.
+   * [5]: int. TCP port try count.
+   * [6]: int. Max application submissions.
+   * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart is enabled.
+   * The priority and the queue are not read from the command line; they are fixed to 1 and "default".
+   *
+   * @param args the command line arguments as described above.
+   * @return the submission parameters parsed from args.
+   * @throws IllegalArgumentException if args is null or holds fewer than 8 entries.
+   * @throws NumberFormatException    if one of the int parameters cannot be parsed.
+   */
+  static YarnSubmissionFromCS fromCommandLine(final String[] args) {
+    // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException / NullPointerException below.
+    Validate.isTrue(args != null, "The command line arguments given are null.");
+    Validate.isTrue(args.length >= 8,
+        "Expected at least 8 command line arguments from C#, but got " + args.length + ".");
+
+    final File driverFolder = new File(args[0]);
+    final String jobId = args[1];
+    // parseInt yields the primitive directly; valueOf would box and immediately unbox.
+    final int driverMemory = Integer.parseInt(args[2]);
+    final int tcpBeginPort = Integer.parseInt(args[3]);
+    final int tcpRangeCount = Integer.parseInt(args[4]);
+    final int tcpTryCount = Integer.parseInt(args[5]);
+    final int maxApplicationSubmissions = Integer.parseInt(args[6]);
+    final int driverRecoveryTimeout = Integer.parseInt(args[7]);
+    // Static for now
+    final int priority = 1;
+    final String queue = "default";
+    return new YarnSubmissionFromCS(driverFolder, jobId, driverMemory, tcpBeginPort, tcpRangeCount, tcpTryCount,
+        maxApplicationSubmissions, driverRecoveryTimeout, priority, queue);
+  }
+}

Reply via email to