Repository: incubator-reef
Updated Branches:
  refs/heads/master b6b9e39bb -> 3aaca1841


[REEF-456] Make job submission folder prefix configurable

This addressed the issue by adding a `JobSubmissionDirectoryPrefix`
named parameter and propogating it to driver.

JIRA:
  [REEF-456](https://issues.apache.org/jira/browse/REEF-456)

Pull Request:
  This closes #284


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/3aaca184
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/3aaca184
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/3aaca184

Branch: refs/heads/master
Commit: 3aaca18417ca2553313dcd0a85516dbd09b58d72
Parents: b6b9e39
Author: Beysim Sezgin <[email protected]>
Authored: Wed Jul 8 09:47:29 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Jul 9 12:19:30 2015 -0700

----------------------------------------------------------------------
 .../resourcemanager/ResourceManagerStatus.java  |  2 +-
 .../yarn/client/YarnJobSubmissionHandler.java   | 32 ++++++++----
 .../yarn/client/uploader/JobUploader.java       | 37 ++++++++++----
 .../driver/JobSubmissionDirectoryProvider.java  | 36 +++++++++++++
 .../JobSubmissionDirectoryProviderImpl.java     | 54 ++++++++++++++++++++
 .../JobSubmissionDirectoryPrefix.java           | 29 +++++++++++
 6 files changed, 168 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
index 24c8e2f..b10262a 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -66,7 +66,7 @@ public final class ResourceManagerStatus implements 
EventHandler<RuntimeStatusEv
   public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) 
{
     final ReefServiceProtos.State newState = runtimeStatusEvent.getState();
     LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent);
-    this.outstandingContainerRequests = 
runtimeStatusEvent.getOutstandingContainerRequests().get();
+    this.outstandingContainerRequests = 
runtimeStatusEvent.getOutstandingContainerRequests().orElse(0);
     this.containerAllocationCount = 
runtimeStatusEvent.getContainerAllocationList().size();
     this.setState(runtimeStatusEvent.getState());
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index eaa960d..5bd17ac 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -36,10 +36,11 @@ import 
org.apache.reef.runtime.yarn.client.uploader.JobUploader;
 import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
-import org.apache.reef.tang.types.NamedParameterNode;
-import org.apache.reef.tang.util.ReflectionUtilities;
+import org.apache.reef.util.Optional;
 
 import javax.inject.Inject;
 import java.io.File;
@@ -47,6 +48,8 @@ import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import static org.apache.reef.util.Optional.*;
+
 @Private
 @ClientSide
 final class YarnJobSubmissionHandler implements JobSubmissionHandler {
@@ -93,7 +96,10 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
              new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, 
this.classpath)) {
 
       LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
-      final JobFolder jobFolderOnDfs = 
this.uploader.createJobFolder(submissionHelper.getApplicationId());
+      final Optional<String> userBoundJobSubmissionDirectory = 
getUserBoundJobSubmissionDirectory(jobSubmissionEvent.getConfiguration());
+      final JobFolder jobFolderOnDfs = 
userBoundJobSubmissionDirectory.isPresent()
+          ? 
this.uploader.createJobFolder(userBoundJobSubmissionDirectory.get())
+          : this.uploader.createJobFolder(submissionHelper.getApplicationId());
       final Configuration driverConfiguration = 
makeDriverConfiguration(jobSubmissionEvent, jobFolderOnDfs.getPath());
       final File jobSubmissionFile = 
this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, 
driverConfiguration);
       final LocalResource driverJarOnDfs = 
jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile);
@@ -118,20 +124,14 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   private Configuration makeDriverConfiguration(
       final JobSubmissionEvent jobSubmissionEvent,
       final Path jobFolderPath) throws IOException {
-    final Configuration config = jobSubmissionEvent.getConfiguration();
-    final String userBoundJobSubmissionDirectory = 
config.getNamedParameter((NamedParameterNode<?>) 
config.getClassHierarchy().getNode(ReflectionUtilities.getFullName(DriverJobSubmissionDirectory.class)));
-    LOG.log(Level.FINE, "user bound job submission Directory: " + 
userBoundJobSubmissionDirectory);
-    final String finalJobFolderPath =
-        (userBoundJobSubmissionDirectory == null || 
userBoundJobSubmissionDirectory.isEmpty())
-            ? jobFolderPath.toString() : userBoundJobSubmissionDirectory;
     return Configurations.merge(
         YarnDriverConfiguration.CONF
-            .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, 
finalJobFolderPath)
+            .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, 
jobFolderPath.toString())
             .set(YarnDriverConfiguration.JOB_IDENTIFIER, 
jobSubmissionEvent.getIdentifier())
             .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, 
jobSubmissionEvent.getRemoteId())
             .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
             .build(),
-        config);
+        jobSubmissionEvent.getConfiguration());
   }
 
   private static int getPriority(final JobSubmissionEvent jobSubmissionEvent) {
@@ -147,4 +147,14 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
                           final String defaultQueue) {
     return jobSubmissionEvent.getQueue().orElse(defaultQueue);
   }
