Repository: reef
Updated Branches:
  refs/heads/master d9c06e882 -> c63219ff2


[REEF-1194] Extensibility point for Driver configuration for 
JobSubmissionHandlers

This addressed the issue by:
  * Adding DriverConfigurationProvider to reef-common
  * Adding default implementations for the interface to different runtimes
  * Adding extensible configurations to runtimes that can be used to bind
    different driver configuration providers

JIRA:
  [REEF-1194](https://issues.apache.org/jira/browse/REEF-1194)

Pull Request:
  This closes #833


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c63219ff
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c63219ff
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c63219ff

Branch: refs/heads/master
Commit: c63219ff2006b65815f53e0b74b73a71c616c3c1
Parents: d9c06e8
Author: Boris Shulman <[email protected]>
Authored: Mon Feb 8 13:26:28 2016 -0800
Committer: Markus Weimer <[email protected]>
Committed: Tue Feb 16 14:19:57 2016 -0800

----------------------------------------------------------------------
 ...ocalRuntimeDriverConfigurationGenerator.java |  6 +-
 .../client/DriverConfigurationProvider.java     | 42 +++++++++
 .../runtime/hdinsight/client/AzureUploader.java | 14 ++-
 ...DInsightDriverConfigurationProviderImpl.java | 61 ++++++++++++
 .../client/HDInsightJobSubmissionHandler.java   | 29 +++---
 .../HDInsightRuntimeConfigurationStatic.java    |  2 +
 ...safeHDInsightRuntimeConfigurationStatic.java |  2 +
 .../client/DriverConfigurationProvider.java     | 84 -----------------
 .../ExtensibleLocalRuntimeConfiguration.java    | 98 ++++++++++++++++++++
 .../LocalDriverConfigurationProviderImpl.java   | 85 +++++++++++++++++
 .../local/client/LocalJobSubmissionHandler.java |  6 +-
 .../local/client/LocalRuntimeConfiguration.java |  9 +-
 .../mesos/client/MesosClientConfiguration.java  |  2 +
 .../MesosDriverConfigurationProviderImpl.java   | 62 +++++++++++++
 .../mesos/client/MesosJobSubmissionHandler.java | 42 ++++-----
 .../ExtensibleYarnClientConfiguration.java      | 73 +++++++++++++++
 .../yarn/client/YarnClientConfiguration.java    | 26 +++---
 .../YarnDriverConfigurationProviderImpl.java    | 57 ++++++++++++
 .../yarn/client/YarnJobSubmissionHandler.java   | 40 ++++----
 19 files changed, 562 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
index b63e20b..ac405af 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
@@ -19,9 +19,9 @@
 package org.apache.reef.bridge.client;
 
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.local.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher;
 import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.tang.*;
@@ -62,7 +62,7 @@ final class LocalRuntimeDriverConfigurationGenerator {
    * Writes driver configuration to disk.
    * @param jobFolder The folder in which the job is staged.
    * @param jobId id of the job to be submitted
-   * @param clientRemoteId
+   * @param clientRemoteId The client remote id
    * @return the configuration
    * @throws IOException
    */
@@ -72,7 +72,7 @@ final class LocalRuntimeDriverConfigurationGenerator {
     final File driverFolder = new File(jobFolder, 
PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME);
 
     final Configuration driverConfiguration1 = driverConfigurationProvider
-        .getDriverConfiguration(jobFolder, clientRemoteId,
+        .getDriverConfiguration(jobFolder.toURI(), clientRemoteId,
         jobId, Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER);
     final ConfigurationBuilder configurationBuilder = 
Tang.Factory.getTang().newConfigurationBuilder();
     for (final ConfigurationProvider configurationProvider : 
this.configurationProviders) {

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/DriverConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/DriverConfigurationProvider.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/DriverConfigurationProvider.java
new file mode 100644
index 0000000..d764461
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/DriverConfigurationProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.tang.Configuration;
+
+import java.net.URI;
+
+/**
+ * A contract for driver configuration provider.
+ */
+public interface DriverConfigurationProvider {
+  /**
+   * Generates driver configuration.
+   * @param jobFolder The job folder.
+   * @param clientRemoteId the client remote id.
+   * @param jobId The job id.
+   * @param applicationConfiguration The application configuration.
+   * @return Generated driver configuration.
+   */
+  Configuration getDriverConfiguration(
+          final URI jobFolder,
+          final String clientRemoteId,
+          final String jobId,
+          final Configuration applicationConfiguration);
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
index 97ef3ea..ba5c233 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
@@ -88,13 +88,13 @@ final class AzureUploader {
   }
 
   @SuppressWarnings("checkstyle:hiddenfield")
-  public String createJobFolder(final String applicationID) throws IOException 
{
+  public URI createJobFolder(final String applicationID) throws IOException {
     try {
       this.applicationID = applicationID;
       this.jobFolderName = assembleJobFolderName(applicationID);
       // Make the directory entry for the job
       final CloudBlockBlob jobFolderBlob = 
this.container.getBlockBlobReference(this.jobFolderName);
-      final String jobFolderURL = getFileSystemURL(jobFolderBlob);
+      final URI jobFolderURL = getFileSystemURL(jobFolderBlob);
       return jobFolderURL;
     } catch (final StorageException | URISyntaxException e) {
       throw new IOException("Unable to create job Folder", e);
@@ -129,7 +129,7 @@ final class AzureUploader {
           .setVisibility(LocalResource.VISIBILITY_APPLICATION)
           .setSize(blobProperties.getLength())
           .setTimestamp(blobProperties.getLastModified().getTime())
-          .setResource(getFileSystemURL(jobJarBlob));
+          .setResource(getFileSystemURL(jobJarBlob).toString());
 
     } catch (final URISyntaxException | StorageException e) {
       throw new IOException(e);
@@ -140,10 +140,14 @@ final class AzureUploader {
    * @param blob
    * @return a HDFS URL for the blob
    */
-  private String getFileSystemURL(final CloudBlockBlob blob) {
+  private URI getFileSystemURL(final CloudBlockBlob blob) {
     final URI primaryURI = blob.getStorageUri().getPrimaryUri();
     final String path = 
primaryURI.getPath().replace(this.azureStorageContainerName + "/", "");
-    return "wasb://" + this.azureStorageContainerName + "@" + 
primaryURI.getHost() + path;
+    try {
+      return new URI("wasb://" + this.azureStorageContainerName + "@" + 
primaryURI.getHost() + path);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Invalid URI constructed ", e);
+    }
   }
 
   private String assembleJobFolderName(final String jobApplicationID) {

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..89658a3
--- /dev/null
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.hdinsight.client;
+
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+import java.net.URI;
+
+import static org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.*;
+
+/**
+ * Default driver configuration provider for HDInsight.
+ */
+final class HDInsightDriverConfigurationProviderImpl implements 
DriverConfigurationProvider {
+  private final double jvmSlack;
+
+  @Inject
+  HDInsightDriverConfigurationProviderImpl(@Parameter(JVMHeapSlack.class) 
final double jvmSlack) {
+    this.jvmSlack = jvmSlack;
+  }
+
+  @Override
+  public Configuration getDriverConfiguration(final URI jobFolder,
+                                              final String clientRemoteId,
+                                              final String jobId,
+                                              final Configuration 
applicationConfiguration) {
+
+    final Configuration hdinsightDriverConfiguration = 
HDInsightDriverConfiguration.CONF
+            .set(HDInsightDriverConfiguration.JOB_IDENTIFIER, jobId)
+            .set(HDInsightDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
clientRemoteId)
+            .set(HDInsightDriverConfiguration.JOB_SUBMISSION_DIRECTORY, 
jobFolder.toString())
+            .set(HDInsightDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+            .build();
+
+    return Configurations.merge(
+            applicationConfiguration,
+            hdinsightDriverConfiguration);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
index 8fd0594..7c70a5a 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
@@ -22,22 +22,21 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.JobJarMaker;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
-import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
 import org.apache.reef.runtime.hdinsight.client.yarnrest.*;
 import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.net.URI;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -56,7 +55,7 @@ public final class HDInsightJobSubmissionHandler implements 
JobSubmissionHandler
   private final HDInsightInstance hdInsightInstance;
   private final REEFFileNames filenames;
   private final ClasspathProvider classpath;
-  private final double jvmHeapSlack;
+  private final DriverConfigurationProvider driverConfigurationProvider;
 
   @Inject
   HDInsightJobSubmissionHandler(final AzureUploader uploader,
@@ -64,13 +63,13 @@ public final class HDInsightJobSubmissionHandler implements 
JobSubmissionHandler
                                 final HDInsightInstance hdInsightInstance,
                                 final REEFFileNames filenames,
                                 final ClasspathProvider classpath,
-                                @Parameter(JVMHeapSlack.class) final double 
jvmHeapSlack) {
+                                final DriverConfigurationProvider 
driverConfigurationProvider) {
     this.uploader = uploader;
     this.jobJarMaker = jobJarMaker;
     this.hdInsightInstance = hdInsightInstance;
     this.filenames = filenames;
     this.classpath = classpath;
-    this.jvmHeapSlack = jvmHeapSlack;
+    this.driverConfigurationProvider = driverConfigurationProvider;
   }
 
   @Override
@@ -89,7 +88,7 @@ public final class HDInsightJobSubmissionHandler implements 
JobSubmissionHandler
       LOG.log(Level.INFO, "Submitting application {0} to YARN.", 
applicationID.getApplicationId());
 
       LOG.log(Level.FINE, "Creating a job folder on Azure.");
-      final String jobFolderURL = 
this.uploader.createJobFolder(applicationID.getApplicationId());
+      final URI jobFolderURL = 
this.uploader.createJobFolder(applicationID.getApplicationId());
 
       LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
       final Configuration driverConfiguration =
@@ -161,17 +160,11 @@ public final class HDInsightJobSubmissionHandler 
implements JobSubmissionHandler
   private Configuration makeDriverConfiguration(
       final JobSubmissionEvent jobSubmissionEvent,
       final String applicationId,
-      final String jobFolderURL) throws IOException {
+      final URI jobFolderURL) throws IOException {
 
-    final Configuration hdinsightDriverConfiguration = 
HDInsightDriverConfiguration.CONF
-        .set(HDInsightDriverConfiguration.JOB_IDENTIFIER, applicationId)
-        .set(HDInsightDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
jobSubmissionEvent.getRemoteId())
-        .set(HDInsightDriverConfiguration.JOB_SUBMISSION_DIRECTORY, 
jobFolderURL)
-        .set(HDInsightDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack)
-        .build();
-
-    return Configurations.merge(
-        jobSubmissionEvent.getConfiguration(),
-        hdinsightDriverConfiguration);
+    return 
this.driverConfigurationProvider.getDriverConfiguration(jobFolderURL,
+                                                            
jobSubmissionEvent.getRemoteId(),
+                                                            applicationId,
+                                                            
jobSubmissionEvent.getConfiguration());
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
index fece8e9..dabe995 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.hdinsight.client;
 
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.RuntimePathProvider;
 import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
@@ -41,6 +42,7 @@ public final class HDInsightRuntimeConfigurationStatic 
extends ConfigurationModu
   public static final ConfigurationModule CONF = new 
HDInsightRuntimeConfigurationStatic()
       .merge(CommonRuntimeConfiguration.CONF)
       .bindImplementation(JobSubmissionHandler.class, 
HDInsightJobSubmissionHandler.class)
+      .bindImplementation(DriverConfigurationProvider.class, 
HDInsightDriverConfigurationProviderImpl.class)
       .bindConstructor(CloseableHttpClient.class, 
DefaultClientConstructor.class)
       .bindImplementation(RuntimeClasspathProvider.class, 
HDInsightClasspathProvider.class)
       .bindImplementation(RuntimePathProvider.class, 
HDInsightJVMPathProvider.class)

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
index 65795ba..fc62f20 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.hdinsight.client;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.reef.client.REEF;
 import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.REEFImplementation;
 import org.apache.reef.runtime.common.client.RunningJobImpl;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
@@ -46,6 +47,7 @@ public final class UnsafeHDInsightRuntimeConfigurationStatic 
extends Configurati
       .bindImplementation(RunningJob.class, RunningJobImpl.class)
       .bindNamedParameter(RemoteConfiguration.MessageCodec.class, 
REEFMessageCodec.class)
       .bindImplementation(JobSubmissionHandler.class, 
HDInsightJobSubmissionHandler.class)
+      .bindImplementation(DriverConfigurationProvider.class, 
HDInsightDriverConfigurationProviderImpl.class)
       .bindConstructor(CloseableHttpClient.class, 
UnsafeClientConstructor.class)
       .bindImplementation(RuntimeClasspathProvider.class, 
HDInsightClasspathProvider.class)
       .build();

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverConfigurationProvider.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverConfigurationProvider.java
deleted file mode 100644
index a2c2fd9..0000000
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverConfigurationProvider.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.local.client;
-
-import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
-import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
-import org.apache.reef.runtime.local.client.parameters.RackNames;
-import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.formats.ConfigurationModule;
-
-import javax.inject.Inject;
-
-import java.io.File;
-import java.util.Set;
-
-/**
- * Helper class that assembles the driver configuration when run on the local 
runtime.
- */
-public final class DriverConfigurationProvider {
-
-  private final int maxEvaluators;
-  private final double jvmHeapSlack;
-  private final Set<String> rackNames;
-
-  @Inject
-  DriverConfigurationProvider(@Parameter(MaxNumberOfEvaluators.class) final 
int maxEvaluators,
-                              @Parameter(JVMHeapSlack.class) final double 
jvmHeapSlack,
-                              @Parameter(RackNames.class) final Set<String> 
rackNames) {
-    this.maxEvaluators = maxEvaluators;
-    this.jvmHeapSlack = jvmHeapSlack;
-    this.rackNames = rackNames;
-  }
-
-  private Configuration getDriverConfiguration(final File jobFolder,
-                                               final String clientRemoteId,
-                                               final String jobId) {
-    ConfigurationModule configModule = LocalDriverConfiguration.CONF
-        .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS, 
this.maxEvaluators)
-        .set(LocalDriverConfiguration.ROOT_FOLDER, jobFolder.getAbsolutePath())
-        .set(LocalDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack)
-        .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
-        .set(LocalDriverConfiguration.JOB_IDENTIFIER, jobId);
-    for (final String rackName : rackNames) {
-      configModule = configModule.set(LocalDriverConfiguration.RACK_NAMES, 
rackName);
-    }
-    return configModule.build();
-  }
-
-  /**
-   * Assembles the driver configuration.
-   *
-   * @param jobFolder                The folder in which the local runtime 
will execute this job.
-   * @param clientRemoteId           the remote identifier of the client. It 
is used by the Driver to establish a
-   *                                 connection back to the client.
-   * @param jobId                    The identifier of the job.
-   * @param applicationConfiguration The configuration of the application, 
e.g. a filled out DriverConfiguration
-   * @return The Driver configuration to be used to instantiate the Driver.
-   */
-  public Configuration getDriverConfiguration(final File jobFolder,
-                                              final String clientRemoteId,
-                                              final String jobId,
-                                              final Configuration 
applicationConfiguration) {
-    return Configurations.merge(getDriverConfiguration(jobFolder, 
clientRemoteId, jobId), applicationConfiguration);
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/ExtensibleLocalRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/ExtensibleLocalRuntimeConfiguration.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/ExtensibleLocalRuntimeConfiguration.java
new file mode 100644
index 0000000..c8a3f45
--- /dev/null
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/ExtensibleLocalRuntimeConfiguration.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.client;
+
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.local.LocalClasspathProvider;
+import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
+import org.apache.reef.runtime.local.client.parameters.RackNames;
+import org.apache.reef.runtime.local.client.parameters.RootFolder;
+import org.apache.reef.tang.ConfigurationProvider;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.wake.time.Clock;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A ConfigurationModule to configure the local resourcemanager with 
extensibility point.
+ */
+public final class ExtensibleLocalRuntimeConfiguration extends 
ConfigurationModuleBuilder {
+
+  /**
+   * The number of threads or processes available to the resourcemanager. This 
is the upper limit on the number of
+   * Evaluators that the local resourcemanager will hand out concurrently.
+   * This simulates the size of a physical cluster in terms of the number of 
slots available on it
+   * with one important caveat: The Driver is not counted against this number.
+   */
+  public static final OptionalParameter<Integer> MAX_NUMBER_OF_EVALUATORS = 
new OptionalParameter<>();
+  /**
+   * The folder in which the sub-folders, one per Node, will be created. Those 
will contain one folder per
+   * Evaluator instantiated on the virtual node. Those inner folders will be 
named by the time when the Evaluator was
+   * launched.
+   * <p>
+   * If none is given, a folder "REEF_LOCAL_RUNTIME" will be created in the 
local directory.
+   */
+  public static final OptionalParameter<String> RUNTIME_ROOT_FOLDER = new 
OptionalParameter<>();
+
+  /**
+   * The fraction of the container memory NOT to use for the Java Heap.
+   */
+  public static final OptionalParameter<Double> JVM_HEAP_SLACK = new 
OptionalParameter<>();
+
+  /**
+   * Configuration provides whose Configuration will be merged into all Driver 
Configuration.
+   */
+  public static final OptionalImpl<ConfigurationProvider> 
DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
+
+  /**
+   * The rack names that will be available in the local runtime.
+   */
+  public static final OptionalParameter<String> RACK_NAMES = new 
OptionalParameter<>();
+
+  /**
+   * Driver configuration provider for the client.
+   */
+  public static final RequiredImpl<DriverConfigurationProvider> 
DRIVER_CONFIGURATION_PROVIDER = new RequiredImpl<>();
+
+  /**
+   * The ConfigurationModule for the local resourcemanager.
+   */
+  public static final ConfigurationModule CONF = new 
ExtensibleLocalRuntimeConfiguration()
+          .merge(CommonRuntimeConfiguration.CONF)
+          // Bind the local runtime
+          .bindImplementation(JobSubmissionHandler.class, 
LocalJobSubmissionHandler.class)
+          .bindImplementation(DriverConfigurationProvider.class, 
DRIVER_CONFIGURATION_PROVIDER)
+          .bindConstructor(ExecutorService.class, 
ExecutorServiceConstructor.class)
+          .bindImplementation(RuntimeClasspathProvider.class, 
LocalClasspathProvider.class)
+          // Bind parameters of the local runtime
+          .bindNamedParameter(MaxNumberOfEvaluators.class, 
MAX_NUMBER_OF_EVALUATORS)
+          .bindNamedParameter(RootFolder.class, RUNTIME_ROOT_FOLDER)
+          .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+          .bindSetEntry(DriverConfigurationProviders.class, 
DRIVER_CONFIGURATION_PROVIDERS)
+          .bindSetEntry(Clock.StartHandler.class, PIDStoreStartHandler.class)
+          .bindSetEntry(RackNames.class, RACK_NAMES)
+          .build();
+}
+

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..4001cfe
--- /dev/null
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.client;
+
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
+import org.apache.reef.runtime.local.client.parameters.RackNames;
+import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationModule;
+
+import javax.inject.Inject;
+
+import java.net.URI;
+import java.util.Set;
+
+/**
+ * Default {@link DriverConfigurationProvider} of the local runtime: assembles the
+ * driver configuration from the client-side parameters injected into it.
+ */
+final class LocalDriverConfigurationProviderImpl implements DriverConfigurationProvider {
+
+  private final int maxEvaluators;
+  private final double jvmHeapSlack;
+  private final Set<String> rackNames;
+
+  /**
+   * @param maxEvaluators value of the {@link MaxNumberOfEvaluators} named parameter.
+   * @param jvmHeapSlack  value of the {@link JVMHeapSlack} named parameter.
+   * @param rackNames     value of the {@link RackNames} named parameter.
+   */
+  @Inject
+  LocalDriverConfigurationProviderImpl(@Parameter(MaxNumberOfEvaluators.class) final int maxEvaluators,
+                                       @Parameter(JVMHeapSlack.class) final double jvmHeapSlack,
+                                       @Parameter(RackNames.class) final Set<String> rackNames) {
+    this.maxEvaluators = maxEvaluators;
+    this.jvmHeapSlack = jvmHeapSlack;
+    this.rackNames = rackNames;
+  }
+
+  /**
+   * Builds the runtime-specific part of the driver configuration, i.e. everything
+   * except the application configuration.
+   *
+   * @param jobFolder      the job folder; its path component becomes ROOT_FOLDER.
+   * @param clientRemoteId the remote identifier of the client.
+   * @param jobId          the identifier of the job.
+   * @return the filled-out {@link LocalDriverConfiguration}.
+   */
+  private Configuration getDriverConfiguration(final URI jobFolder,
+                                               final String clientRemoteId,
+                                               final String jobId) {
+    ConfigurationModule configModule = LocalDriverConfiguration.CONF
+        .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS, this.maxEvaluators)
+        .set(LocalDriverConfiguration.ROOT_FOLDER, jobFolder.getPath())
+        .set(LocalDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack)
+        .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
+        .set(LocalDriverConfiguration.JOB_IDENTIFIER, jobId);
+    // RACK_NAMES is set once per rack name; set(...) returns a new module each time.
+    for (final String rackName : rackNames) {
+      configModule = configModule.set(LocalDriverConfiguration.RACK_NAMES, rackName);
+    }
+    return configModule.build();
+  }
+
+  /**
+   * Assembles the driver configuration.
+   *
+   * @param jobFolder                The folder in which the local runtime will execute this job.
+   * @param clientRemoteId           the remote identifier of the client. It is used by the Driver to establish a
+   *                                 connection back to the client.
+   * @param jobId                    The identifier of the job.
+   * @param applicationConfiguration The configuration of the application, e.g. a filled out DriverConfiguration
+   * @return The Driver configuration to be used to instantiate the Driver.
+   */
+  @Override
+  public Configuration getDriverConfiguration(final URI jobFolder,
+                                              final String clientRemoteId,
+                                              final String jobId,
+                                              final Configuration applicationConfiguration) {
+    return Configurations.merge(getDriverConfiguration(jobFolder, clientRemoteId, jobId), applicationConfiguration);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
index f3531c7..ecdc17f 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.local.client;
 
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.REEFFileNames;
@@ -99,7 +100,10 @@ final class LocalJobSubmissionHandler implements 
JobSubmissionHandler {
         driverFiles.copyTo(driverFolder);
 
         final Configuration driverConfiguration = 
this.driverConfigurationProvider
-            .getDriverConfiguration(jobFolder, t.getRemoteId(), 
t.getIdentifier(), t.getConfiguration());
+            .getDriverConfiguration(jobFolder.toURI(),
+                                    t.getRemoteId(),
+                                    t.getIdentifier(),
+                                    t.getConfiguration());
 
         this.configurationSerializer.toFile(driverConfiguration,
             new File(driverFolder, 
this.fileNames.getDriverConfigurationPath()));

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
index aa045f9..c8282b9 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.local.client;
 
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
 import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
@@ -29,10 +30,7 @@ import 
org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
 import org.apache.reef.runtime.local.client.parameters.RackNames;
 import org.apache.reef.runtime.local.client.parameters.RootFolder;
 import org.apache.reef.tang.ConfigurationProvider;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.OptionalImpl;
-import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.*;
 import org.apache.reef.wake.time.Clock;
 
 import java.util.concurrent.ExecutorService;
@@ -73,7 +71,6 @@ public class LocalRuntimeConfiguration extends 
ConfigurationModuleBuilder {
    */
   public static final OptionalParameter<String> RACK_NAMES = new 
OptionalParameter<>();
 
-
   /**
    * The ConfigurationModule for the local resourcemanager.
    */
@@ -81,6 +78,7 @@ public class LocalRuntimeConfiguration extends 
ConfigurationModuleBuilder {
       .merge(CommonRuntimeConfiguration.CONF)
           // Bind the local runtime
       .bindImplementation(JobSubmissionHandler.class, 
LocalJobSubmissionHandler.class)
+      .bindImplementation(DriverConfigurationProvider.class, 
LocalDriverConfigurationProviderImpl.class)
       .bindConstructor(ExecutorService.class, ExecutorServiceConstructor.class)
       .bindImplementation(RuntimeClasspathProvider.class, 
LocalClasspathProvider.class)
           // Bind parameters of the local runtime
@@ -94,3 +92,4 @@ public class LocalRuntimeConfiguration extends 
ConfigurationModuleBuilder {
 
 
 }
+

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
index d3468d4..179604d 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
 import org.apache.reef.runtime.mesos.MesosClasspathProvider;
@@ -53,6 +54,7 @@ public class MesosClientConfiguration extends 
ConfigurationModuleBuilder {
   public static final ConfigurationModule CONF = new MesosClientConfiguration()
       .merge(CommonRuntimeConfiguration.CONF)
       .bindImplementation(JobSubmissionHandler.class, 
MesosJobSubmissionHandler.class)
+      .bindImplementation(DriverConfigurationProvider.class, 
MesosDriverConfigurationProviderImpl.class)
       .bindNamedParameter(RootFolder.class, ROOT_FOLDER)
       .bindNamedParameter(MasterIp.class, MASTER_IP)
       .bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosDriverConfigurationProviderImpl.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..648c7d0
--- /dev/null
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosDriverConfigurationProviderImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.driver.MesosDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.net.URI;
+
+/**
+ * The {@link DriverConfigurationProvider} bound by default on the Mesos runtime.
+ */
+final class MesosDriverConfigurationProviderImpl implements DriverConfigurationProvider {
+
+  private final String masterIp;
+  private final double jvmSlack;
+
+  /**
+   * @param masterIp value of the {@link MasterIp} named parameter.
+   * @param jvmSlack value of the {@link JVMHeapSlack} named parameter.
+   */
+  @Inject
+  MesosDriverConfigurationProviderImpl(@Parameter(MasterIp.class) final String masterIp,
+                                       @Parameter(JVMHeapSlack.class) final double jvmSlack) {
+    this.masterIp = masterIp;
+    this.jvmSlack = jvmSlack;
+  }
+
+  /**
+   * Fills out {@link MesosDriverConfiguration} and merges it with the application configuration.
+   *
+   * @param jobFolder                not read by this implementation.
+   * @param clientRemoteId           set as CLIENT_REMOTE_IDENTIFIER.
+   * @param jobId                    set as JOB_IDENTIFIER.
+   * @param applicationConfiguration the application configuration merged into the result.
+   * @return the merged driver configuration.
+   */
+  @Override
+  public Configuration getDriverConfiguration(final URI jobFolder,
+                                              final String clientRemoteId,
+                                              final String jobId,
+                                              final Configuration applicationConfiguration) {
+    final Configuration mesosDriverConfiguration = MesosDriverConfiguration.CONF
+        .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp)
+        .set(MesosDriverConfiguration.JOB_IDENTIFIER, jobId)
+        .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
+        .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+        // must be 1 as there is 1 scheduler at the same time
+        .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1)
+        .build();
+
+    return Configurations.merge(mesosDriverConfiguration, applicationConfiguration);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
index 880d30c..f84e082 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -20,18 +20,15 @@ package org.apache.reef.runtime.mesos.client;
 
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.FileResource;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
-import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
-import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
 import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
-import org.apache.reef.runtime.mesos.driver.MesosDriverConfiguration;
 import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
 
@@ -60,22 +57,19 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
   private final ClasspathProvider classpath;
   private final REEFFileNames fileNames;
   private final String rootFolderName;
-  private final String masterIp;
-  private final double jvmSlack;
+  private final DriverConfigurationProvider driverConfigurationProvider;
 
   @Inject
   MesosJobSubmissionHandler(@Parameter(RootFolder.class) final String 
rootFolderName,
-                            @Parameter(MasterIp.class) final String masterIp,
                             final ConfigurationSerializer 
configurationSerializer,
                             final REEFFileNames fileNames,
                             final ClasspathProvider classpath,
-                            @Parameter(JVMHeapSlack.class) final double 
jvmSlack) {
+                            final DriverConfigurationProvider 
driverConfigurationProvider) {
     this.rootFolderName = new File(rootFolderName).getAbsolutePath();
-    this.masterIp = masterIp;
     this.configurationSerializer = configurationSerializer;
     this.fileNames = fileNames;
     this.classpath = classpath;
-    this.jvmSlack = jvmSlack;
+    this.driverConfigurationProvider = driverConfigurationProvider;
   }
 
   @Override
@@ -86,7 +80,7 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
   public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
     try {
       final File jobFolder = new File(new File(this.rootFolderName),
-          "/" + jobSubmissionEvent.getIdentifier() + "-" + 
System.currentTimeMillis() + "/");
+              "/" + jobSubmissionEvent.getIdentifier() + "-" + 
System.currentTimeMillis() + "/");
 
       final File driverFolder = new File(jobFolder, DRIVER_FOLDER_NAME);
       if (!driverFolder.exists() && !driverFolder.mkdirs()) {
@@ -120,16 +114,12 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
         Files.copy(src, dst, 
java.nio.file.StandardCopyOption.REPLACE_EXISTING);
       }
 
-      final Configuration driverConfiguration =
-          Configurations.merge(MesosDriverConfiguration.CONF
-              .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp)
-              .set(MesosDriverConfiguration.JOB_IDENTIFIER, 
jobSubmissionEvent.getIdentifier())
-              .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
jobSubmissionEvent.getRemoteId())
-              .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
-              .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1)
-              // must be 1 as there is 1 scheduler at the same time
-              .build(),
-          jobSubmissionEvent.getConfiguration());
+      final Configuration driverConfiguration = 
this.driverConfigurationProvider.getDriverConfiguration(
+              null,
+              jobSubmissionEvent.getRemoteId(),
+              jobSubmissionEvent.getIdentifier(),
+              jobSubmissionEvent.getConfiguration());
+
       final File runtimeConfigurationFile = new File(driverFolder, 
this.fileNames.getDriverConfigurationPath());
       this.configurationSerializer.toFile(driverConfiguration, 
runtimeConfigurationFile);
 
@@ -143,11 +133,11 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
       final File outFile = new File(driverFolder, 
fileNames.getDriverStdoutFileName());
 
       new ProcessBuilder()
-          .command(launchCommand)
-          .directory(driverFolder)
-          .redirectError(errFile)
-          .redirectOutput(outFile)
-          .start();
+              .command(launchCommand)
+              .directory(driverFolder)
+              .redirectError(errFile)
+              .redirectOutput(outFile)
+              .start();
     } catch (final IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/ExtensibleYarnClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/ExtensibleYarnClientConfiguration.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/ExtensibleYarnClientConfiguration.java
new file mode 100644
index 0000000..516c600
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/ExtensibleYarnClientConfiguration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.client.parameters.JobPriority;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.ConfigurationProvider;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.util.logging.LoggingSetup;
+
+/**
+ * An extensible ConfigurationModule for the YARN resourcemanager.
+ * Unlike a fixed binding, the DriverConfigurationProvider implementation is
+ * supplied by the user through DRIVER_CONFIGURATION_PROVIDER.
+ */
+public final class ExtensibleYarnClientConfiguration extends 
ConfigurationModuleBuilder {
+  static {
+    LoggingSetup.setupCommonsLogging();
+  }
+
+  /**
+   * Optional value bound to the JobQueue named parameter.
+   */
+  public static final OptionalParameter<String> YARN_QUEUE_NAME = new 
OptionalParameter<>();
+  /**
+   * Optional value bound to the JobPriority named parameter.
+   */
+  public static final OptionalParameter<Integer> YARN_PRIORITY = new 
OptionalParameter<>();
+
+  /**
+   * Optional value bound to the JVMHeapSlack named parameter.
+   */
+  public static final OptionalParameter<Double> JVM_HEAP_SLACK = new 
OptionalParameter<>();
+
+  /**
+   * Configuration providers whose Configuration will be merged into all
+   * Driver Configurations.
+   */
+  public static final OptionalImpl<ConfigurationProvider> 
DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
+
+  /**
+   * Driver configuration provider for the client.
+   * Required: bound as the implementation of DriverConfigurationProvider.
+   */
+  public static final RequiredImpl<DriverConfigurationProvider> 
DRIVER_CONFIGURATION_PROVIDER = new RequiredImpl<>();
+
+  /**
+   * The ConfigurationModule assembled from the bindings below, merged with
+   * CommonRuntimeConfiguration.CONF.
+   */
+  public static final ConfigurationModule CONF = new 
ExtensibleYarnClientConfiguration()
+      .merge(CommonRuntimeConfiguration.CONF)
+          // Bind YARN
+      .bindImplementation(JobSubmissionHandler.class, 
YarnJobSubmissionHandler.class)
+      .bindImplementation(DriverConfigurationProvider.class, 
DRIVER_CONFIGURATION_PROVIDER)
+          // Bind the parameters given by the user
+      .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
+      .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
+      .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+      .bindImplementation(RuntimeClasspathProvider.class, 
YarnClasspathProvider.class)
+          // Bind external constructors. Taken from  
YarnExternalConstructors.registerClientConstructors
+      .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, 
YarnConfigurationConstructor.class)
+      .bindSetEntry(DriverConfigurationProviders.class, 
DRIVER_CONFIGURATION_PROVIDERS)
+      .build();
+
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
index 8731aa4..33fb177 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
@@ -30,10 +31,7 @@ import 
org.apache.reef.runtime.yarn.client.parameters.JobPriority;
 import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
 import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
 import org.apache.reef.tang.ConfigurationProvider;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.OptionalImpl;
-import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.*;
 import org.apache.reef.util.logging.LoggingSetup;
 
 /**
@@ -57,17 +55,17 @@ public class YarnClientConfiguration extends 
ConfigurationModuleBuilder {
   public static final OptionalImpl<ConfigurationProvider> 
DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
 
   public static final ConfigurationModule CONF = new YarnClientConfiguration()
-      .merge(CommonRuntimeConfiguration.CONF)
+          .merge(CommonRuntimeConfiguration.CONF)
           // Bind YARN
-      .bindImplementation(JobSubmissionHandler.class, 
YarnJobSubmissionHandler.class)
+          .bindImplementation(JobSubmissionHandler.class, 
YarnJobSubmissionHandler.class)
+          .bindImplementation(DriverConfigurationProvider.class, 
YarnDriverConfigurationProviderImpl.class)
           // Bind the parameters given by the user
-      .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
-      .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
-      .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
-      .bindImplementation(RuntimeClasspathProvider.class, 
YarnClasspathProvider.class)
+          .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
+          .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
+          .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+          .bindImplementation(RuntimeClasspathProvider.class, 
YarnClasspathProvider.class)
           // Bind external constructors. Taken from  
YarnExternalConstructors.registerClientConstructors
-      .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, 
YarnConfigurationConstructor.class)
-      .bindSetEntry(DriverConfigurationProviders.class, 
DRIVER_CONFIGURATION_PROVIDERS)
-      .build();
-
+          
.bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, 
YarnConfigurationConstructor.class)
+          .bindSetEntry(DriverConfigurationProviders.class, 
DRIVER_CONFIGURATION_PROVIDERS)
+          .build();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..f7aabb8
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.net.URI;
+
+import static org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.*;
+
+/**
+ * The {@link DriverConfigurationProvider} bound by default on the YARN runtime.
+ */
+final class YarnDriverConfigurationProviderImpl implements DriverConfigurationProvider {
+
+  private final double jvmSlack;
+
+  /**
+   * @param jvmSlack value of the {@link JVMHeapSlack} named parameter.
+   */
+  @Inject
+  YarnDriverConfigurationProviderImpl(@Parameter(JVMHeapSlack.class) final double jvmSlack) {
+    this.jvmSlack = jvmSlack;
+  }
+
+  /**
+   * Fills out the YARN driver configuration and merges it with the application configuration.
+   *
+   * @param jobFolder                its string form is set as JOB_SUBMISSION_DIRECTORY.
+   * @param clientRemoteId           set as CLIENT_REMOTE_IDENTIFIER.
+   * @param jobId                    set as JOB_IDENTIFIER.
+   * @param applicationConfiguration the application configuration merged into the result.
+   * @return the merged driver configuration.
+   */
+  @Override
+  public Configuration getDriverConfiguration(final URI jobFolder,
+                                              final String clientRemoteId,
+                                              final String jobId,
+                                              final Configuration applicationConfiguration) {
+    final Configuration yarnDriverConfiguration =
+        org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.CONF
+            .set(JOB_SUBMISSION_DIRECTORY, jobFolder.toString())
+            .set(JOB_IDENTIFIER, jobId)
+            .set(CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
+            .set(JVM_HEAP_SLACK, this.jvmSlack)
+            .build();
+
+    return Configurations.merge(yarnDriverConfiguration, applicationConfiguration);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index faf8e1f..e2c6389 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -26,18 +26,16 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.JobJarMaker;
 import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
 import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
 import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
 import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
-import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
 import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
@@ -60,29 +58,29 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
   private final JobUploader uploader;
-  private final double jvmSlack;
   private final String defaultQueueName;
   private final SecurityTokenProvider tokenProvider;
+  private final DriverConfigurationProvider driverConfigurationProvider;
 
   @Inject
   YarnJobSubmissionHandler(
-      final YarnConfiguration yarnConfiguration,
-      final JobJarMaker jobJarMaker,
-      final REEFFileNames fileNames,
-      final ClasspathProvider classpath,
-      final JobUploader uploader,
-      @Parameter(JVMHeapSlack.class) final double jvmSlack,
-      @Parameter(JobQueue.class) final String defaultQueueName,
-      final SecurityTokenProvider tokenProvider) throws IOException {
+          final YarnConfiguration yarnConfiguration,
+          final JobJarMaker jobJarMaker,
+          final REEFFileNames fileNames,
+          final ClasspathProvider classpath,
+          final JobUploader uploader,
+          @Parameter(JobQueue.class) final String defaultQueueName,
+          final SecurityTokenProvider tokenProvider,
+          final DriverConfigurationProvider driverConfigurationProvider) 
throws IOException {
 
     this.yarnConfiguration = yarnConfiguration;
     this.jobJarMaker = jobJarMaker;
     this.fileNames = fileNames;
     this.classpath = classpath;
     this.uploader = uploader;
-    this.jvmSlack = jvmSlack;
     this.defaultQueueName = defaultQueueName;
     this.tokenProvider = tokenProvider;
+    this.driverConfigurationProvider = driverConfigurationProvider;
   }
 
   @Override
@@ -130,14 +128,12 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   private Configuration makeDriverConfiguration(
       final JobSubmissionEvent jobSubmissionEvent,
       final Path jobFolderPath) throws IOException {
-    return Configurations.merge(
-        YarnDriverConfiguration.CONF
-            .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, 
jobFolderPath.toString())
-            .set(YarnDriverConfiguration.JOB_IDENTIFIER, 
jobSubmissionEvent.getIdentifier())
-            .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
jobSubmissionEvent.getRemoteId())
-            .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
-            .build(),
-        jobSubmissionEvent.getConfiguration());
+
+    return this.driverConfigurationProvider.getDriverConfiguration(
+            jobFolderPath.toUri(),
+            jobSubmissionEvent.getRemoteId(),
+            jobSubmissionEvent.getIdentifier(),
+            jobSubmissionEvent.getConfiguration());
   }
 
   private static int getPriority(final JobSubmissionEvent jobSubmissionEvent) {
@@ -168,7 +164,7 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   /**
    * Extracts the queue name from the driverConfiguration or return default if 
none is set.
    *
-   * @param driverConfiguration
+   * @param driverConfiguration The driver configuration
    * @return the queue name from the driverConfiguration or return default if 
none is set.
    */
   private String getQueue(final Configuration driverConfiguration) {

Reply via email to