Repository: incubator-reef
Updated Branches:
  refs/heads/master 20ee916d8 -> 4f5bb42c5


[REEF-490] Expose configuration to preserve containers across application attempts on YARN

This addresses the issue by
  * Adding ResourceManagerPreserveEvaluators to 
DriverRuntimeRestartConfiguration.
  * Adding MaxApplicationSubmissions to DriverConfiguration.
  * Adding PreserveEvaluators and MaxApplicationSubmissions to
    JobSubmissionEvent.
  * Adding command line arguments from .NET to support MaxApplicationSubmissions
    and ResourceManagerPreserveEvaluators.

JIRA:
  [REEF-490](https://issues.apache.org/jira/browse/REEF-490)

Pull Request:
  This closes #347


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4f5bb42c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4f5bb42c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4f5bb42c

Branch: refs/heads/master
Commit: 4f5bb42c511d43c63644e6962982175023fc2b2f
Parents: 20ee916
Author: Andrew Chung <[email protected]>
Authored: Thu Aug 6 15:45:59 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 10 16:04:57 2015 -0700

----------------------------------------------------------------------
 .../ClrClient2JavaClientCuratedParameters.cs    |  39 +++++-
 .../Org.Apache.REEF.Client/YARN/YARNClient.cs   |   4 +-
 .../Bridge/DriverBridgeConfigurationOptions.cs  |   5 +
 .../DriverConfiguration.cs                      |   8 ++
 .../bridge/client/YarnJobSubmissionClient.java  | 120 ++++++++++++++-----
 .../apache/reef/client/DriverConfiguration.java |   8 +-
 .../parameters/MaxApplicationSubmissions.java   |  32 +++++
 .../ResourceManagerPreserveEvaluators.java      |  32 +++++
 .../common/client/JobSubmissionHelper.java      |   6 +-
 .../common/client/api/JobSubmissionEvent.java   |  11 ++
 .../client/api/JobSubmissionEventImpl.java      |  32 +++++
 .../DriverRuntimeRestartConfiguration.java      |   5 +
 .../yarn/client/YarnJobSubmissionHandler.java   |  18 ++-
 .../yarn/client/YarnSubmissionHelper.java       |  68 ++++++++---
 .../reef/runtime/yarn/util/YarnTypes.java       |   3 +
 15 files changed, 338 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
 
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
index e40ab59..473e223 100644
--- 
a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
+++ 
b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
@@ -16,17 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
+using System;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Wake.Remote.Parameters;
 
 namespace Org.Apache.REEF.Client.Common
 {
+    /// <summary>
+    /// Curated parameters for CLR to Java. Passes a set of command line parameters, in a fixed order, to YarnJobSubmissionClient on
+    /// the Java side; the order must match the argument indices that the Java side reads.
+    /// Note that EnableRestart is true only when a DriverRestartedHandler is bound, since only then can the constructor that takes one be used.
+    /// </summary>
     internal class ClrClient2JavaClientCuratedParameters
     {
         public int TcpPortRangeStart { get; private set; }
         public int TcpPortRangeCount { get; private set; }
         public int TcpPortRangeTryCount { get; private set; }
         public int TcpPortRangeSeed { get; private set; }
+        public int MaxApplicationSubmissions { get; private set; }
+        public bool EnableRestart { get; private set; }
 
 
         [Inject]
@@ -34,12 +45,38 @@ namespace Org.Apache.REEF.Client.Common
             [Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart,
             [Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount,
             [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
-            [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed)
+            [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
+            
[Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))] 
int maxApplicationSubmissions,
+            
[Parameter(typeof(DriverBridgeConfigurationOptions.DriverRestartedHandler))] 
IObserver<IDriverRestarted> restartHandler)
+            : this(tcpPortRangeStart, tcpPortRangeCount, tcpPortRangeTryCount, 
tcpPortRangeSeed, maxApplicationSubmissions, true)
+        {
+        }
+
+        [Inject]
+        private ClrClient2JavaClientCuratedParameters(
+            [Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart,
+            [Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount,
+            [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
+            [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
+            
[Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))] 
int maxApplicationSubmissions)
+            : this(tcpPortRangeStart, tcpPortRangeCount, tcpPortRangeTryCount, 
tcpPortRangeSeed, maxApplicationSubmissions, false)
+        {
+        }
+
+        private ClrClient2JavaClientCuratedParameters(
+            int tcpPortRangeStart,
+            int tcpPortRangeCount,
+            int tcpPortRangeTryCount,
+            int tcpPortRangeSeed,
+            int maxApplicationSubmissions,
+            bool enableRestart)
         {
             TcpPortRangeStart = tcpPortRangeStart;
             TcpPortRangeCount = tcpPortRangeCount;
             TcpPortRangeTryCount = tcpPortRangeTryCount;
             TcpPortRangeSeed = tcpPortRangeSeed;
+            MaxApplicationSubmissions = maxApplicationSubmissions;
+            EnableRestart = enableRestart;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
index b6fcc0c..b415ccb 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
@@ -67,7 +67,9 @@ namespace Org.Apache.REEF.Client.YARN
                 jobSubmission.DriverMemory.ToString(),
                 javaParams.TcpPortRangeStart.ToString(),
                 javaParams.TcpPortRangeCount.ToString(),
-                javaParams.TcpPortRangeTryCount.ToString()
+                javaParams.TcpPortRangeTryCount.ToString(),
+                javaParams.MaxApplicationSubmissions.ToString(),
+                javaParams.EnableRestart.ToString()
                 );
             Logger.Log(Level.Info, "Submitted the Driver for execution.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
index f62f69b..9935a89 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -138,6 +138,11 @@ namespace Org.Apache.REEF.Driver.Bridge
         {
         }
 
+        [NamedParameter("The number of times an application should be 
submitted in case of failure.", "MaxApplicationSubmissions", "1")]
+        public class MaxApplicationSubmissions : Name<int>
+        {
+        }
+
         [NamedParameter("Command Line Arguments supplied by client", 
"CommandLineArguments", null)]
         public class ArgumentSets : Name<ISet<string>>
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
index c72a9da..70c4ce1 100644
--- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
@@ -158,6 +158,12 @@ namespace Org.Apache.REEF.Driver
             new OptionalParameter<TraceListener>();
 
         /// <summary>
+        /// The number of times the application should be submitted in case of failures.
+        /// </summary>
+        public static readonly OptionalParameter<int> 
MaxApplicationSubmissions =
+            new OptionalParameter<int>();
+
+        /// <summary>
         /// The implemenation for (attempting to) re-establish connection to 
driver
         /// </summary>
         public static readonly OptionalImpl<IDriverConnection> 
OnDriverReconnect = new OptionalImpl<IDriverConnection>();
@@ -203,6 +209,8 @@ namespace Org.Apache.REEF.Driver
                     
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
                         OnDriverRestartTaskRunning)
                     
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
 CustomTraceLevel)
+                    
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.MaxApplicationSubmissions>.Class,
+                        MaxApplicationSubmissions)
                     .Build()
                     // TODO: Move this up
                     .Set(OnDriverStarted, 
GenericType<ClassHierarchyGeneratingDriverStartObserver>.Class);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index e707549..de8638b 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.client.DriverRestartConfiguration;
+import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
+import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
 import org.apache.reef.io.TcpPortConfigurationProvider;
 import org.apache.reef.javabridge.generic.JobDriver;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
@@ -35,6 +37,9 @@ import 
org.apache.reef.runtime.yarn.client.uploader.JobUploader;
 import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
 import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
 import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.util.JARFileMaker;
@@ -61,21 +66,31 @@ public final class YarnJobSubmissionClient {
   private final REEFFileNames fileNames;
   private final YarnConfiguration yarnConfiguration;
   private final ClasspathProvider classpath;
+  private final int maxApplicationSubmissions;
+  private final boolean enableRestart;
 
   @Inject
   YarnJobSubmissionClient(final JobUploader uploader,
                           final YarnConfiguration yarnConfiguration,
                           final ConfigurationSerializer 
configurationSerializer,
                           final REEFFileNames fileNames,
-                          final ClasspathProvider classpath) {
+                          final ClasspathProvider classpath,
+                          @Parameter(MaxApplicationSubmissions.class)
+                          final int maxApplicationSubmissions,
+                          @Parameter(EnableRestart.class)
+                          final boolean enableRestart) {
     this.uploader = uploader;
     this.configurationSerializer = configurationSerializer;
     this.fileNames = fileNames;
     this.yarnConfiguration = yarnConfiguration;
     this.classpath = classpath;
+    this.maxApplicationSubmissions = maxApplicationSubmissions;
+    this.enableRestart = enableRestart;
   }
 
-  private void addYarnDriverConfiguration(final File driverFolder, final 
String jobId, final String jobSubmissionFolder)
+  private Configuration addYarnDriverConfiguration(final File driverFolder,
+                                                   final String jobId,
+                                                   final String 
jobSubmissionFolder)
       throws IOException {
     final File driverConfigurationFile = new File(driverFolder, 
this.fileNames.getDriverConfigurationPath());
     final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
@@ -85,28 +100,32 @@ public final class YarnJobSubmissionClient {
         .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
         .build();
 
-    final Configuration yarnDriverRestartConfiguration =
-        YarnDriverRestartConfiguration.CONF
-            .build();
-
-    final Configuration driverRestartConfiguration =
-        DriverRestartConfiguration.CONF
-            .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, 
JobDriver.RestartHandler.class)
-            .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
-                JobDriver.DriverRestartActiveContextHandler.class)
-            .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
-                JobDriver.DriverRestartRunningTaskHandler.class)
-            .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
-                JobDriver.DriverRestartCompletedHandler.class)
-            .build();
-
-    final Configuration driverConfiguration = Configurations.merge(
+    Configuration driverConfiguration = Configurations.merge(
         Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
-        yarnDriverConfiguration,
-        yarnDriverRestartConfiguration,
-        driverRestartConfiguration);
+        yarnDriverConfiguration);
+
+    if (this.enableRestart) {
+      final Configuration yarnDriverRestartConfiguration =
+          YarnDriverRestartConfiguration.CONF
+              .build();
+
+      final Configuration driverRestartConfiguration =
+          DriverRestartConfiguration.CONF
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, 
JobDriver.RestartHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
+                  JobDriver.DriverRestartActiveContextHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
+                  JobDriver.DriverRestartRunningTaskHandler.class)
+              .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
+                  JobDriver.DriverRestartCompletedHandler.class)
+              .build();
+
+      driverConfiguration = Configurations.merge(
+          driverConfiguration, yarnDriverRestartConfiguration, 
driverRestartConfiguration);
+    }
 
     this.configurationSerializer.toFile(driverConfiguration, 
driverConfigurationFile);
+    return driverConfiguration;
   }
 
   /**
@@ -154,7 +173,8 @@ public final class YarnJobSubmissionClient {
       // 
------------------------------------------------------------------------
       // Prepare the JAR
       final JobFolder jobFolderOnDFS = 
this.uploader.createJobFolder(submissionHelper.getApplicationId());
-      this.addYarnDriverConfiguration(driverFolder, jobId, 
jobFolderOnDFS.getPath().toString());
+      final Configuration jobSubmissionConfiguration =
+          this.addYarnDriverConfiguration(driverFolder, jobId, 
jobFolderOnDFS.getPath().toString());
       final File jarFile = makeJar(driverFolder);
       LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
 
@@ -165,22 +185,31 @@ public final class YarnJobSubmissionClient {
       final LocalResource jarFileOnDFS = 
jobFolderOnDFS.uploadAsLocalResource(jarFile);
       LOG.info("Uploaded job submission JAR");
 
+      final Injector jobParamsInjector = 
Tang.Factory.getTang().newInjector(jobSubmissionConfiguration);
 
       // 
------------------------------------------------------------------------
       // Submit
-      submissionHelper
-          .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
-          .setApplicationName(jobId)
-          .setDriverMemory(driverMemory)
-          .setPriority(priority)
-          .setQueue(queue)
-          .submit(ClientRemoteIdentifier.NONE);
+      try {
+        submissionHelper
+            .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
+            .setApplicationName(jobId)
+            .setDriverMemory(driverMemory)
+            .setPriority(priority)
+            .setQueue(queue)
+            .setMaxApplicationAttempts(this.maxApplicationSubmissions)
+            
.setPreserveEvaluators(jobParamsInjector.getNamedInstance(ResourceManagerPreserveEvaluators.class))
+            .submit();
+      } catch (final InjectionException ie) {
+        throw new RuntimeException("Unable to submit job due to " + ie, ie);
+      }
     }
   }
 
   private static Configuration getRuntimeConfiguration(final int tcpBeginPort,
                                                        final int tcpRangeCount,
-                                                       final int tcpTryCount) {
+                                                       final int tcpTryCount,
+                                                       final boolean 
enableRestart,
+                                                       final int 
maxApplicationSubmissions) {
     final Configuration yarnClientConfig = YarnClientConfiguration.CONF
         .build();
 
@@ -191,9 +220,23 @@ public final class YarnJobSubmissionClient {
         .bindNamedParameter(TcpPortRangeTryCount.class, 
Integer.toString(tcpTryCount))
         .build();
 
-    return Configurations.merge(yarnClientConfig, providerConfig);
+    final Configuration yarnJobSubmissionClientParamsConfig = 
Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(EnableRestart.class, 
Boolean.toString(enableRestart))
+        .bindNamedParameter(MaxApplicationSubmissions.class, 
Integer.toString(maxApplicationSubmissions))
+        .build();
+
+    return Configurations.merge(yarnClientConfig, providerConfig, 
yarnJobSubmissionClientParamsConfig);
   }
 
+  /**
+   * Takes 8 parameters from the C# side, in this order:
+   * [0]: String. Driver folder.
+   * [1]: String. Driver identifier.
+   * [2]: int. Driver memory.
+   * [3~5]: int. TCP port range begin, TCP port range count, and TCP port try count.
+   * [6]: int. Max application submissions.
+   * [7]: boolean. Enable restart (parsed case-insensitively, so C#'s "True"/"False" work).
+   */
   public static void main(final String[] args) throws InjectionException, 
IOException, YarnException {
     final File driverFolder = new File(args[0]);
     final String jobId = args[1];
@@ -201,11 +244,15 @@ public final class YarnJobSubmissionClient {
     final int tcpBeginPort = Integer.valueOf(args[3]);
     final int tcpRangeCount = Integer.valueOf(args[4]);
     final int tcpTryCount = Integer.valueOf(args[5]);
+    final int maxApplicationSubmissions = Integer.valueOf(args[6]);
+    final boolean enableRestart = Boolean.valueOf(args[7]);
+
     // Static for now
     final int priority = 1;
     final String queue = "default";
 
-    final Configuration yarnConfiguration = 
getRuntimeConfiguration(tcpBeginPort, tcpRangeCount, tcpTryCount);
+    final Configuration yarnConfiguration = getRuntimeConfiguration(
+        tcpBeginPort, tcpRangeCount, tcpTryCount, enableRestart, 
maxApplicationSubmissions);
     final YarnJobSubmissionClient client = Tang.Factory.getTang()
         .newInjector(yarnConfiguration)
         .getInstance(YarnJobSubmissionClient.class);
@@ -213,3 +260,12 @@ public final class YarnJobSubmissionClient {
     client.launch(driverFolder, jobId, priority, queue, driverMemory);
   }
 }
+
+/**
+ * Whether the job driver should be configured with the driver restart handlers. Only used by C# job submission.
+ */
+@NamedParameter(doc = "Whether the job driver should enable restart", 
default_value = "false")
+final class EnableRestart implements Name<Boolean> {
+  private EnableRestart() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
index a1ae9c4..49e55b9 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
@@ -177,17 +177,23 @@ public final class DriverConfiguration extends 
ConfigurationModuleBuilder {
   public static final OptionalImpl<EventHandler<ContextMessage>> 
ON_CONTEXT_MESSAGE = new OptionalImpl<>();
 
   /**
-   * "Number of threads allocated per evaluator to dispatch events from this 
Evaluator.
+   * Number of threads allocated per evaluator to dispatch events from this 
Evaluator.
    */
   public static final OptionalParameter<Integer> EVALUATOR_DISPATCHER_THREADS 
= new OptionalParameter<>();
 
   /**
+   * The number of times the resource manager will attempt to submit the application. Defaults to 1.
+   */
+  public static final OptionalParameter<Integer> MAX_APPLICATION_SUBMISSIONS = 
new OptionalParameter<>();
+
+  /**
    * ConfigurationModule to fill out to get a legal Driver Configuration.
    */
   public static final ConfigurationModule CONF = new 
DriverConfiguration().merge(DriverRuntimeConfiguration.CONF)
       .bindNamedParameter(DriverIdentifier.class, DRIVER_IDENTIFIER)
       .bindNamedParameter(DriverMemory.class, DRIVER_MEMORY)
       .bindNamedParameter(DriverJobSubmissionDirectory.class, 
DRIVER_JOB_SUBMISSION_DIRECTORY)
+      .bindNamedParameter(MaxApplicationSubmissions.class, 
MAX_APPLICATION_SUBMISSIONS)
       .bindSetEntry(JobGlobalFiles.class, GLOBAL_FILES)
       .bindSetEntry(JobGlobalLibraries.class, GLOBAL_LIBRARIES)
       .bindSetEntry(DriverLocalFiles.class, LOCAL_FILES)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/MaxApplicationSubmissions.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/MaxApplicationSubmissions.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/MaxApplicationSubmissions.java
new file mode 100644
index 0000000..38fa033
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/MaxApplicationSubmissions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The number of times the resource manager should attempt to submit the application.
+ */
+@NamedParameter(doc = "The number of times the resource manager should attempt 
to submit the application.",
+    default_value = "1")
+public final class MaxApplicationSubmissions implements Name<Integer> {
+  private MaxApplicationSubmissions() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ResourceManagerPreserveEvaluators.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ResourceManagerPreserveEvaluators.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ResourceManagerPreserveEvaluators.java
new file mode 100644
index 0000000..40ada88
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ResourceManagerPreserveEvaluators.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Whether the resource manager should preserve evaluators on job driver 
failure.
+ */
+@NamedParameter(doc = "Whether the resource manager should preserve 
evaluators" +
+    " on job driver failure.", default_value = "false")
+public final class ResourceManagerPreserveEvaluators implements Name<Boolean> {
+  private ResourceManagerPreserveEvaluators() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
index 0b6e354..a974ed7 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java
@@ -77,13 +77,17 @@ final class JobSubmissionHelper {
       throws InjectionException, IOException {
     final Injector injector = 
Tang.Factory.getTang().newInjector(driverConfiguration);
 
+    final boolean preserveEvaluators = 
injector.getNamedInstance(ResourceManagerPreserveEvaluators.class);
+    final int maxAppSubmissions = 
injector.getNamedInstance(MaxApplicationSubmissions.class);
+
     final JobSubmissionEventImpl.Builder jbuilder = 
JobSubmissionEventImpl.newBuilder()
         
.setIdentifier(returnOrGenerateDriverId(injector.getNamedInstance(DriverIdentifier.class)))
         .setDriverMemory(injector.getNamedInstance(DriverMemory.class))
         .setUserName(System.getProperty("user.name"))
+        .setPreserveEvaluators(preserveEvaluators)
+        .setMaxApplicationSubmissions(maxAppSubmissions)
         .setConfiguration(driverConfiguration);
 
-
     for (final String globalFileName : 
injector.getNamedInstance(JobGlobalFiles.class)) {
       LOG.log(Level.FINEST, "Adding global file: {0}", globalFileName);
       jbuilder.addGlobalFile(getFileResourceProto(globalFileName, 
FileType.PLAIN));

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
index 7bf3123..094ad2c 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
@@ -75,6 +75,17 @@ public interface JobSubmissionEvent {
   Optional<Integer> getPriority();
 
   /**
+   * @return True if evaluators are to be preserved across driver failures.
+   */
+  Optional<Boolean> getPreserveEvaluators();
+
+  /**
+   * @return The number of times that the driver should be started by the resource manager
+   * if it fails unexpectedly.
+   */
+  Optional<Integer> getMaxApplicationSubmissions();
+
+  /**
    * @return Queue to submit the Job to
    * @deprecated in 0.12. Use 
org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead.
    */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
index 3a6e9dd..4ac0a36 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
@@ -40,6 +40,8 @@ public final class JobSubmissionEventImpl implements 
JobSubmissionEvent {
   private final Optional<Integer> driverMemory;
   private final Optional<Integer> priority;
   private final Optional<String> queue;
+  private final Optional<Boolean> preserveEvaluators;
+  private final Optional<Integer> maxApplicationSubmissions;
 
   private JobSubmissionEventImpl(final Builder builder) {
     this.identifier = BuilderUtils.notNull(builder.identifier);
@@ -50,7 +52,9 @@ public final class JobSubmissionEventImpl implements 
JobSubmissionEvent {
     this.localFileSet = BuilderUtils.notNull(builder.localFileSet);
     this.driverMemory = Optional.ofNullable(builder.driverMemory);
     this.priority = Optional.ofNullable(builder.priority);
+    this.preserveEvaluators = Optional.ofNullable(builder.preserveEvaluators);
     this.queue = Optional.ofNullable(builder.queue);
+    this.maxApplicationSubmissions = 
Optional.ofNullable(builder.maxApplicationSubmissions);
   }
 
   @Override
@@ -94,6 +98,16 @@ public final class JobSubmissionEventImpl implements 
JobSubmissionEvent {
   }
 
   @Override
+  public Optional<Boolean> getPreserveEvaluators() {
+    return preserveEvaluators;
+  }
+
+  @Override
+  public Optional<Integer> getMaxApplicationSubmissions() {
+    return maxApplicationSubmissions;
+  }
+
+  @Override
   public Optional<String> getQueue() {
     return queue;
   }
@@ -115,6 +129,8 @@ public final class JobSubmissionEventImpl implements 
JobSubmissionEvent {
     private Integer driverMemory;
     private Integer priority;
     private String queue;
+    private Boolean preserveEvaluators;
+    private Integer maxApplicationSubmissions;
 
     /**
      * @see JobSubmissionEvent#getIdentifier()
@@ -183,6 +199,22 @@ public final class JobSubmissionEventImpl implements 
JobSubmissionEvent {
     }
 
     /**
+     * @see JobSubmissionEvent#getPreserveEvaluators()
+     */
+    public Builder setPreserveEvaluators(final Boolean preserveEvaluators) {
+      this.preserveEvaluators = preserveEvaluators;
+      return this;
+    }
+
+    /**
+     * @see JobSubmissionEvent#getMaxApplicationSubmissions()
+     */
+    public Builder setMaxApplicationSubmissions(final Integer 
maxApplicationSubmissions) {
+      this.maxApplicationSubmissions = maxApplicationSubmissions;
+      return this;
+    }
+
+    /**
      * @see JobSubmissionEvent#getQueue()
      * @deprecated in 0.12. Use 
org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead.
      */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index 695ac8a..cbac9ea 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.common.driver;
 
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators;
 import org.apache.reef.driver.parameters.ServiceEvaluatorAllocatedHandlers;
 import org.apache.reef.driver.parameters.ServiceEvaluatorCompletedHandlers;
 import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
@@ -38,6 +39,10 @@ public final class DriverRuntimeRestartConfiguration extends 
ConfigurationModule
   }
 
   public static final ConfigurationModule CONF = new 
DriverRuntimeRestartConfiguration()
+
+      // Automatically sets preserve evaluators to true.
+      .bindNamedParameter(ResourceManagerPreserveEvaluators.class, 
Boolean.toString(true))
+
       .bindImplementation(DriverRestartManager.class, 
DriverRestartManagerImpl.class)
       .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, 
EvaluatorPreservingEvaluatorAllocatedHandler.class)
       .bindSetEntry(ServiceEvaluatorFailedHandlers.class, 
EvaluatorPreservingEvaluatorFailedHandler.class)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index eda421f..ba35f41 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -109,7 +109,9 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
           .setDriverMemory(jobSubmissionEvent.getDriverMemory().get())
           .setPriority(getPriority(jobSubmissionEvent))
           .setQueue(getQueue(jobSubmissionEvent))
-          .submit(jobSubmissionEvent.getRemoteId());
+          .setPreserveEvaluators(getPreserveEvaluators(jobSubmissionEvent))
+          
.setMaxApplicationAttempts(getMaxApplicationSubmissions(jobSubmissionEvent))
+          .submit();
 
       LOG.log(Level.FINEST, "Submitted job with ID [{0}]", 
jobSubmissionEvent.getIdentifier());
     } catch (final YarnException | IOException e) {
@@ -145,6 +147,20 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   }
 
   /**
+   * Extract whether or not the job should preserve 
evaluators across job driver restarts.
+   */
+  private Boolean getPreserveEvaluators(final JobSubmissionEvent 
jobSubmissionEvent) {
+    return jobSubmissionEvent.getPreserveEvaluators().orElse(false);
+  }
+
+  /**
+   * Extract the maximum number of application attempts for the job.
+   */
+  private Integer getMaxApplicationSubmissions(final JobSubmissionEvent 
jobSubmissionEvent) {
+    return jobSubmissionEvent.getMaxApplicationSubmissions().orElse(1);
+  }
+
+  /**
    * Extracts the queue name from the driverConfiguration or return default if 
none is set.
    *
    * @param driverConfiguration

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index b5e932c..ca6a04f 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -53,7 +53,8 @@ public final class YarnSubmissionHelper implements Closeable{
   private final Map<String, LocalResource> resources = new HashMap<>();
   private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
-
+  private boolean preserveEvaluators;
+  private int maxAppSubmissions;
 
   public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                               final REEFFileNames fileNames,
@@ -72,6 +73,8 @@ public final class YarnSubmissionHelper implements Closeable{
     this.applicationResponse = 
yarnClientApplication.getNewApplicationResponse();
     this.applicationSubmissionContext = 
yarnClientApplication.getApplicationSubmissionContext();
     this.applicationId = applicationSubmissionContext.getApplicationId();
+    this.maxAppSubmissions = 1;
+    this.preserveEvaluators = false;
     LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
   }
 
@@ -125,6 +128,47 @@ public final class YarnSubmissionHelper implements 
Closeable{
   }
 
   /**
+   * Set whether or not the resource manager should preserve evaluators across 
driver restarts.
+   * @param preserveEvaluators true to request that containers be kept across application attempts
+   * @return this helper, for call chaining
+   */
+  public YarnSubmissionHelper setPreserveEvaluators(final boolean 
preserveEvaluators) {
+    if (preserveEvaluators) {
+      // when supported, set KeepContainersAcrossApplicationAttempts to be true
+      // so that when driver (AM) crashes, evaluators will still be running 
and we can recover later.
+      if 
(YarnTypes.isAtOrAfterVersion(YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE)) 
{
+        LOG.log(
+            Level.FINE,
+            "Hadoop version is {0} or after with 
KeepContainersAcrossApplicationAttempts supported," +
+                " will set it to true.",
+            YarnTypes.MIN_VERSION_KEEP_CONTAINERS_AVAILABLE);
+
+        
applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
+      } else {
+        LOG.log(Level.WARNING,
+            "Hadoop version does not yet support 
KeepContainersAcrossApplicationAttempts. Driver restarts " +
+                "will not support recovering evaluators.");
+
+        
applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false);
+      }
+    } else {
+      
applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(false);
+    }
+
+    return this;
+  }
+
+  /**
+   * Sets the maximum application attempts for the application.
+   * @param maxApplicationAttempts the maximum number of application attempts passed to YARN
+   * @return this helper, for call chaining
+   */
+  public YarnSubmissionHelper setMaxApplicationAttempts(final int 
maxApplicationAttempts) {
+    applicationSubmissionContext.setMaxAppAttempts(maxApplicationAttempts);
+    return this;
+  }
+
+  /**
    * Assign this job submission to a queue.
    * @param queueName
    * @return
@@ -134,7 +178,7 @@ public final class YarnSubmissionHelper implements 
Closeable{
     return this;
   }
 
-  public void submit(final String clientRemoteIdentifier) throws IOException, 
YarnException {
+  public void submit() throws IOException, YarnException {
     // SET EXEC COMMAND
     final List<String> launchCommand = new JavaLaunchCommandBuilder()
         .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
@@ -144,6 +188,12 @@ public final class YarnSubmissionHelper implements 
Closeable{
         .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + 
this.fileNames.getDriverStderrFileName())
         .build();
 
+    if 
(this.applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts() 
&&
+        this.applicationSubmissionContext.getMaxAppAttempts() == 1) {
+      LOG.log(Level.WARNING, "Application will not be restarted even though 
preserve evaluators is set to true" +
+          " since the max application submissions is 1. Proceeding to submit 
application...");
+    }
+
     
this.applicationSubmissionContext.setAMContainerSpec(YarnTypes.getContainerLaunchContext(launchCommand,
         this.resources));
 
@@ -153,20 +203,6 @@ public final class YarnSubmissionHelper implements 
Closeable{
       LOG.log(Level.FINEST, "REEF app command: {0}", 
StringUtils.join(launchCommand, ' '));
     }
 
-    // TODO: this is currently being developed on a hacked 2.4.0 bits, should 
be 2.4.1
-    final String minVersionKeepContainerOptionAvailable = "2.4.0";
-
-    // when supported, set KeepContainersAcrossApplicationAttempts to be true
-    // so that when driver (AM) crashes, evaluators will still be running and 
we can recover later.
-    if (YarnTypes.isAtOrAfterVersion(minVersionKeepContainerOptionAvailable)) {
-      LOG.log(
-          Level.FINE,
-          "Hadoop version is {0} or after with 
KeepContainersAcrossApplicationAttempts supported, will set it to true.",
-          minVersionKeepContainerOptionAvailable);
-
-      
applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
-    }
-
     this.yarnClient.submitApplication(applicationSubmissionContext);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4f5bb42c/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
index 72e058e..48347be 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
@@ -33,6 +33,9 @@ import java.util.Map;
 @Private
 public final class YarnTypes {
 
+  // TODO[REEF-537]: Remove once the hadoop version is updated.
+  public static final String MIN_VERSION_KEEP_CONTAINERS_AVAILABLE = "2.4.0";
+
   private YarnTypes() {
   }
 


Reply via email to