Repository: reef
Updated Branches:
refs/heads/master d9c06e882 -> c63219ff2
[REEF-1194] Extensibility point for Driver configuration for
JobSubmissionHandlers
This addressed the issue by:
* Adding DriverConfigurationProvider to reef-common
* Adding default implementations for the interface to different runtimes
* Adding extensible configurations to runtimes that can be used to bind
different driver configuration providers
JIRA:
[REEF-1194](https://issues.apache.org/jira/browse/REEF-1194)
Pull Request:
This closes #833
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c63219ff
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c63219ff
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c63219ff
Branch: refs/heads/master
Commit: c63219ff2006b65815f53e0b74b73a71c616c3c1
Parents: d9c06e8
Author: Boris Shulman <[email protected]>
Authored: Mon Feb 8 13:26:28 2016 -0800
Committer: Markus Weimer <[email protected]>
Committed: Tue Feb 16 14:19:57 2016 -0800
----------------------------------------------------------------------
...ocalRuntimeDriverConfigurationGenerator.java | 6 +-
.../client/DriverConfigurationProvider.java | 42 +++++++++
.../runtime/hdinsight/client/AzureUploader.java | 14 ++-
...DInsightDriverConfigurationProviderImpl.java | 61 ++++++++++++
.../client/HDInsightJobSubmissionHandler.java | 29 +++---
.../HDInsightRuntimeConfigurationStatic.java | 2 +
...safeHDInsightRuntimeConfigurationStatic.java | 2 +
.../client/DriverConfigurationProvider.java | 84 -----------------
.../ExtensibleLocalRuntimeConfiguration.java | 98 ++++++++++++++++++++
.../LocalDriverConfigurationProviderImpl.java | 85 +++++++++++++++++
.../local/client/LocalJobSubmissionHandler.java | 6 +-
.../local/client/LocalRuntimeConfiguration.java | 9 +-
.../mesos/client/MesosClientConfiguration.java | 2 +
.../MesosDriverConfigurationProviderImpl.java | 62 +++++++++++++
.../mesos/client/MesosJobSubmissionHandler.java | 42 ++++-----
.../ExtensibleYarnClientConfiguration.java | 73 +++++++++++++++
.../yarn/client/YarnClientConfiguration.java | 26 +++---
.../YarnDriverConfigurationProviderImpl.java | 57 ++++++++++++
.../yarn/client/YarnJobSubmissionHandler.java | 40 ++++----
19 files changed, 562 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
index b63e20b..ac405af 100644
---
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
+++
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
@@ -19,9 +19,9 @@
package org.apache.reef.bridge.client;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.local.client.DriverConfigurationProvider;
import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher;
import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
import org.apache.reef.tang.*;
@@ -62,7 +62,7 @@ final class LocalRuntimeDriverConfigurationGenerator {
* Writes driver configuration to disk.
* @param jobFolder The folder in which the job is staged.
* @param jobId id of the job to be submitted
- * @param clientRemoteId
+ * @param clientRemoteId The client remote id
* @return the configuration
* @throws IOException
*/
@@ -72,7 +72,7 @@ final class LocalRuntimeDriverConfigurationGenerator {
final File driverFolder = new File(jobFolder,
PreparedDriverFolderLauncher.DRIVER_FOLDER_NAME);
final Configuration driverConfiguration1 = driverConfigurationProvider
- .getDriverConfiguration(jobFolder, clientRemoteId,
+ .getDriverConfiguration(jobFolder.toURI(), clientRemoteId,
jobId, Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER);
final ConfigurationBuilder configurationBuilder =
Tang.Factory.getTang().newConfigurationBuilder();
for (final ConfigurationProvider configurationProvider :
this.configurationProviders) {
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/DriverConfigurationProvider.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/DriverConfigurationProvider.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/DriverConfigurationProvider.java
new file mode 100644
index 0000000..d764461
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/DriverConfigurationProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.client;
+
+import org.apache.reef.tang.Configuration;
+
+import java.net.URI;
+
+/**
+ * A contract for driver configuration provider.
+ */
+public interface DriverConfigurationProvider {
+ /**
+ * Generates driver configuration.
+ * @param jobFolder The job folder.
+ * @param clientRemoteId the client remote id.
+ * @param jobId The job id.
+ * @param applicationConfiguration The application configuration.
+ * @return Generated driver configuration.
+ */
+ Configuration getDriverConfiguration(
+ final URI jobFolder,
+ final String clientRemoteId,
+ final String jobId,
+ final Configuration applicationConfiguration);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
index 97ef3ea..ba5c233 100644
---
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
+++
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/AzureUploader.java
@@ -88,13 +88,13 @@ final class AzureUploader {
}
@SuppressWarnings("checkstyle:hiddenfield")
- public String createJobFolder(final String applicationID) throws IOException
{
+ public URI createJobFolder(final String applicationID) throws IOException {
try {
this.applicationID = applicationID;
this.jobFolderName = assembleJobFolderName(applicationID);
// Make the directory entry for the job
final CloudBlockBlob jobFolderBlob =
this.container.getBlockBlobReference(this.jobFolderName);
- final String jobFolderURL = getFileSystemURL(jobFolderBlob);
+ final URI jobFolderURL = getFileSystemURL(jobFolderBlob);
return jobFolderURL;
} catch (final StorageException | URISyntaxException e) {
throw new IOException("Unable to create job Folder", e);
@@ -129,7 +129,7 @@ final class AzureUploader {
.setVisibility(LocalResource.VISIBILITY_APPLICATION)
.setSize(blobProperties.getLength())
.setTimestamp(blobProperties.getLastModified().getTime())
- .setResource(getFileSystemURL(jobJarBlob));
+ .setResource(getFileSystemURL(jobJarBlob).toString());
} catch (final URISyntaxException | StorageException e) {
throw new IOException(e);
@@ -140,10 +140,14 @@ final class AzureUploader {
* @param blob
* @return a HDFS URL for the blob
*/
- private String getFileSystemURL(final CloudBlockBlob blob) {
+ private URI getFileSystemURL(final CloudBlockBlob blob) {
final URI primaryURI = blob.getStorageUri().getPrimaryUri();
final String path =
primaryURI.getPath().replace(this.azureStorageContainerName + "/", "");
- return "wasb://" + this.azureStorageContainerName + "@" +
primaryURI.getHost() + path;
+ try {
+ return new URI("wasb://" + this.azureStorageContainerName + "@" +
primaryURI.getHost() + path);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Invalid URI constructed ", e);
+ }
}
private String assembleJobFolderName(final String jobApplicationID) {
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..89658a3
--- /dev/null
+++
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfigurationProviderImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.hdinsight.client;
+
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+import java.net.URI;
+
+import static org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.*;
+
+/**
+ * Default driver configuration provider for HDInsight.
+ */
+final class HDInsightDriverConfigurationProviderImpl implements
DriverConfigurationProvider {
+ private final double jvmSlack;
+
+ @Inject
+ HDInsightDriverConfigurationProviderImpl(@Parameter(JVMHeapSlack.class)
final double jvmSlack) {
+ this.jvmSlack = jvmSlack;
+ }
+
+ @Override
+ public Configuration getDriverConfiguration(final URI jobFolder,
+ final String clientRemoteId,
+ final String jobId,
+ final Configuration
applicationConfiguration) {
+
+ final Configuration hdinsightDriverConfiguration =
HDInsightDriverConfiguration.CONF
+ .set(HDInsightDriverConfiguration.JOB_IDENTIFIER, jobId)
+ .set(HDInsightDriverConfiguration.CLIENT_REMOTE_IDENTIFIER,
clientRemoteId)
+ .set(HDInsightDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
jobFolder.toString())
+ .set(HDInsightDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+ .build();
+
+ return Configurations.merge(
+ applicationConfiguration,
+ hdinsightDriverConfiguration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
index 8fd0594..7c70a5a 100644
---
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
+++
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
@@ -22,22 +22,21 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.JobJarMaker;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
-import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
import org.apache.reef.runtime.hdinsight.client.yarnrest.*;
import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.net.URI;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -56,7 +55,7 @@ public final class HDInsightJobSubmissionHandler implements
JobSubmissionHandler
private final HDInsightInstance hdInsightInstance;
private final REEFFileNames filenames;
private final ClasspathProvider classpath;
- private final double jvmHeapSlack;
+ private final DriverConfigurationProvider driverConfigurationProvider;
@Inject
HDInsightJobSubmissionHandler(final AzureUploader uploader,
@@ -64,13 +63,13 @@ public final class HDInsightJobSubmissionHandler implements
JobSubmissionHandler
final HDInsightInstance hdInsightInstance,
final REEFFileNames filenames,
final ClasspathProvider classpath,
- @Parameter(JVMHeapSlack.class) final double
jvmHeapSlack) {
+ final DriverConfigurationProvider
driverConfigurationProvider) {
this.uploader = uploader;
this.jobJarMaker = jobJarMaker;
this.hdInsightInstance = hdInsightInstance;
this.filenames = filenames;
this.classpath = classpath;
- this.jvmHeapSlack = jvmHeapSlack;
+ this.driverConfigurationProvider = driverConfigurationProvider;
}
@Override
@@ -89,7 +88,7 @@ public final class HDInsightJobSubmissionHandler implements
JobSubmissionHandler
LOG.log(Level.INFO, "Submitting application {0} to YARN.",
applicationID.getApplicationId());
LOG.log(Level.FINE, "Creating a job folder on Azure.");
- final String jobFolderURL =
this.uploader.createJobFolder(applicationID.getApplicationId());
+ final URI jobFolderURL =
this.uploader.createJobFolder(applicationID.getApplicationId());
LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
final Configuration driverConfiguration =
@@ -161,17 +160,11 @@ public final class HDInsightJobSubmissionHandler
implements JobSubmissionHandler
private Configuration makeDriverConfiguration(
final JobSubmissionEvent jobSubmissionEvent,
final String applicationId,
- final String jobFolderURL) throws IOException {
+ final URI jobFolderURL) throws IOException {
- final Configuration hdinsightDriverConfiguration =
HDInsightDriverConfiguration.CONF
- .set(HDInsightDriverConfiguration.JOB_IDENTIFIER, applicationId)
- .set(HDInsightDriverConfiguration.CLIENT_REMOTE_IDENTIFIER,
jobSubmissionEvent.getRemoteId())
- .set(HDInsightDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
jobFolderURL)
- .set(HDInsightDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack)
- .build();
-
- return Configurations.merge(
- jobSubmissionEvent.getConfiguration(),
- hdinsightDriverConfiguration);
+ return
this.driverConfigurationProvider.getDriverConfiguration(jobFolderURL,
+
jobSubmissionEvent.getRemoteId(),
+ applicationId,
+
jobSubmissionEvent.getConfiguration());
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
index fece8e9..dabe995 100644
---
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
+++
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightRuntimeConfigurationStatic.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.hdinsight.client;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.RuntimePathProvider;
import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
@@ -41,6 +42,7 @@ public final class HDInsightRuntimeConfigurationStatic
extends ConfigurationModu
public static final ConfigurationModule CONF = new
HDInsightRuntimeConfigurationStatic()
.merge(CommonRuntimeConfiguration.CONF)
.bindImplementation(JobSubmissionHandler.class,
HDInsightJobSubmissionHandler.class)
+ .bindImplementation(DriverConfigurationProvider.class,
HDInsightDriverConfigurationProviderImpl.class)
.bindConstructor(CloseableHttpClient.class,
DefaultClientConstructor.class)
.bindImplementation(RuntimeClasspathProvider.class,
HDInsightClasspathProvider.class)
.bindImplementation(RuntimePathProvider.class,
HDInsightJVMPathProvider.class)
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
index 65795ba..fc62f20 100644
---
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
+++
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/UnsafeHDInsightRuntimeConfigurationStatic.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.hdinsight.client;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.reef.client.REEF;
import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.REEFImplementation;
import org.apache.reef.runtime.common.client.RunningJobImpl;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
@@ -46,6 +47,7 @@ public final class UnsafeHDInsightRuntimeConfigurationStatic
extends Configurati
.bindImplementation(RunningJob.class, RunningJobImpl.class)
.bindNamedParameter(RemoteConfiguration.MessageCodec.class,
REEFMessageCodec.class)
.bindImplementation(JobSubmissionHandler.class,
HDInsightJobSubmissionHandler.class)
+ .bindImplementation(DriverConfigurationProvider.class,
HDInsightDriverConfigurationProviderImpl.class)
.bindConstructor(CloseableHttpClient.class,
UnsafeClientConstructor.class)
.bindImplementation(RuntimeClasspathProvider.class,
HDInsightClasspathProvider.class)
.build();
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverConfigurationProvider.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverConfigurationProvider.java
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverConfigurationProvider.java
deleted file mode 100644
index a2c2fd9..0000000
---
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverConfigurationProvider.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.local.client;
-
-import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
-import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
-import org.apache.reef.runtime.local.client.parameters.RackNames;
-import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.formats.ConfigurationModule;
-
-import javax.inject.Inject;
-
-import java.io.File;
-import java.util.Set;
-
-/**
- * Helper class that assembles the driver configuration when run on the local
runtime.
- */
-public final class DriverConfigurationProvider {
-
- private final int maxEvaluators;
- private final double jvmHeapSlack;
- private final Set<String> rackNames;
-
- @Inject
- DriverConfigurationProvider(@Parameter(MaxNumberOfEvaluators.class) final
int maxEvaluators,
- @Parameter(JVMHeapSlack.class) final double
jvmHeapSlack,
- @Parameter(RackNames.class) final Set<String>
rackNames) {
- this.maxEvaluators = maxEvaluators;
- this.jvmHeapSlack = jvmHeapSlack;
- this.rackNames = rackNames;
- }
-
- private Configuration getDriverConfiguration(final File jobFolder,
- final String clientRemoteId,
- final String jobId) {
- ConfigurationModule configModule = LocalDriverConfiguration.CONF
- .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS,
this.maxEvaluators)
- .set(LocalDriverConfiguration.ROOT_FOLDER, jobFolder.getAbsolutePath())
- .set(LocalDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack)
- .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
- .set(LocalDriverConfiguration.JOB_IDENTIFIER, jobId);
- for (final String rackName : rackNames) {
- configModule = configModule.set(LocalDriverConfiguration.RACK_NAMES,
rackName);
- }
- return configModule.build();
- }
-
- /**
- * Assembles the driver configuration.
- *
- * @param jobFolder The folder in which the local runtime
will execute this job.
- * @param clientRemoteId the remote identifier of the client. It
is used by the Driver to establish a
- * connection back to the client.
- * @param jobId The identifier of the job.
- * @param applicationConfiguration The configuration of the application,
e.g. a filled out DriverConfiguration
- * @return The Driver configuration to be used to instantiate the Driver.
- */
- public Configuration getDriverConfiguration(final File jobFolder,
- final String clientRemoteId,
- final String jobId,
- final Configuration
applicationConfiguration) {
- return Configurations.merge(getDriverConfiguration(jobFolder,
clientRemoteId, jobId), applicationConfiguration);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/ExtensibleLocalRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/ExtensibleLocalRuntimeConfiguration.java
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/ExtensibleLocalRuntimeConfiguration.java
new file mode 100644
index 0000000..c8a3f45
--- /dev/null
+++
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/ExtensibleLocalRuntimeConfiguration.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.client;
+
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.local.LocalClasspathProvider;
+import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
+import org.apache.reef.runtime.local.client.parameters.RackNames;
+import org.apache.reef.runtime.local.client.parameters.RootFolder;
+import org.apache.reef.tang.ConfigurationProvider;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.wake.time.Clock;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A ConfigurationModule to configure the local resourcemanager with
extensibility point.
+ */
+public final class ExtensibleLocalRuntimeConfiguration extends
ConfigurationModuleBuilder {
+
+ /**
+ * The number of threads or processes available to the resourcemanager. This
is the upper limit on the number of
+ * Evaluators that the local resourcemanager will hand out concurrently.
+ * This simulates the size of a physical cluster in terms of the number of
slots available on it
+ * with one important caveat: The Driver is not counted against this number.
+ */
+ public static final OptionalParameter<Integer> MAX_NUMBER_OF_EVALUATORS =
new OptionalParameter<>();
+ /**
+ * The folder in which the sub-folders, one per Node, will be created. Those
will contain one folder per
+ * Evaluator instantiated on the virtual node. Those inner folders will be
named by the time when the Evaluator was
+ * launched.
+ * <p>
+ * If none is given, a folder "REEF_LOCAL_RUNTIME" will be created in the
local directory.
+ */
+ public static final OptionalParameter<String> RUNTIME_ROOT_FOLDER = new
OptionalParameter<>();
+
+ /**
+ * The fraction of the container memory NOT to use for the Java Heap.
+ */
+ public static final OptionalParameter<Double> JVM_HEAP_SLACK = new
OptionalParameter<>();
+
+ /**
+ * Configuration provides whose Configuration will be merged into all Driver
Configuration.
+ */
+ public static final OptionalImpl<ConfigurationProvider>
DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
+
+ /**
+ * The rack names that will be available in the local runtime.
+ */
+ public static final OptionalParameter<String> RACK_NAMES = new
OptionalParameter<>();
+
+ /**
+ * Driver configuration provider for the client.
+ */
+ public static final RequiredImpl<DriverConfigurationProvider>
DRIVER_CONFIGURATION_PROVIDER = new RequiredImpl<>();
+
+ /**
+ * The ConfigurationModule for the local resourcemanager.
+ */
+ public static final ConfigurationModule CONF = new
ExtensibleLocalRuntimeConfiguration()
+ .merge(CommonRuntimeConfiguration.CONF)
+ // Bind the local runtime
+ .bindImplementation(JobSubmissionHandler.class,
LocalJobSubmissionHandler.class)
+ .bindImplementation(DriverConfigurationProvider.class,
DRIVER_CONFIGURATION_PROVIDER)
+ .bindConstructor(ExecutorService.class,
ExecutorServiceConstructor.class)
+ .bindImplementation(RuntimeClasspathProvider.class,
LocalClasspathProvider.class)
+ // Bind parameters of the local runtime
+ .bindNamedParameter(MaxNumberOfEvaluators.class,
MAX_NUMBER_OF_EVALUATORS)
+ .bindNamedParameter(RootFolder.class, RUNTIME_ROOT_FOLDER)
+ .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+ .bindSetEntry(DriverConfigurationProviders.class,
DRIVER_CONFIGURATION_PROVIDERS)
+ .bindSetEntry(Clock.StartHandler.class, PIDStoreStartHandler.class)
+ .bindSetEntry(RackNames.class, RACK_NAMES)
+ .build();
+}
+
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..4001cfe
--- /dev/null
+++
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalDriverConfigurationProviderImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.client;
+
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
+import org.apache.reef.runtime.local.client.parameters.RackNames;
+import org.apache.reef.runtime.local.driver.LocalDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationModule;
+
+import javax.inject.Inject;
+
+import java.net.URI;
+import java.util.Set;
+
+/**
+ * Helper class that assembles the driver configuration when run on the local
runtime.
+ */
+final class LocalDriverConfigurationProviderImpl implements
DriverConfigurationProvider {
+
+ private final int maxEvaluators;
+ private final double jvmHeapSlack;
+ private final Set<String> rackNames;
+
+ @Inject
+ LocalDriverConfigurationProviderImpl(@Parameter(MaxNumberOfEvaluators.class)
final int maxEvaluators,
+ @Parameter(JVMHeapSlack.class) final
double jvmHeapSlack,
+ @Parameter(RackNames.class) final
Set<String> rackNames) {
+ this.maxEvaluators = maxEvaluators;
+ this.jvmHeapSlack = jvmHeapSlack;
+ this.rackNames = rackNames;
+ }
+
+ private Configuration getDriverConfiguration(final URI jobFolder,
+ final String clientRemoteId,
+ final String jobId) {
+ ConfigurationModule configModule = LocalDriverConfiguration.CONF
+ .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS,
this.maxEvaluators)
+ .set(LocalDriverConfiguration.ROOT_FOLDER, jobFolder.getPath())
+ .set(LocalDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack)
+ .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
+ .set(LocalDriverConfiguration.JOB_IDENTIFIER, jobId);
+ for (final String rackName : rackNames) {
+ configModule = configModule.set(LocalDriverConfiguration.RACK_NAMES,
rackName);
+ }
+ return configModule.build();
+ }
+
+ /**
+ * Assembles the driver configuration.
+ *
+ * @param jobFolder The folder in which the local runtime
will execute this job.
+ * @param clientRemoteId the remote identifier of the client. It
is used by the Driver to establish a
+ * connection back to the client.
+ * @param jobId The identifier of the job.
+ * @param applicationConfiguration The configuration of the application,
e.g. a filled out DriverConfiguration
+ * @return The Driver configuration to be used to instantiate the Driver.
+ */
+ public Configuration getDriverConfiguration(final URI jobFolder,
+ final String clientRemoteId,
+ final String jobId,
+ final Configuration
applicationConfiguration) {
+ return Configurations.merge(getDriverConfiguration(jobFolder,
clientRemoteId, jobId), applicationConfiguration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
index f3531c7..ecdc17f 100644
---
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
+++
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.local.client;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.REEFFileNames;
@@ -99,7 +100,10 @@ final class LocalJobSubmissionHandler implements
JobSubmissionHandler {
driverFiles.copyTo(driverFolder);
final Configuration driverConfiguration =
this.driverConfigurationProvider
- .getDriverConfiguration(jobFolder, t.getRemoteId(),
t.getIdentifier(), t.getConfiguration());
+ .getDriverConfiguration(jobFolder.toURI(),
+ t.getRemoteId(),
+ t.getIdentifier(),
+ t.getConfiguration());
this.configurationSerializer.toFile(driverConfiguration,
new File(driverFolder,
this.fileNames.getDriverConfigurationPath()));
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
index aa045f9..c8282b9 100644
---
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
+++
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.local.client;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
@@ -29,10 +30,7 @@ import
org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
import org.apache.reef.runtime.local.client.parameters.RackNames;
import org.apache.reef.runtime.local.client.parameters.RootFolder;
import org.apache.reef.tang.ConfigurationProvider;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.OptionalImpl;
-import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.*;
import org.apache.reef.wake.time.Clock;
import java.util.concurrent.ExecutorService;
@@ -73,7 +71,6 @@ public class LocalRuntimeConfiguration extends
ConfigurationModuleBuilder {
*/
public static final OptionalParameter<String> RACK_NAMES = new
OptionalParameter<>();
-
/**
* The ConfigurationModule for the local resourcemanager.
*/
@@ -81,6 +78,7 @@ public class LocalRuntimeConfiguration extends
ConfigurationModuleBuilder {
.merge(CommonRuntimeConfiguration.CONF)
// Bind the local runtime
.bindImplementation(JobSubmissionHandler.class,
LocalJobSubmissionHandler.class)
+ .bindImplementation(DriverConfigurationProvider.class,
LocalDriverConfigurationProviderImpl.class)
.bindConstructor(ExecutorService.class, ExecutorServiceConstructor.class)
.bindImplementation(RuntimeClasspathProvider.class,
LocalClasspathProvider.class)
// Bind parameters of the local runtime
@@ -94,3 +92,4 @@ public class LocalRuntimeConfiguration extends
ConfigurationModuleBuilder {
}
+
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
index d3468d4..179604d 100644
---
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
+++
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
import org.apache.reef.runtime.mesos.MesosClasspathProvider;
@@ -53,6 +54,7 @@ public class MesosClientConfiguration extends
ConfigurationModuleBuilder {
public static final ConfigurationModule CONF = new MesosClientConfiguration()
.merge(CommonRuntimeConfiguration.CONF)
.bindImplementation(JobSubmissionHandler.class,
MesosJobSubmissionHandler.class)
+ .bindImplementation(DriverConfigurationProvider.class,
MesosDriverConfigurationProviderImpl.class)
.bindNamedParameter(RootFolder.class, ROOT_FOLDER)
.bindNamedParameter(MasterIp.class, MASTER_IP)
.bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosDriverConfigurationProviderImpl.java
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..648c7d0
--- /dev/null
+++
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosDriverConfigurationProviderImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.driver.MesosDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.net.URI;
+
+/**
+ * Default driver configuration provider for Mesos.
+ */
+final class MesosDriverConfigurationProviderImpl implements
DriverConfigurationProvider {
+
+ private final String masterIp;
+ private final double jvmSlack;
+
+ @Inject
+ MesosDriverConfigurationProviderImpl(@Parameter(MasterIp.class) final String
masterIp,
+ @Parameter(JVMHeapSlack.class)
final double jvmSlack) {
+ this.masterIp = masterIp;
+ this.jvmSlack = jvmSlack;
+ }
+
+ @Override
+ public Configuration getDriverConfiguration(final URI jobFolder,
+ final String clientRemoteId,
+ final String jobId,
+ final Configuration
applicationConfiguration) {
+ return Configurations.merge(MesosDriverConfiguration.CONF
+ .set(MesosDriverConfiguration.MESOS_MASTER_IP,
this.masterIp)
+ .set(MesosDriverConfiguration.JOB_IDENTIFIER, jobId)
+ .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER,
clientRemoteId)
+ .set(MesosDriverConfiguration.JVM_HEAP_SLACK,
this.jvmSlack)
+ .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1)
+ // must be 1 as there is 1 scheduler at the same time
+ .build(),
+ applicationConfiguration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
index 880d30c..f84e082 100644
---
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
+++
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -20,18 +20,15 @@ package org.apache.reef.runtime.mesos.client;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.FileResource;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
-import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
-import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
-import org.apache.reef.runtime.mesos.driver.MesosDriverConfiguration;
import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
@@ -60,22 +57,19 @@ final class MesosJobSubmissionHandler implements
JobSubmissionHandler {
private final ClasspathProvider classpath;
private final REEFFileNames fileNames;
private final String rootFolderName;
- private final String masterIp;
- private final double jvmSlack;
+ private final DriverConfigurationProvider driverConfigurationProvider;
@Inject
MesosJobSubmissionHandler(@Parameter(RootFolder.class) final String
rootFolderName,
- @Parameter(MasterIp.class) final String masterIp,
final ConfigurationSerializer
configurationSerializer,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
- @Parameter(JVMHeapSlack.class) final double
jvmSlack) {
+ final DriverConfigurationProvider
driverConfigurationProvider) {
this.rootFolderName = new File(rootFolderName).getAbsolutePath();
- this.masterIp = masterIp;
this.configurationSerializer = configurationSerializer;
this.fileNames = fileNames;
this.classpath = classpath;
- this.jvmSlack = jvmSlack;
+ this.driverConfigurationProvider = driverConfigurationProvider;
}
@Override
@@ -86,7 +80,7 @@ final class MesosJobSubmissionHandler implements
JobSubmissionHandler {
public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
try {
final File jobFolder = new File(new File(this.rootFolderName),
- "/" + jobSubmissionEvent.getIdentifier() + "-" +
System.currentTimeMillis() + "/");
+ "/" + jobSubmissionEvent.getIdentifier() + "-" +
System.currentTimeMillis() + "/");
final File driverFolder = new File(jobFolder, DRIVER_FOLDER_NAME);
if (!driverFolder.exists() && !driverFolder.mkdirs()) {
@@ -120,16 +114,12 @@ final class MesosJobSubmissionHandler implements
JobSubmissionHandler {
Files.copy(src, dst,
java.nio.file.StandardCopyOption.REPLACE_EXISTING);
}
- final Configuration driverConfiguration =
- Configurations.merge(MesosDriverConfiguration.CONF
- .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp)
- .set(MesosDriverConfiguration.JOB_IDENTIFIER,
jobSubmissionEvent.getIdentifier())
- .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER,
jobSubmissionEvent.getRemoteId())
- .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
- .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1)
- // must be 1 as there is 1 scheduler at the same time
- .build(),
- jobSubmissionEvent.getConfiguration());
+ final Configuration driverConfiguration =
this.driverConfigurationProvider.getDriverConfiguration(
+ null,
+ jobSubmissionEvent.getRemoteId(),
+ jobSubmissionEvent.getIdentifier(),
+ jobSubmissionEvent.getConfiguration());
+
final File runtimeConfigurationFile = new File(driverFolder,
this.fileNames.getDriverConfigurationPath());
this.configurationSerializer.toFile(driverConfiguration,
runtimeConfigurationFile);
@@ -143,11 +133,11 @@ final class MesosJobSubmissionHandler implements
JobSubmissionHandler {
final File outFile = new File(driverFolder,
fileNames.getDriverStdoutFileName());
new ProcessBuilder()
- .command(launchCommand)
- .directory(driverFolder)
- .redirectError(errFile)
- .redirectOutput(outFile)
- .start();
+ .command(launchCommand)
+ .directory(driverFolder)
+ .redirectError(errFile)
+ .redirectOutput(outFile)
+ .start();
} catch (final IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/ExtensibleYarnClientConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/ExtensibleYarnClientConfiguration.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/ExtensibleYarnClientConfiguration.java
new file mode 100644
index 0000000..516c600
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/ExtensibleYarnClientConfiguration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.client.parameters.JobPriority;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.ConfigurationProvider;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.util.logging.LoggingSetup;
+
+/**
+ * An extensible ConfigurationModule for the YARN resourcemanager.
+ */
+public final class ExtensibleYarnClientConfiguration extends
ConfigurationModuleBuilder {
+ static {
+ LoggingSetup.setupCommonsLogging();
+ }
+
+ public static final OptionalParameter<String> YARN_QUEUE_NAME = new
OptionalParameter<>();
+ public static final OptionalParameter<Integer> YARN_PRIORITY = new
OptionalParameter<>();
+
+ public static final OptionalParameter<Double> JVM_HEAP_SLACK = new
OptionalParameter<>();
+
+ /**
+ * Configuration providers whose Configurations will be merged into all Driver
Configurations.
+ */
+ public static final OptionalImpl<ConfigurationProvider>
DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
+
+ /**
+ * Driver configuration provider for the client.
+ */
+ public static final RequiredImpl<DriverConfigurationProvider>
DRIVER_CONFIGURATION_PROVIDER = new RequiredImpl<>();
+
+ public static final ConfigurationModule CONF = new
ExtensibleYarnClientConfiguration()
+ .merge(CommonRuntimeConfiguration.CONF)
+ // Bind YARN
+ .bindImplementation(JobSubmissionHandler.class,
YarnJobSubmissionHandler.class)
+ .bindImplementation(DriverConfigurationProvider.class,
DRIVER_CONFIGURATION_PROVIDER)
+ // Bind the parameters given by the user
+ .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
+ .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
+ .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+ .bindImplementation(RuntimeClasspathProvider.class,
YarnClasspathProvider.class)
+ // Bind external constructors. Taken from
YarnExternalConstructors.registerClientConstructors
+ .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class,
YarnConfigurationConstructor.class)
+ .bindSetEntry(DriverConfigurationProviders.class,
DRIVER_CONFIGURATION_PROVIDERS)
+ .build();
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
index 8731aa4..33fb177 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
@@ -30,10 +31,7 @@ import
org.apache.reef.runtime.yarn.client.parameters.JobPriority;
import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
import org.apache.reef.tang.ConfigurationProvider;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.OptionalImpl;
-import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.*;
import org.apache.reef.util.logging.LoggingSetup;
/**
@@ -57,17 +55,17 @@ public class YarnClientConfiguration extends
ConfigurationModuleBuilder {
public static final OptionalImpl<ConfigurationProvider>
DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
public static final ConfigurationModule CONF = new YarnClientConfiguration()
- .merge(CommonRuntimeConfiguration.CONF)
+ .merge(CommonRuntimeConfiguration.CONF)
// Bind YARN
- .bindImplementation(JobSubmissionHandler.class,
YarnJobSubmissionHandler.class)
+ .bindImplementation(JobSubmissionHandler.class,
YarnJobSubmissionHandler.class)
+ .bindImplementation(DriverConfigurationProvider.class,
YarnDriverConfigurationProviderImpl.class)
// Bind the parameters given by the user
- .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
- .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
- .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
- .bindImplementation(RuntimeClasspathProvider.class,
YarnClasspathProvider.class)
+ .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
+ .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
+ .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+ .bindImplementation(RuntimeClasspathProvider.class,
YarnClasspathProvider.class)
// Bind external constructors. Taken from
YarnExternalConstructors.registerClientConstructors
- .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class,
YarnConfigurationConstructor.class)
- .bindSetEntry(DriverConfigurationProviders.class,
DRIVER_CONFIGURATION_PROVIDERS)
- .build();
-
+
.bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class,
YarnConfigurationConstructor.class)
+ .bindSetEntry(DriverConfigurationProviders.class,
DRIVER_CONFIGURATION_PROVIDERS)
+ .build();
}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
new file mode 100644
index 0000000..f7aabb8
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.net.URI;
+
+import static org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.*;
+
+/**
+ * Default driver configuration provider for yarn runtime.
+ */
+final class YarnDriverConfigurationProviderImpl implements
DriverConfigurationProvider {
+ private final double jvmSlack;
+
+ @Inject
+ YarnDriverConfigurationProviderImpl(@Parameter(JVMHeapSlack.class) final
double jvmSlack) {
+ this.jvmSlack = jvmSlack;
+ }
+
+ @Override
+ public Configuration getDriverConfiguration(final URI jobFolder,
+ final String clientRemoteId,
+ final String jobId,
+ final Configuration
applicationConfiguration) {
+ return Configurations.merge(
+ org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.CONF
+ .set(JOB_SUBMISSION_DIRECTORY, jobFolder.toString())
+ .set(JOB_IDENTIFIER, jobId)
+ .set(CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
+ .set(JVM_HEAP_SLACK, this.jvmSlack)
+ .build(),
+ applicationConfiguration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/c63219ff/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index faf8e1f..e2c6389 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -26,18 +26,16 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.JobJarMaker;
import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
-import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
@@ -60,29 +58,29 @@ final class YarnJobSubmissionHandler implements
JobSubmissionHandler {
private final REEFFileNames fileNames;
private final ClasspathProvider classpath;
private final JobUploader uploader;
- private final double jvmSlack;
private final String defaultQueueName;
private final SecurityTokenProvider tokenProvider;
+ private final DriverConfigurationProvider driverConfigurationProvider;
@Inject
YarnJobSubmissionHandler(
- final YarnConfiguration yarnConfiguration,
- final JobJarMaker jobJarMaker,
- final REEFFileNames fileNames,
- final ClasspathProvider classpath,
- final JobUploader uploader,
- @Parameter(JVMHeapSlack.class) final double jvmSlack,
- @Parameter(JobQueue.class) final String defaultQueueName,
- final SecurityTokenProvider tokenProvider) throws IOException {
+ final YarnConfiguration yarnConfiguration,
+ final JobJarMaker jobJarMaker,
+ final REEFFileNames fileNames,
+ final ClasspathProvider classpath,
+ final JobUploader uploader,
+ @Parameter(JobQueue.class) final String defaultQueueName,
+ final SecurityTokenProvider tokenProvider,
+ final DriverConfigurationProvider driverConfigurationProvider)
throws IOException {
this.yarnConfiguration = yarnConfiguration;
this.jobJarMaker = jobJarMaker;
this.fileNames = fileNames;
this.classpath = classpath;
this.uploader = uploader;
- this.jvmSlack = jvmSlack;
this.defaultQueueName = defaultQueueName;
this.tokenProvider = tokenProvider;
+ this.driverConfigurationProvider = driverConfigurationProvider;
}
@Override
@@ -130,14 +128,12 @@ final class YarnJobSubmissionHandler implements
JobSubmissionHandler {
private Configuration makeDriverConfiguration(
final JobSubmissionEvent jobSubmissionEvent,
final Path jobFolderPath) throws IOException {
- return Configurations.merge(
- YarnDriverConfiguration.CONF
- .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
jobFolderPath.toString())
- .set(YarnDriverConfiguration.JOB_IDENTIFIER,
jobSubmissionEvent.getIdentifier())
- .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER,
jobSubmissionEvent.getRemoteId())
- .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
- .build(),
- jobSubmissionEvent.getConfiguration());
+
+ return this.driverConfigurationProvider.getDriverConfiguration(
+ jobFolderPath.toUri(),
+ jobSubmissionEvent.getRemoteId(),
+ jobSubmissionEvent.getIdentifier(),
+ jobSubmissionEvent.getConfiguration());
}
private static int getPriority(final JobSubmissionEvent jobSubmissionEvent) {
@@ -168,7 +164,7 @@ final class YarnJobSubmissionHandler implements
JobSubmissionHandler {
/**
* Extracts the queue name from the driverConfiguration or return default if
none is set.
*
- * @param driverConfiguration
+ * @param driverConfiguration The driver configuration
* @return the queue name from the driverConfiguration or return default if
none is set.
*/
private String getQueue(final Configuration driverConfiguration) {