Repository: reef Updated Branches: refs/heads/master 2a4d08d4f -> 45f7f1730
[REEF-1712] Make JobSubmissionHandler return the Application ID This change adds the `.getApplicationId()` method to the `JobSubmissionHandler` interface and its implementations. JIRA: [REEF-1712](https://issues.apache.org/jira/browse/REEF-1712) Pull request: Closes #1227 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/45f7f173 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/45f7f173 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/45f7f173 Branch: refs/heads/master Commit: 45f7f17305ccec0cc56b6e1e287e0029954189ec Parents: 2a4d08d Author: Sergiy Matusevych <[email protected]> Authored: Fri Jan 13 14:42:21 2017 -0800 Committer: Mariia Mykhailova <[email protected]> Committed: Tue Jan 17 16:47:42 2017 -0800 ---------------------------------------------------------------------- .../common/client/api/JobSubmissionHandler.java | 8 +++-- .../client/HDInsightJobSubmissionHandler.java | 35 ++++++++++++-------- .../local/client/LocalJobSubmissionHandler.java | 14 ++++++++ .../mesos/client/MesosJobSubmissionHandler.java | 15 +++++++++ .../yarn/client/YarnJobSubmissionHandler.java | 25 +++++++++++--- 5 files changed, 78 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java index 89a1068..71e44fd 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java +++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java @@ -27,6 +27,10 @@ import org.apache.reef.wake.EventHandler; @RuntimeAuthor public interface JobSubmissionHandler extends EventHandler<JobSubmissionEvent>, AutoCloseable { - @Override - void close(); + /** + * Get the RM application ID. + * Return null if the application has not been submitted yet, or was submitted unsuccessfully. + * @return string application ID or null if no app has been submitted yet. + */ + String getApplicationId(); } http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java index 7c70a5a..6d61a11 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java @@ -57,6 +57,8 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler private final ClasspathProvider classpath; private final DriverConfigurationProvider driverConfigurationProvider; + private String applicationId; + @Inject HDInsightJobSubmissionHandler(final AzureUploader uploader, final JobJarMaker jobJarMaker, @@ -83,16 +85,15 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler try { LOG.log(Level.FINE, "Requesting Application ID from HDInsight."); - final ApplicationID applicationID = this.hdInsightInstance.getApplicationID(); + final String appId = 
this.hdInsightInstance.getApplicationID().getApplicationId(); - LOG.log(Level.INFO, "Submitting application {0} to YARN.", applicationID.getApplicationId()); + LOG.log(Level.INFO, "Submitting application {0} to YARN.", appId); LOG.log(Level.FINE, "Creating a job folder on Azure."); - final URI jobFolderURL = this.uploader.createJobFolder(applicationID.getApplicationId()); + final URI jobFolderURL = this.uploader.createJobFolder(appId); LOG.log(Level.FINE, "Assembling Configuration for the Driver."); - final Configuration driverConfiguration = - makeDriverConfiguration(jobSubmissionEvent, applicationID.getApplicationId(), jobFolderURL); + final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, appId, jobFolderURL); LOG.log(Level.FINE, "Making Job JAR."); final File jobSubmissionJarFile = @@ -105,7 +106,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler final String command = getCommandString(jobSubmissionEvent); final ApplicationSubmission applicationSubmission = new ApplicationSubmission() - .setApplicationId(applicationID.getApplicationId()) + .setApplicationId(appId) .setApplicationName(jobSubmissionEvent.getIdentifier()) .setResource(getResource(jobSubmissionEvent)) .setAmContainerSpec(new AmContainerSpec() @@ -113,8 +114,8 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler .setCommand(command)); this.hdInsightInstance.submitApplication(applicationSubmission); - LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", - applicationID.getApplicationId()); + this.applicationId = appId; + LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", appId); } catch (final IOException ex) { LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex); @@ -123,6 +124,16 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler } /** + * Get the RM application ID. 
+ * Return null if the application has not been submitted yet, or was submitted unsuccessfully. + * @return string application ID or null if no app has been submitted yet. + */ + @Override + public String getApplicationId() { + return this.applicationId; + } + + /** * Extracts the resource demands from the jobSubmissionEvent. */ private Resource getResource( @@ -159,12 +170,10 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler private Configuration makeDriverConfiguration( final JobSubmissionEvent jobSubmissionEvent, - final String applicationId, + final String appId, final URI jobFolderURL) throws IOException { - return this.driverConfigurationProvider.getDriverConfiguration(jobFolderURL, - jobSubmissionEvent.getRemoteId(), - applicationId, - jobSubmissionEvent.getConfiguration()); + return this.driverConfigurationProvider.getDriverConfiguration( + jobFolderURL, jobSubmissionEvent.getRemoteId(), appId, jobSubmissionEvent.getConfiguration()); } } http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java index be41c94..7a8996e 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java @@ -54,6 +54,8 @@ public final class LocalJobSubmissionHandler implements JobSubmissionHandler { private final LoggingScopeFactory loggingScopeFactory; private final DriverConfigurationProvider driverConfigurationProvider; + private String applicationId; + @Inject 
LocalJobSubmissionHandler( final ExecutorService executor, @@ -108,10 +110,22 @@ public final class LocalJobSubmissionHandler implements JobSubmissionHandler { this.configurationSerializer.toFile(driverConfiguration, new File(driverFolder, this.fileNames.getDriverConfigurationPath())); this.driverLauncher.launch(driverFolder); + this.applicationId = t.getIdentifier(); + } catch (final Exception e) { LOG.log(Level.SEVERE, "Unable to setup driver.", e); throw new RuntimeException("Unable to setup driver.", e); } } } + + /** + * Get the RM application ID. + * Return null if the application has not been submitted yet, or was submitted unsuccessfully. + * @return string application ID or null if no app has been submitted yet. + */ + @Override + public String getApplicationId() { + return this.applicationId; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java index f84e082..fe5eb0d 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java @@ -59,6 +59,8 @@ final class MesosJobSubmissionHandler implements JobSubmissionHandler { private final String rootFolderName; private final DriverConfigurationProvider driverConfigurationProvider; + private String applicationId; + @Inject MesosJobSubmissionHandler(@Parameter(RootFolder.class) final String rootFolderName, final ConfigurationSerializer configurationSerializer, @@ -138,8 +140,21 @@ final class MesosJobSubmissionHandler 
implements JobSubmissionHandler { .redirectError(errFile) .redirectOutput(outFile) .start(); + + this.applicationId = jobSubmissionEvent.getIdentifier(); + } catch (final IOException e) { throw new RuntimeException(e); } } + + /** + * Get the RM application ID. + * Return null if the application has not been submitted yet, or was submitted unsuccessfully. + * @return string application ID or null if no app has been submitted yet. + */ + @Override + public String getApplicationId() { + return this.applicationId; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/45f7f173/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java index ab361bc..9457f90 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java @@ -64,6 +64,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { private final SecurityTokenProvider tokenProvider; private final DriverConfigurationProvider driverConfigurationProvider; + private String applicationId; + @Inject YarnJobSubmissionHandler( @Parameter(JobQueue.class) final String defaultQueueName, @@ -94,7 +96,9 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { @Override public void onNext(final JobSubmissionEvent jobSubmissionEvent) { - LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionEvent.getIdentifier()); + final String id = jobSubmissionEvent.getIdentifier(); + LOG.log(Level.FINEST, "Submitting{0} job: {1}", + new Object[] {this.isUnmanaged ? 
" UNMANAGED AM" : "", jobSubmissionEvent}); try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper( this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider, this.isUnmanaged)) { @@ -112,7 +116,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { submissionHelper .addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs) - .setApplicationName(jobSubmissionEvent.getIdentifier()) + .setApplicationName(id) .setDriverMemory(jobSubmissionEvent.getDriverMemory().get()) .setPriority(getPriority(jobSubmissionEvent)) .setQueue(getQueue(jobSubmissionEvent)) @@ -120,13 +124,26 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { .setMaxApplicationAttempts(getMaxApplicationSubmissions(jobSubmissionEvent)) .submit(); - LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionEvent.getIdentifier()); + this.applicationId = submissionHelper.getStringApplicationId(); + LOG.log(Level.FINEST, "Submitted{0} job with ID {1} :: {2}", new String[] { + this.isUnmanaged ? " UNMANAGED AM" : "", id, this.applicationId}); + } catch (final YarnException | IOException e) { - throw new RuntimeException("Unable to submit Driver to YARN.", e); + throw new RuntimeException("Unable to submit Driver to YARN: " + id, e); } } /** + * Get the RM application ID. + * Return null if the application has not been submitted yet, or was submitted unsuccessfully. + * @return string application ID or null if no app has been submitted yet. + */ + @Override + public String getApplicationId() { + return this.applicationId; + } + + /** * Assembles the Driver configuration. */ private Configuration makeDriverConfiguration(
