Repository: reef
Updated Branches:
  refs/heads/master 2a4d08d4f -> 45f7f1730


[REEF-1712] Make JobSubmissionHandler return the Application ID

This change adds a `getApplicationId()` method
to the `JobSubmissionHandler` interface and its implementations.

JIRA:
  [REEF-1712](https://issues.apache.org/jira/browse/REEF-1712)

Pull request:
  Closes #1227


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/45f7f173
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/45f7f173
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/45f7f173

Branch: refs/heads/master
Commit: 45f7f17305ccec0cc56b6e1e287e0029954189ec
Parents: 2a4d08d
Author: Sergiy Matusevych <[email protected]>
Authored: Fri Jan 13 14:42:21 2017 -0800
Committer: Mariia Mykhailova <[email protected]>
Committed: Tue Jan 17 16:47:42 2017 -0800

----------------------------------------------------------------------
 .../common/client/api/JobSubmissionHandler.java |  8 +++--
 .../client/HDInsightJobSubmissionHandler.java   | 35 ++++++++++++--------
 .../local/client/LocalJobSubmissionHandler.java | 14 ++++++++
 .../mesos/client/MesosJobSubmissionHandler.java | 15 +++++++++
 .../yarn/client/YarnJobSubmissionHandler.java   | 25 +++++++++++---
 5 files changed, 78 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
index 89a1068..71e44fd 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java
@@ -27,6 +27,10 @@ import org.apache.reef.wake.EventHandler;
 @RuntimeAuthor
 public interface JobSubmissionHandler extends 
EventHandler<JobSubmissionEvent>, AutoCloseable {
 
-  @Override
-  void close();
+  /**
+   * Get the RM application ID.
+   * Return null if the application has not been submitted yet, or was 
submitted unsuccessfully.
+   * @return string application ID or null if no app has been submitted yet.
+   */
+  String getApplicationId();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
index 7c70a5a..6d61a11 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
@@ -57,6 +57,8 @@ public final class HDInsightJobSubmissionHandler implements 
JobSubmissionHandler
   private final ClasspathProvider classpath;
   private final DriverConfigurationProvider driverConfigurationProvider;
 
+  private String applicationId;
+
   @Inject
   HDInsightJobSubmissionHandler(final AzureUploader uploader,
                                 final JobJarMaker jobJarMaker,
@@ -83,16 +85,15 @@ public final class HDInsightJobSubmissionHandler implements 
JobSubmissionHandler
     try {
 
       LOG.log(Level.FINE, "Requesting Application ID from HDInsight.");
-      final ApplicationID applicationID = 
this.hdInsightInstance.getApplicationID();
+      final String appId = 
this.hdInsightInstance.getApplicationID().getApplicationId();
 
-      LOG.log(Level.INFO, "Submitting application {0} to YARN.", 
applicationID.getApplicationId());
+      LOG.log(Level.INFO, "Submitting application {0} to YARN.", appId);
 
       LOG.log(Level.FINE, "Creating a job folder on Azure.");
-      final URI jobFolderURL = 
this.uploader.createJobFolder(applicationID.getApplicationId());
+      final URI jobFolderURL = this.uploader.createJobFolder(appId);
 
       LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
-      final Configuration driverConfiguration =
-          makeDriverConfiguration(jobSubmissionEvent, 
applicationID.getApplicationId(), jobFolderURL);
+      final Configuration driverConfiguration = 
makeDriverConfiguration(jobSubmissionEvent, appId, jobFolderURL);
 
       LOG.log(Level.FINE, "Making Job JAR.");
       final File jobSubmissionJarFile =
@@ -105,7 +106,7 @@ public final class HDInsightJobSubmissionHandler implements 
JobSubmissionHandler
       final String command = getCommandString(jobSubmissionEvent);
 
       final ApplicationSubmission applicationSubmission = new 
ApplicationSubmission()
-          .setApplicationId(applicationID.getApplicationId())
+          .setApplicationId(appId)
           .setApplicationName(jobSubmissionEvent.getIdentifier())
           .setResource(getResource(jobSubmissionEvent))
           .setAmContainerSpec(new AmContainerSpec()
@@ -113,8 +114,8 @@ public final class HDInsightJobSubmissionHandler implements 
JobSubmissionHandler
                   .setCommand(command));
 
       this.hdInsightInstance.submitApplication(applicationSubmission);
-      LOG.log(Level.INFO, "Submitted application to HDInsight. The application 
id is: {0}",
-          applicationID.getApplicationId());
+      this.applicationId = appId;
+      LOG.log(Level.INFO, "Submitted application to HDInsight. The application 
id is: {0}", appId);
 
     } catch (final IOException ex) {
       LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex);
@@ -123,6 +124,16 @@ public final class HDInsightJobSubmissionHandler 
implements JobSubmissionHandler
   }
 
   /**
+   * Get the RM application ID.
+   * Return null if the application has not been submitted yet, or was 
submitted unsuccessfully.
+   * @return string application ID or null if no app has been submitted yet.
+   */
+  @Override
+  public String getApplicationId() {
+    return this.applicationId;
+  }
+
+  /**
    * Extracts the resource demands from the jobSubmissionEvent.
    */
   private Resource getResource(
@@ -159,12 +170,10 @@ public final class HDInsightJobSubmissionHandler 
implements JobSubmissionHandler
 
   private Configuration makeDriverConfiguration(
       final JobSubmissionEvent jobSubmissionEvent,
-      final String applicationId,
+      final String appId,
       final URI jobFolderURL) throws IOException {
 
-    return 
this.driverConfigurationProvider.getDriverConfiguration(jobFolderURL,
-                                                            
jobSubmissionEvent.getRemoteId(),
-                                                            applicationId,
-                                                            
jobSubmissionEvent.getConfiguration());
+    return this.driverConfigurationProvider.getDriverConfiguration(
+        jobFolderURL, jobSubmissionEvent.getRemoteId(), appId, 
jobSubmissionEvent.getConfiguration());
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
index be41c94..7a8996e 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
@@ -54,6 +54,8 @@ public final class LocalJobSubmissionHandler implements 
JobSubmissionHandler {
   private final LoggingScopeFactory loggingScopeFactory;
   private final DriverConfigurationProvider driverConfigurationProvider;
 
+  private String applicationId;
+
   @Inject
   LocalJobSubmissionHandler(
       final ExecutorService executor,
@@ -108,10 +110,22 @@ public final class LocalJobSubmissionHandler implements 
JobSubmissionHandler {
         this.configurationSerializer.toFile(driverConfiguration,
             new File(driverFolder, 
this.fileNames.getDriverConfigurationPath()));
         this.driverLauncher.launch(driverFolder);
+        this.applicationId = t.getIdentifier();
+
       } catch (final Exception e) {
         LOG.log(Level.SEVERE, "Unable to setup driver.", e);
         throw new RuntimeException("Unable to setup driver.", e);
       }
     }
   }
+
+  /**
+   * Get the RM application ID.
+   * Return null if the application has not been submitted yet, or was 
submitted unsuccessfully.
+   * @return string application ID or null if no app has been submitted yet.
+   */
+  @Override
+  public String getApplicationId() {
+    return this.applicationId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
index f84e082..fe5eb0d 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -59,6 +59,8 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
   private final String rootFolderName;
   private final DriverConfigurationProvider driverConfigurationProvider;
 
+  private String applicationId;
+
   @Inject
   MesosJobSubmissionHandler(@Parameter(RootFolder.class) final String 
rootFolderName,
                             final ConfigurationSerializer 
configurationSerializer,
@@ -138,8 +140,21 @@ final class MesosJobSubmissionHandler implements 
JobSubmissionHandler {
               .redirectError(errFile)
               .redirectOutput(outFile)
               .start();
+
+      this.applicationId = jobSubmissionEvent.getIdentifier();
+
     } catch (final IOException e) {
       throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Get the RM application ID.
+   * Return null if the application has not been submitted yet, or was 
submitted unsuccessfully.
+   * @return string application ID or null if no app has been submitted yet.
+   */
+  @Override
+  public String getApplicationId() {
+    return this.applicationId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index ab361bc..9457f90 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -64,6 +64,8 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   private final SecurityTokenProvider tokenProvider;
   private final DriverConfigurationProvider driverConfigurationProvider;
 
+  private String applicationId;
+
   @Inject
   YarnJobSubmissionHandler(
           @Parameter(JobQueue.class) final String defaultQueueName,
@@ -94,7 +96,9 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   @Override
   public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
 
-    LOG.log(Level.FINEST, "Submitting job with ID [{0}]", 
jobSubmissionEvent.getIdentifier());
+    final String id = jobSubmissionEvent.getIdentifier();
+    LOG.log(Level.FINEST, "Submitting{0} job: {1}",
+        new Object[] {this.isUnmanaged ? " UNMANAGED AM" : "", 
jobSubmissionEvent});
 
     try (final YarnSubmissionHelper submissionHelper = new 
YarnSubmissionHelper(
         this.yarnConfiguration, this.fileNames, this.classpath, 
this.tokenProvider, this.isUnmanaged)) {
@@ -112,7 +116,7 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
 
       submissionHelper
           .addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs)
-          .setApplicationName(jobSubmissionEvent.getIdentifier())
+          .setApplicationName(id)
           .setDriverMemory(jobSubmissionEvent.getDriverMemory().get())
           .setPriority(getPriority(jobSubmissionEvent))
           .setQueue(getQueue(jobSubmissionEvent))
@@ -120,13 +124,26 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
           
.setMaxApplicationAttempts(getMaxApplicationSubmissions(jobSubmissionEvent))
           .submit();
 
-      LOG.log(Level.FINEST, "Submitted job with ID [{0}]", 
jobSubmissionEvent.getIdentifier());
+      this.applicationId = submissionHelper.getStringApplicationId();
+      LOG.log(Level.FINEST, "Submitted{0} job with ID {1} :: {2}", new 
String[] {
+          this.isUnmanaged ? " UNMANAGED AM" : "", id, this.applicationId});
+
     } catch (final YarnException | IOException e) {
-      throw new RuntimeException("Unable to submit Driver to YARN.", e);
+      throw new RuntimeException("Unable to submit Driver to YARN: " + id, e);
     }
   }
 
   /**
+   * Get the RM application ID.
+   * Return null if the application has not been submitted yet, or was 
submitted unsuccessfully.
+   * @return string application ID or null if no app has been submitted yet.
+   */
+  @Override
+  public String getApplicationId() {
+    return this.applicationId;
+  }
+
+  /**
    * Assembles the Driver configuration.
    */
   private Configuration makeDriverConfiguration(

Reply via email to