Repository: reef
Updated Branches:
  refs/heads/master d5e448101 -> 4550b841d


[REEF-1727] Add the AM token to the client app credentials in the Unmanaged AM 
mode

Summary of changes:
   * For Unmanaged AM mode, add YARN security token to teh app credentials so 
that (in-process) AM can use it.
   * Bugfix: `YarnSubmissionHelper` must implement `AutoCloseable`, not 
`java.io.Closeable`.
   * `.close()` method does not throw.
   * Minor logging improvements.

JIRA: [REEF-1727](https://issues.apache.org/jira/browse/REEF-1727)

This closes  #1242


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4550b841
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4550b841
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4550b841

Branch: refs/heads/master
Commit: 4550b841db1ddedf6c59aaf93b4dc042ed760162
Parents: d5e4481
Author: Sergiy Matusevych <[email protected]>
Authored: Wed Jan 25 15:56:46 2017 -0800
Committer: Julia Wang <[email protected]>
Committed: Fri Jan 27 17:26:42 2017 -0800

----------------------------------------------------------------------
 .../yarn/client/YarnSubmissionHelper.java        | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4550b841/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 278e4e8..6df3ffc 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -19,6 +19,7 @@
 package org.apache.reef.runtime.yarn.client;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.*;
@@ -26,13 +27,13 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.reef.runtime.common.REEFLauncher;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
 import org.apache.reef.runtime.yarn.util.YarnTypes;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.*;
 import java.util.logging.Level;
@@ -41,7 +42,7 @@ import java.util.logging.Logger;
 /**
  * Helper code that wraps the YARN Client API for our purposes.
  */
-public final class YarnSubmissionHelper implements Closeable {
+public final class YarnSubmissionHelper implements AutoCloseable {
 
   private static final Logger LOG = 
Logger.getLogger(YarnSubmissionHelper.class.getName());
 
@@ -52,6 +53,7 @@ public final class YarnSubmissionHelper implements Closeable {
   private final Map<String, LocalResource> resources = new HashMap<>();
   private final ClasspathProvider classpath;
   private final SecurityTokenProvider tokenProvider;
+  private final boolean isUnmanaged;
   private final List<String> commandPrefixList;
 
   private String driverStdoutFilePath;
@@ -67,6 +69,7 @@ public final class YarnSubmissionHelper implements Closeable {
                               final List<String> commandPrefixList) throws 
IOException, YarnException {
 
     this.classpath = classpath;
+    this.isUnmanaged = isUnmanaged;
 
     this.driverStdoutFilePath =
         ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + 
fileNames.getDriverStdoutFileName();
@@ -279,6 +282,13 @@ public final class YarnSubmissionHelper implements 
Closeable {
     }
 
     this.yarnClient.submitApplication(applicationSubmissionContext);
+
+    if (this.isUnmanaged) {
+      // For Unmanaged AM mode, add a new app token to the
+      // current process so it can talk to the RM as an AM.
+      final Token<AMRMTokenIdentifier> token = 
this.yarnClient.getAMRMToken(this.applicationId);
+      
this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token));
+    }
   }
 
   /**
@@ -304,7 +314,8 @@ public final class YarnSubmissionHelper implements 
Closeable {
   }
 
   @Override
-  public void close() throws IOException {
-    this.yarnClient.stop();
+  public void close() {
+    LOG.log(Level.FINE, "Closing YARN application: {0}", this.applicationId);
+    this.yarnClient.stop(); // same as yarnClient.close()
   }
 }

Reply via email to