+
+  private static org.apache.reef.util.Optional<String> 
getUserBoundJobSubmissionDirectory(final Configuration configuration) {
+    try {
+      return 
Optional.ofNullable(Tang.Factory.getTang().newInjector(configuration).getNamedInstance(DriverJobSubmissionDirectory.class));
+    } catch (InjectionException ex) {
+      return Optional.empty();
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
index 417c069..98edbf5 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
@@ -21,23 +21,26 @@ package org.apache.reef.runtime.yarn.client.uploader;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-
+import org.apache.reef.runtime.yarn.driver.JobSubmissionDirectoryProvider;
 import javax.inject.Inject;
 import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Helper class to upload the driver files to HDFS.
  */
 public final class JobUploader {
 
+  private static final Logger LOG = 
Logger.getLogger(JobUploader.class.getName());
+
   private final FileSystem fileSystem;
-  private final REEFFileNames fileNames;
+  private final JobSubmissionDirectoryProvider jobSubmissionDirectoryProvider;
 
   @Inject
   JobUploader(final YarnConfiguration yarnConfiguration,
-              final REEFFileNames fileNames) throws IOException {
-    this.fileNames = fileNames;
+              JobSubmissionDirectoryProvider jobSubmissionDirectoryProvider) 
throws IOException {
+    this.jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider;
     this.fileSystem = FileSystem.get(yarnConfiguration);
   }
 
@@ -48,10 +51,24 @@ public final class JobUploader {
    * @return a reference to the JobFolder that can be used to upload files to 
it.
    * @throws IOException
    */
-  public JobFolder createJobFolder(final String applicationId) throws 
IOException {
-    // TODO: This really should be configurable, but wasn't in the code I 
moved as part of [REEF-228]
-    final Path jobFolderPath = new Path("/tmp/" + 
this.fileNames.getJobFolderPrefix() + applicationId + "/");
-    return new JobFolder(this.fileSystem, jobFolderPath);
+  public JobFolder createJobFolderWithApplicationId(final String 
applicationId) throws IOException {
+    final Path jobFolderPath = 
jobSubmissionDirectoryProvider.getJobSubmissionDirectoryPath(applicationId);
+    final String finalJobFolderPath = jobFolderPath.toString();
+    LOG.log(Level.FINE, "Final job submission Directory: " + 
finalJobFolderPath);
+    return createJobFolder(finalJobFolderPath);
+  }
+
+
+  /**
+   * Convenience override for int ids.
+   *
+   * @param finalJobFolderPath
+   * @return
+   * @throws IOException
+   */
+  public JobFolder createJobFolder(final String finalJobFolderPath) throws 
IOException {
+    LOG.log(Level.FINE, "Final job submission Directory: " + 
finalJobFolderPath);
+    return new JobFolder(this.fileSystem, new Path(finalJobFolderPath));
   }
 
   /**
@@ -62,6 +79,6 @@ public final class JobUploader {
    * @throws IOException
    */
   public JobFolder createJobFolder(final int applicationId) throws IOException 
{
-    return this.createJobFolder(Integer.toString(applicationId));
+    return 
this.createJobFolderWithApplicationId(Integer.toString(applicationId));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProvider.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProvider.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProvider.java
new file mode 100644
index 0000000..35d0048
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * Provides path to job submission directory.
+ *
+ */
+@DefaultImplementation(JobSubmissionDirectoryProviderImpl.class)
+public interface JobSubmissionDirectoryProvider {
+  /**
+   * Returns path to job submission directory.
+   *
+   * @return job submission directory
+   */
+  Path getJobSubmissionDirectoryPath(String applicationId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProviderImpl.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProviderImpl.java
new file mode 100644
index 0000000..0222ddb
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/JobSubmissionDirectoryProviderImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import 
org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+public final class JobSubmissionDirectoryProviderImpl implements 
JobSubmissionDirectoryProvider {
+
+  /**
+   * The path on (H)DFS which is used as the job's folder.
+   */
+  private final String jobSubmissionDirectory;
+  private final REEFFileNames fileNames;
+
+  @Inject
+  
JobSubmissionDirectoryProviderImpl(@Parameter(JobSubmissionDirectoryPrefix.class)
 final String jobSubmissionDirectoryPrefix,
+                                     final REEFFileNames fileNames) {
+    this.jobSubmissionDirectory = jobSubmissionDirectoryPrefix;
+    this.fileNames = fileNames;
+  }
+
+  @Override
+  public Path getJobSubmissionDirectoryPath(String applicationId) {
+    return new Path(this.jobSubmissionDirectory +
+        "/" +
+        new 
SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_").format(Calendar.getInstance().getTime())
 +
+        this.fileNames.getJobFolderPrefix() +
+        applicationId +
+        "/");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3aaca184/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectoryPrefix.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectoryPrefix.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectoryPrefix.java
new file mode 100644
index 0000000..30f1741
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectoryPrefix.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The job submission directory.
+ */
+@NamedParameter(doc = "The job submission directory prefix.", default_value = 
"/vol1/tmp")
+public final class JobSubmissionDirectoryPrefix implements Name<String> {
+}

Reply via email to