Repository: incubator-reef Updated Branches: refs/heads/master d92ab9885 -> 3fdfdfdf6
[REEF-604] Support Hadoop security tokens in the YARN runtime This change addressed the issue by: * Creating SecurityTokenProvider which provides security tokens * Add default implementation of SecurityTokenProvider, reading security token from user credentials * Get tokens from SecurityTokenProvider and add the tokens ContainerLaunchContext during application and container creation. JIRA: [REEF-604](https://issues.apache.org/jira/browse/REEF-604) Pull Request: This closes #402 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/3fdfdfdf Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/3fdfdfdf Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/3fdfdfdf Branch: refs/heads/master Commit: 3fdfdfdf6bf6dd003d615bdbbd675afc5ae8b85c Parents: d92ab98 Author: Anupam <anupam...@gmail.com> Authored: Fri Aug 21 15:01:42 2015 -0700 Committer: Markus Weimer <wei...@apache.org> Committed: Mon Aug 24 13:49:29 2015 -0700 ---------------------------------------------------------------------- .../bridge/client/YarnJobSubmissionClient.java | 8 ++- .../yarn/client/SecurityTokenProvider.java | 34 ++++++++++++ .../UserCredentialSecurityTokenProvider.java | 58 ++++++++++++++++++++ .../yarn/client/YarnJobSubmissionHandler.java | 7 ++- .../yarn/client/YarnSubmissionHelper.java | 10 +++- .../yarn/driver/YARNResourceLaunchHandler.java | 10 +++- .../reef/runtime/yarn/util/YarnTypes.java | 13 ++++- 7 files changed, 129 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index de8638b..3a4aa9a 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -30,6 +30,7 @@ import org.apache.reef.javabridge.generic.JobDriver; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper; import org.apache.reef.runtime.yarn.client.uploader.JobFolder; @@ -68,6 +69,7 @@ public final class YarnJobSubmissionClient { private final ClasspathProvider classpath; private final int maxApplicationSubmissions; private final boolean enableRestart; + private final SecurityTokenProvider tokenProvider; @Inject YarnJobSubmissionClient(final JobUploader uploader, @@ -78,7 +80,8 @@ public final class YarnJobSubmissionClient { @Parameter(MaxApplicationSubmissions.class) final int maxApplicationSubmissions, @Parameter(EnableRestart.class) - final boolean enableRestart) { + final boolean enableRestart, + final SecurityTokenProvider tokenProvider) { this.uploader = uploader; this.configurationSerializer = configurationSerializer; this.fileNames = fileNames; @@ -86,6 +89,7 @@ public final class YarnJobSubmissionClient { this.classpath = classpath; this.maxApplicationSubmissions = maxApplicationSubmissions; this.enableRestart = enableRestart; + this.tokenProvider = tokenProvider; } private Configuration addYarnDriverConfiguration(final File driverFolder, @@ -167,7 +171,7 @@ public final class YarnJobSubmissionClient { // ------------------------------------------------------------------------ // Get an application ID try (final YarnSubmissionHelper submissionHelper = - new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath)) { + new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, tokenProvider)) { // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/SecurityTokenProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/SecurityTokenProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/SecurityTokenProvider.java new file mode 100644 index 0000000..37cfa76 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/SecurityTokenProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client; + +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * Provides security token for setting up YARN container context. + */ +@DefaultImplementation(UserCredentialSecurityTokenProvider.class) +public interface SecurityTokenProvider { + + /** + * Returns a ByteBuffer containing security tokens. + * @return a ByteBuffer + */ + byte[] getTokens(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/UserCredentialSecurityTokenProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/UserCredentialSecurityTokenProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/UserCredentialSecurityTokenProvider.java new file mode 100644 index 0000000..5fde825 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/UserCredentialSecurityTokenProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.client; + +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Reads security token from user credentials. + */ +final class UserCredentialSecurityTokenProvider implements SecurityTokenProvider { + + private static final Logger LOG = Logger.getLogger(UserCredentialSecurityTokenProvider.class.getName()); + + @Inject + private UserCredentialSecurityTokenProvider(){} + + @Override + public byte[] getTokens() { + try { + final UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + final Credentials credentials = ugi.getCredentials(); + if (credentials.numberOfTokens() > 0) { + try(final DataOutputBuffer dob = new DataOutputBuffer()) { + credentials.writeTokenStorageToStream(dob); + return dob.getData(); + } + } + } catch (IOException e) { + LOG.log(Level.WARNING, "Could not access tokens in user credentials.", e); + } + + LOG.log(Level.FINE, "No security token found."); + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java index ba35f41..3436424 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java @@ -61,6 +61,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { private final JobUploader uploader; private final double jvmSlack; private final String defaultQueueName; + private final SecurityTokenProvider tokenProvider; @Inject YarnJobSubmissionHandler( @@ -70,7 +71,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { final ClasspathProvider classpath, final JobUploader uploader, @Parameter(JVMHeapSlack.class) final double jvmSlack, - @Parameter(JobQueue.class) final String defaultQueueName) throws IOException { + @Parameter(JobQueue.class) final String defaultQueueName, + final SecurityTokenProvider tokenProvider) throws IOException { this.yarnConfiguration = yarnConfiguration; this.jobJarMaker = jobJarMaker; @@ -79,6 +81,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { this.uploader = uploader; this.jvmSlack = jvmSlack; this.defaultQueueName = defaultQueueName; + this.tokenProvider = tokenProvider; } @Override @@ -91,7 +94,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionEvent.getIdentifier()); try (final YarnSubmissionHelper submissionHelper = - new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, this.classpath)) { + new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider)) { LOG.log(Level.FINE, "Assembling submission JAR for the Driver."); final Optional<String> userBoundJobSubmissionDirectory = http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java index ca6a04f..a8ef709 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java @@ -53,12 +53,14 @@ public final class YarnSubmissionHelper implements Closeable{ private final Map<String, LocalResource> resources = new HashMap<>(); private final REEFFileNames fileNames; private final ClasspathProvider classpath; + private final SecurityTokenProvider tokenProvider; private boolean preserveEvaluators; private int maxAppSubmissions; public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, final REEFFileNames fileNames, - final ClasspathProvider classpath) throws IOException, YarnException { + final ClasspathProvider classpath, + final SecurityTokenProvider tokenProvider) throws IOException, YarnException { this.fileNames = fileNames; this.classpath = classpath; @@ -75,6 +77,7 @@ public final class YarnSubmissionHelper implements Closeable{ this.applicationId = applicationSubmissionContext.getApplicationId(); this.maxAppSubmissions = 1; this.preserveEvaluators = false; + this.tokenProvider = tokenProvider; LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId); } @@ -194,8 +197,9 @@ public final class YarnSubmissionHelper implements Closeable{ " since the max application submissions is 1. Proceeding to submit application..."); } - this.applicationSubmissionContext.setAMContainerSpec(YarnTypes.getContainerLaunchContext(launchCommand, - this.resources)); + final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext( + launchCommand, this.resources, tokenProvider.getTokens()); + this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext); LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java index 551ef34..953090c 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java @@ -28,6 +28,7 @@ import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.parameters.JVMHeapSlack; +import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; import org.apache.reef.runtime.yarn.util.YarnTypes; import org.apache.reef.tang.InjectionFuture; import org.apache.reef.tang.annotations.Parameter; @@ -50,19 +51,22 @@ public final class YARNResourceLaunchHandler implements ResourceLaunchHandler { private final EvaluatorSetupHelper evaluatorSetupHelper; private final REEFFileNames filenames; private final double jvmHeapFactor; + private final SecurityTokenProvider tokenProvider; @Inject YARNResourceLaunchHandler(final Containers containers, final InjectionFuture<YarnContainerManager> yarnContainerManager, final EvaluatorSetupHelper evaluatorSetupHelper, final REEFFileNames filenames, - @Parameter(JVMHeapSlack.class) final double jvmHeapSlack) { + @Parameter(JVMHeapSlack.class) final double jvmHeapSlack, + final SecurityTokenProvider tokenProvider) { this.jvmHeapFactor = 1.0 - jvmHeapSlack; LOG.log(Level.FINEST, "Instantiating 'YARNResourceLaunchHandler'"); this.containers = containers; this.yarnContainerManager = yarnContainerManager; this.evaluatorSetupHelper = evaluatorSetupHelper; this.filenames = filenames; + this.tokenProvider = tokenProvider; LOG.log(Level.FINE, "Instantiated 'YARNResourceLaunchHandler'"); } @@ -84,7 +88,9 @@ public final class YARNResourceLaunchHandler implements ResourceLaunchHandler { new Object[]{containerId, StringUtils.join(command, ' '), localResources}); } - final ContainerLaunchContext ctx = YarnTypes.getContainerLaunchContext(command, localResources); + final byte[] securityTokensBuffer = this.tokenProvider.getTokens(); + final ContainerLaunchContext ctx = YarnTypes.getContainerLaunchContext( + command, localResources, securityTokensBuffer); this.yarnContainerManager.get().submit(container, ctx); LOG.log(Level.FINEST, "TIME: End ResourceLaunch {0}", containerId); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3fdfdfdf/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java index 48347be..ef1ff27 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java @@ -24,8 +24,11 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.util.Records; import org.apache.reef.annotations.audience.Private; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Helper class that creates the various records in the YARN API. @@ -35,7 +38,7 @@ public final class YarnTypes { // TODO[REEF-537]: Remove once the hadoop version is updated. public static final String MIN_VERSION_KEEP_CONTAINERS_AVAILABLE = "2.4.0"; - + private static final Logger LOG = Logger.getLogger(YarnTypes.class.getName()); private YarnTypes() { } @@ -43,10 +46,16 @@ public final class YarnTypes { * @return a ContainerLaunchContext with the given commands and LocalResources. */ public static ContainerLaunchContext getContainerLaunchContext( - final List<String> commands, final Map<String, LocalResource> localResources) { + final List<String> commands, + final Map<String, LocalResource> localResources, + final byte[] securityTokenBuffer) { final ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class); context.setLocalResources(localResources); context.setCommands(commands); + if (securityTokenBuffer != null) { + context.setTokens(ByteBuffer.wrap(securityTokenBuffer)); + LOG.log(Level.INFO, "Added tokens to container launch context"); + } return context; }