Repository: incubator-reef
Updated Branches:
  refs/heads/master d92ab9885 -> 3fdfdfdf6


[REEF-604] Support Hadoop security tokens in the YARN runtime

This change addressed the issue by:
 * Creating SecurityTokenProvider which provides security tokens
 * Add default implementation of SecurityTokenProvider, reading security
   token from user credentials
 * Get tokens from SecurityTokenProvider and add the tokens
   ContainerLaunchContext during application and container creation.

JIRA:
 [REEF-604](https://issues.apache.org/jira/browse/REEF-604)

Pull Request:
  This closes #402


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/3fdfdfdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/3fdfdfdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/3fdfdfdf

Branch: refs/heads/master
Commit: 3fdfdfdf6bf6dd003d615bdbbd675afc5ae8b85c
Parents: d92ab98
Author: Anupam <anupam...@gmail.com>
Authored: Fri Aug 21 15:01:42 2015 -0700
Committer: Markus Weimer <wei...@apache.org>
Committed: Mon Aug 24 13:49:29 2015 -0700

----------------------------------------------------------------------
 .../bridge/client/YarnJobSubmissionClient.java  |  8 ++-
 .../yarn/client/SecurityTokenProvider.java      | 34 ++++++++++++
 .../UserCredentialSecurityTokenProvider.java    | 58 ++++++++++++++++++++
 .../yarn/client/YarnJobSubmissionHandler.java   |  7 ++-
 .../yarn/client/YarnSubmissionHelper.java       | 10 +++-
 .../yarn/driver/YARNResourceLaunchHandler.java  | 10 +++-
 .../reef/runtime/yarn/util/YarnTypes.java       | 13 ++++-
 7 files changed, 129 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index de8638b..3a4aa9a 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -30,6 +30,7 @@ import org.apache.reef.javabridge.generic.JobDriver;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
 import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
 import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
 import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
@@ -68,6 +69,7 @@ public final class YarnJobSubmissionClient {
   private final ClasspathProvider classpath;
   private final int maxApplicationSubmissions;
   private final boolean enableRestart;
+  private final SecurityTokenProvider tokenProvider;
 
   @Inject
   YarnJobSubmissionClient(final JobUploader uploader,
@@ -78,7 +80,8 @@ public final class YarnJobSubmissionClient {
                           @Parameter(MaxApplicationSubmissions.class)
                           final int maxApplicationSubmissions,
                           @Parameter(EnableRestart.class)
-                          final boolean enableRestart) {
+                          final boolean enableRestart,
+                          final SecurityTokenProvider tokenProvider) {
     this.uploader = uploader;
     this.configurationSerializer = configurationSerializer;
     this.fileNames = fileNames;
@@ -86,6 +89,7 @@ public final class YarnJobSubmissionClient {
     this.classpath = classpath;
     this.maxApplicationSubmissions = maxApplicationSubmissions;
     this.enableRestart = enableRestart;
+    this.tokenProvider = tokenProvider;
   }
 
   private Configuration addYarnDriverConfiguration(final File driverFolder,
@@ -167,7 +171,7 @@ public final class YarnJobSubmissionClient {
     // ------------------------------------------------------------------------
     // Get an application ID
     try (final YarnSubmissionHelper submissionHelper =
-             new YarnSubmissionHelper(yarnConfiguration, fileNames, 
classpath)) {
+             new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, 
tokenProvider)) {
 
 
       // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/SecurityTokenProvider.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/SecurityTokenProvider.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/SecurityTokenProvider.java
new file mode 100644
index 0000000..37cfa76
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/SecurityTokenProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * Provides security token for setting up YARN container context.
+ */
+@DefaultImplementation(UserCredentialSecurityTokenProvider.class)
+public interface SecurityTokenProvider {
+
+  /**
+   * Returns a ByteBuffer containing security tokens.
+   * @return a ByteBuffer
+   */
+  byte[] getTokens();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/UserCredentialSecurityTokenProvider.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/UserCredentialSecurityTokenProvider.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/UserCredentialSecurityTokenProvider.java
new file mode 100644
index 0000000..5fde825
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/UserCredentialSecurityTokenProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Reads security token from user credentials.
+ */
+final class UserCredentialSecurityTokenProvider implements 
SecurityTokenProvider {
+
+  private static final Logger LOG = 
Logger.getLogger(UserCredentialSecurityTokenProvider.class.getName());
+
+  @Inject
+  private UserCredentialSecurityTokenProvider(){}
+
+  @Override
+  public byte[] getTokens() {
+    try {
+      final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      final Credentials credentials = ugi.getCredentials();
+      if (credentials.numberOfTokens() > 0) {
+        try(final DataOutputBuffer dob = new DataOutputBuffer()) {
+          credentials.writeTokenStorageToStream(dob);
+          return dob.getData();
+        }
+      }
+    } catch (IOException e) {
+      LOG.log(Level.WARNING, "Could not access tokens in user credentials.", 
e);
+    }
+
+    LOG.log(Level.FINE, "No security token found.");
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index ba35f41..3436424 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -61,6 +61,7 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
   private final JobUploader uploader;
   private final double jvmSlack;
   private final String defaultQueueName;
+  private final SecurityTokenProvider tokenProvider;
 
   @Inject
   YarnJobSubmissionHandler(
@@ -70,7 +71,8 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
       final ClasspathProvider classpath,
       final JobUploader uploader,
       @Parameter(JVMHeapSlack.class) final double jvmSlack,
-      @Parameter(JobQueue.class) final String defaultQueueName) throws 
IOException {
+      @Parameter(JobQueue.class) final String defaultQueueName,
+      final SecurityTokenProvider tokenProvider) throws IOException {
 
     this.yarnConfiguration = yarnConfiguration;
     this.jobJarMaker = jobJarMaker;
@@ -79,6 +81,7 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
     this.uploader = uploader;
     this.jvmSlack = jvmSlack;
     this.defaultQueueName = defaultQueueName;
+    this.tokenProvider = tokenProvider;
   }
 
   @Override
@@ -91,7 +94,7 @@ final class YarnJobSubmissionHandler implements 
JobSubmissionHandler {
     LOG.log(Level.FINEST, "Submitting job with ID [{0}]", 
jobSubmissionEvent.getIdentifier());
 
     try (final YarnSubmissionHelper submissionHelper =
-             new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, 
this.classpath)) {
+             new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, 
this.classpath, this.tokenProvider)) {
 
       LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
       final Optional<String> userBoundJobSubmissionDirectory =

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index ca6a04f..a8ef709 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -53,12 +53,14 @@ public final class YarnSubmissionHelper implements 
Closeable{
   private final Map<String, LocalResource> resources = new HashMap<>();
   private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
+  private final SecurityTokenProvider tokenProvider;
   private boolean preserveEvaluators;
   private int maxAppSubmissions;
 
   public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                               final REEFFileNames fileNames,
-                              final ClasspathProvider classpath) throws 
IOException, YarnException {
+                              final ClasspathProvider classpath,
+                              final SecurityTokenProvider tokenProvider) 
throws IOException, YarnException {
     this.fileNames = fileNames;
     this.classpath = classpath;
 
@@ -75,6 +77,7 @@ public final class YarnSubmissionHelper implements Closeable{
     this.applicationId = applicationSubmissionContext.getApplicationId();
     this.maxAppSubmissions = 1;
     this.preserveEvaluators = false;
+    this.tokenProvider = tokenProvider;
     LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
   }
 
@@ -194,8 +197,9 @@ public final class YarnSubmissionHelper implements 
Closeable{
           " since the max application submissions is 1. Proceeding to submit 
application...");
     }
 
-    
this.applicationSubmissionContext.setAMContainerSpec(YarnTypes.getContainerLaunchContext(launchCommand,
-        this.resources));
+    final ContainerLaunchContext containerLaunchContext = 
YarnTypes.getContainerLaunchContext(
+        launchCommand, this.resources, tokenProvider.getTokens());
+    
this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
 
     LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", 
this.applicationId);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
index 551ef34..953090c 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
@@ -28,6 +28,7 @@ import 
org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
 import org.apache.reef.runtime.yarn.util.YarnTypes;
 import org.apache.reef.tang.InjectionFuture;
 import org.apache.reef.tang.annotations.Parameter;
@@ -50,19 +51,22 @@ public final class YARNResourceLaunchHandler implements 
ResourceLaunchHandler {
   private final EvaluatorSetupHelper evaluatorSetupHelper;
   private final REEFFileNames filenames;
   private final double jvmHeapFactor;
+  private final SecurityTokenProvider tokenProvider;
 
   @Inject
   YARNResourceLaunchHandler(final Containers containers,
                             final InjectionFuture<YarnContainerManager> 
yarnContainerManager,
                             final EvaluatorSetupHelper evaluatorSetupHelper,
                             final REEFFileNames filenames,
-                            @Parameter(JVMHeapSlack.class) final double 
jvmHeapSlack) {
+                            @Parameter(JVMHeapSlack.class) final double 
jvmHeapSlack,
+                            final SecurityTokenProvider tokenProvider) {
     this.jvmHeapFactor = 1.0 - jvmHeapSlack;
     LOG.log(Level.FINEST, "Instantiating 'YARNResourceLaunchHandler'");
     this.containers = containers;
     this.yarnContainerManager = yarnContainerManager;
     this.evaluatorSetupHelper = evaluatorSetupHelper;
     this.filenames = filenames;
+    this.tokenProvider = tokenProvider;
     LOG.log(Level.FINE, "Instantiated 'YARNResourceLaunchHandler'");
   }
 
@@ -84,7 +88,9 @@ public final class YARNResourceLaunchHandler implements 
ResourceLaunchHandler {
             new Object[]{containerId, StringUtils.join(command, ' '), 
localResources});
       }
 
-      final ContainerLaunchContext ctx = 
YarnTypes.getContainerLaunchContext(command, localResources);
+      final byte[] securityTokensBuffer = this.tokenProvider.getTokens();
+      final ContainerLaunchContext ctx = YarnTypes.getContainerLaunchContext(
+          command, localResources, securityTokensBuffer);
       this.yarnContainerManager.get().submit(container, ctx);
 
       LOG.log(Level.FINEST, "TIME: End ResourceLaunch {0}", containerId);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
index 48347be..ef1ff27 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
@@ -24,8 +24,11 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.reef.annotations.audience.Private;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Helper class that creates the various records in the YARN API.
@@ -35,7 +38,7 @@ public final class YarnTypes {
 
   // TODO[REEF-537]: Remove once the hadoop version is updated.
   public static final String MIN_VERSION_KEEP_CONTAINERS_AVAILABLE = "2.4.0";
-
+  private static final Logger LOG = 
Logger.getLogger(YarnTypes.class.getName());
   private YarnTypes() {
   }
 
@@ -43,10 +46,16 @@ public final class YarnTypes {
    * @return a ContainerLaunchContext with the given commands and 
LocalResources.
    */
   public static ContainerLaunchContext getContainerLaunchContext(
-      final List<String> commands, final Map<String, LocalResource> 
localResources) {
+      final List<String> commands,
+      final Map<String, LocalResource> localResources,
+      final byte[] securityTokenBuffer) {
     final ContainerLaunchContext context = 
Records.newRecord(ContainerLaunchContext.class);
     context.setLocalResources(localResources);
     context.setCommands(commands);
+    if (securityTokenBuffer != null) {
+      context.setTokens(ByteBuffer.wrap(securityTokenBuffer));
+      LOG.log(Level.INFO, "Added tokens to container launch context");
+    }
     return context;
   }
 

Reply via email to