SLIDER-1077: improve oozie/credential support. This isn't quite right yet
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5dfac853 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5dfac853 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5dfac853 Branch: refs/heads/feature/SLIDER-906_docker_support Commit: 5dfac853256ef4a3cf8f0a5fec003cdc4848f7f9 Parents: 8d4da2d Author: Steve Loughran <ste...@apache.org> Authored: Fri Jan 29 20:06:00 2016 +0000 Committer: Steve Loughran <ste...@apache.org> Committed: Fri Jan 29 20:06:00 2016 +0000 ---------------------------------------------------------------------- .../org/apache/slider/client/SliderClient.java | 21 +- .../slider/client/SliderYarnClientImpl.java | 2 +- .../slider/core/launch/AbstractLauncher.java | 53 ++-- .../slider/core/launch/AppMasterLauncher.java | 68 +++-- .../slider/core/launch/ContainerLauncher.java | 8 + .../slider/core/launch/CredentialUtils.java | 251 +++++++++++++++++++ .../server/appmaster/RoleLaunchService.java | 21 +- .../server/appmaster/SliderAppMaster.java | 101 ++++---- .../core/launch/TestAppMasterLauncher.java | 6 +- .../TestAppMasterLauncherWithAmReset.java | 4 +- 10 files changed, 415 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java index aa19a3b..5f694e2 100644 --- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java +++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java @@ -44,6 +44,7 @@ import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.RegistryPathStatus; import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.KerberosDiags; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.alias.CredentialProvider; @@ -131,6 +132,7 @@ import org.apache.slider.core.exceptions.UsageException; import org.apache.slider.core.exceptions.WaitTimeoutException; import org.apache.slider.core.launch.AppMasterLauncher; import org.apache.slider.core.launch.ClasspathConstructor; +import org.apache.slider.core.launch.CredentialUtils; import org.apache.slider.core.launch.JavaCommandLineBuilder; import org.apache.slider.core.launch.LaunchedApplication; import org.apache.slider.core.launch.RunningApplication; @@ -1909,6 +1911,22 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe // add the tags if available Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem, getApplicationDefinitionPath(appOperations)); + + Credentials credentials = null; + if (clusterSecure) { + // pick up oozie credentials + credentials = CredentialUtils.loadFromEnvironment( + System.getenv(), config); + if (credentials == null) { + // nothing from oozie, so build up directly + credentials = new Credentials( + UserGroupInformation.getCurrentUser().getCredentials()); + CredentialUtils.addRMRenewableFSDelegationTokens(config, + sliderFileSystem.getFileSystem(), + credentials); + } + } + AppMasterLauncher amLauncher = new AppMasterLauncher(clustername, SliderKeys.APP_TYPE, config, @@ -1917,7 +1935,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe clusterSecure, sliderAMResourceComponent, resourceGlobalOptions, - applicationTags); + applicationTags, + credentials); ApplicationId appId = amLauncher.getApplicationId(); // set the application name; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java b/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java index 85a582b..d471cdb 100644 --- a/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java +++ b/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java @@ -190,7 +190,7 @@ public class SliderYarnClientImpl extends YarnClientImpl { } /** - * Force kill a yarn application by ID. No niceities here + * Force kill a yarn application by ID. No niceties here * @param applicationId app Id. "all" means "kill all instances of the current user * */ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java index 22bf328..f92ffb1 100644 --- a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java +++ b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java @@ -24,7 +24,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -68,25 +67,39 @@ public abstract class AbstractLauncher extends Configured { * Env vars; set up at final launch stage */ protected final Map<String, String> envVars = new HashMap<>(); + protected final MapOperations env = new MapOperations("env", envVars); protected final ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class); protected final List<String> commands = new ArrayList<>(20); protected final Map<String, LocalResource> localResources = new HashMap<>(); private final Map<String, ByteBuffer> serviceData = new HashMap<>(); + // security - protected final Credentials credentials = new Credentials(); + protected final Credentials credentials; protected LogAggregationContext logAggregationContext; + /** + * Create instance + * @param conf configuration + * @param coreFileSystem filesystem + * @param credentials initial set of credentials -null is permitted + */ + protected AbstractLauncher(Configuration conf, + CoreFileSystem coreFileSystem, + Credentials credentials) { + super(conf); + this.coreFileSystem = coreFileSystem; + this.credentials = credentials != null ? credentials: new Credentials(); + } protected AbstractLauncher(Configuration conf, CoreFileSystem fs) { - super(conf); - this.coreFileSystem = fs; + this(conf, fs, null); } protected AbstractLauncher(CoreFileSystem fs) { - this.coreFileSystem = fs; + this(null, fs, null); } /** @@ -133,7 +146,6 @@ public abstract class AbstractLauncher extends Configured { localResources.putAll(resourceMap); } - public Map<String, ByteBuffer> getServiceData() { return serviceData; } @@ -169,7 +181,7 @@ public abstract class AbstractLauncher extends Configured { /** * Get all commands as a string, separated by ";". This is for diagnostics - * @return a string descriptionof the commands + * @return a string description of the commands */ public String getCommandsAsString() { return SliderUtils.join(getCommands(), "; "); @@ -194,7 +206,7 @@ public abstract class AbstractLauncher extends Configured { } } containerLaunchContext.setEnvironment(env); - + //service data if (log.isDebugEnabled()) { log.debug("Service Data size"); @@ -211,21 +223,8 @@ public abstract class AbstractLauncher extends Configured { //tokens log.debug("{} tokens", credentials.numberOfTokens()); - DataOutputBuffer dob = new DataOutputBuffer(); - String tokenFileName = - this.getConf().get(MAPREDUCE_JOB_CREDENTIALS_BINARY); - if (tokenFileName != null) { - // use delegation tokens, i.e. from Oozie - Credentials creds = - Credentials.readTokenStorageFile(new File(tokenFileName), getConf()); - creds.writeTokenStorageToStream(dob); - } else { - // normal auth - credentials.writeTokenStorageToStream(dob); - } - - ByteBuffer tokenBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - containerLaunchContext.setTokens(tokenBuffer); + containerLaunchContext.setTokens(CredentialUtils.marshallCredentials( + credentials)); return containerLaunchContext; } @@ -282,7 +281,7 @@ public abstract class AbstractLauncher extends Configured { /** * Extract the value for option - * yarn.resourcemanager.am.retry-count-window-ms + * {@code yarn.resourcemanager.am.retry-count-window-ms} * and set it on the ApplicationSubmissionContext. Use the default value * if option is not set. * @@ -433,7 +432,7 @@ public abstract class AbstractLauncher extends Configured { public String[] dumpEnvToString() { - List<String> nodeEnv = new ArrayList<String>(); + List<String> nodeEnv = new ArrayList<>(); for (Map.Entry<String, String> entry : env.entrySet()) { String envElt = String.format("%s=\"%s\"", @@ -453,8 +452,8 @@ public abstract class AbstractLauncher extends Configured { * @param destRelativeDir relative path under destination local dir * @throws IOException IO problems */ - public void submitDirectory(Path srcDir, String destRelativeDir) throws - IOException { + public void submitDirectory(Path srcDir, String destRelativeDir) + throws IOException { //add the configuration resources Map<String, LocalResource> confResources; confResources = coreFileSystem.submitDirectory( http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java index c82affa..091b80e 100644 --- a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java +++ b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java @@ -20,7 +20,7 @@ package org.apache.slider.core.launch; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; @@ -34,12 +34,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; import org.apache.slider.client.SliderYarnClientImpl; import org.apache.slider.common.tools.CoreFileSystem; -import org.apache.slider.common.tools.SliderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.InetAddress; import java.text.DateFormat; import java.util.Date; import java.util.Map; @@ -76,20 +74,21 @@ public class AppMasterLauncher extends AbstractLauncher { * @param options map of options. All values are extracted in this constructor only * @param resourceGlobalOptions global options * @param applicationTags any app tags + * @param credentials initial set of credentials * @throws IOException * @throws YarnException */ public AppMasterLauncher(String name, - String type, - Configuration conf, - CoreFileSystem fs, - SliderYarnClientImpl yarnClient, - boolean secureCluster, - Map<String, String> options, - Map<String, String> resourceGlobalOptions, - Set<String> applicationTags - ) throws IOException, YarnException { - super(conf, fs); + String type, + Configuration conf, + CoreFileSystem fs, + SliderYarnClientImpl yarnClient, + boolean secureCluster, + Map<String, String> options, + Map<String, String> resourceGlobalOptions, + Set<String> applicationTags, + Credentials credentials) throws IOException, YarnException { + super(conf, fs, credentials); this.yarnClient = yarnClient; this.application = yarnClient.createApplication(); this.name = name; @@ -165,10 +164,8 @@ public class AppMasterLauncher extends AbstractLauncher { * Complete the launch context (copy in env vars, etc). * @return the container to launch */ - public ApplicationSubmissionContext completeAppMasterLaunch() throws - IOException { - - + public ApplicationSubmissionContext completeAppMasterLaunch() + throws IOException { //queue priority Priority pri = Records.newRecord(Priority.class); @@ -196,6 +193,7 @@ public class AppMasterLauncher extends AbstractLauncher { } if (secureCluster) { + //tokens addSecurityTokens(); } else { propagateUsernameInInsecureCluster(); @@ -211,42 +209,40 @@ public class AppMasterLauncher extends AbstractLauncher { */ private void addSecurityTokens() throws IOException { - String tokenRenewer = SecurityUtil.getServerPrincipal( - getConf().get(YarnConfiguration.RM_PRINCIPAL), - InetAddress.getLocalHost().getCanonicalHostName()); - if (SliderUtils.isUnset(tokenRenewer)) { - throw new IOException( - "Can't get Master Kerberos principal for the RM to use as renewer: " - + YarnConfiguration.RM_PRINCIPAL - ); - } + CredentialUtils.addRMRenewableFSDelegationTokens(getConf(), + coreFileSystem.getFileSystem(), credentials); + + String tokenRenewer = CredentialUtils.getRMPrincipal(getConf()); Token<? extends TokenIdentifier>[] tokens = null; - boolean tokensProvided = getConf().get(MAPREDUCE_JOB_CREDENTIALS_BINARY) != null; + boolean tokensProvided = getConf().get(MAPREDUCE_JOB_CREDENTIALS_BINARY) != + null; if (!tokensProvided) { - // For now, only getting tokens for the default file-system. - FileSystem fs = coreFileSystem.getFileSystem(); - tokens = fs.addDelegationTokens(tokenRenewer, credentials); + // For now, only getting tokens for the default file-system. + FileSystem fs = coreFileSystem.getFileSystem(); + tokens = fs.addDelegationTokens(tokenRenewer, credentials); } // obtain the token expiry from the first token - should be the same for all // HDFS tokens if (tokens != null && tokens.length > 0) { AbstractDelegationTokenIdentifier id = - (AbstractDelegationTokenIdentifier)tokens[0].decodeIdentifier(); + (AbstractDelegationTokenIdentifier) tokens[0].decodeIdentifier(); Date d = new Date(id.getIssueDate() + 24 * 60 * 60 * 1000); - log.info("HDFS delegation tokens for AM launch context require renewal by {}", - DateFormat.getDateTimeInstance().format(d)); + log.info( + "HDFS delegation tokens for AM launch context require renewal by {}", + DateFormat.getDateTimeInstance().format(d)); } else { if (!tokensProvided) { log.warn("No HDFS delegation tokens obtained for AM launch context"); } else { - log.info("Tokens provided via "+ MAPREDUCE_JOB_CREDENTIALS_BINARY +" property " - + "being used for AM launch"); + log.info("Tokens provided via " + MAPREDUCE_JOB_CREDENTIALS_BINARY + + " property " + + "being used for AM launch"); } } - } + } /** * Submit the application. http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java index 69b937d..e586743 100644 --- a/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java +++ b/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java @@ -20,6 +20,7 @@ package org.apache.slider.core.launch; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.Container; @@ -41,6 +42,13 @@ public class ContainerLauncher extends AbstractLauncher { public final Container container; public ContainerLauncher(Configuration conf, + CoreFileSystem coreFileSystem, + Container container, Credentials credentials) { + super(conf, coreFileSystem, credentials); + this.container = container; + } + + public ContainerLauncher(Configuration conf, CoreFileSystem fs, Container container) { super(conf, fs); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/core/launch/CredentialUtils.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/CredentialUtils.java b/slider-core/src/main/java/org/apache/slider/core/launch/CredentialUtils.java new file mode 100644 index 0000000..32068e2 --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/core/launch/CredentialUtils.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.launch; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.DateFormat; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; + +/** + * Utils to work with credentials and tokens. + * + * Designed to be movable to Hadoop core + */ +public final class CredentialUtils { + + private CredentialUtils() { + } + + private static final Logger LOG = + LoggerFactory.getLogger(CredentialUtils.class); + + /** + * Save credentials to a byte buffer. Returns null if there were no + * credentials to save + * @param credentials credential set + * @return a byte buffer of serialized tokens + * @throws IOException if the credentials could not be written to the stream + */ + public static ByteBuffer marshallCredentials(Credentials credentials) throws IOException { + ByteBuffer buffer = null; + if (!credentials.getAllTokens().isEmpty()) { + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + dob.close(); + buffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + return buffer; + } + + /** + * Load the credentials from the environment. This looks at + * the value of {@link UserGroupInformation#HADOOP_TOKEN_FILE_LOCATION} + * and attempts to read in the value + * @param env environment to resolve the variable from + * @param conf configuration use when reading the tokens + * @return a set of credentials, or null if the environment did not + * specify any + * @throws IOException if a location for credentials was defined, but + * the credentials could not be loaded. + */ + public static Credentials loadFromEnvironment(Map<String, String> env, + Configuration conf) + throws IOException { + String tokenFilename = env.get(HADOOP_TOKEN_FILE_LOCATION); + if (tokenFilename != null) { + // use delegation tokens, i.e. from Oozie + File file = new File(tokenFilename.trim()); + String details = String.format("Token File %s from environment variable %s", + file, + HADOOP_TOKEN_FILE_LOCATION); + LOG.debug("Using {}", details); + if (!file.exists()) { + throw new FileNotFoundException("No " + details); + } + if (!file.isFile() && !file.canRead()) { + throw new IOException("Cannot read " + details); + } + Credentials creds = Credentials.readTokenStorageFile(file, conf); + return creds; + } else { + return null; + } + } + + /** + * Look up and return the resource manager's principal. This method + * automatically does the <code>_HOST</code> replacement in the principal and + * correctly handles HA resource manager configurations. + * + * From: YARN-4629 + * @param conf the {@link Configuration} file from which to read the + * principal + * @return the resource manager's principal string + * @throws IOException thrown if there's an error replacing the host name + */ + public static String getRMPrincipal(Configuration conf) throws IOException { + String principal = conf.get(RM_PRINCIPAL, ""); + String hostname; + Preconditions.checkState(!principal.isEmpty(), "Not set: " + RM_PRINCIPAL); + + if (HAUtil.isHAEnabled(conf)) { + YarnConfiguration yarnConf = new YarnConfiguration(conf); + if (yarnConf.get(RM_HA_ID) == null) { + // If RM_HA_ID is not configured, use the first of RM_HA_IDS. + // Any valid RM HA ID should work. + String[] rmIds = yarnConf.getStrings(RM_HA_IDS); + Preconditions.checkState((rmIds != null) && (rmIds.length > 0), + "Not set " + RM_HA_IDS); + yarnConf.set(RM_HA_ID, rmIds[0]); + } + + hostname = yarnConf.getSocketAddr( + RM_ADDRESS, + DEFAULT_RM_ADDRESS, + DEFAULT_RM_PORT).getHostName(); + } else { + hostname = conf.getSocketAddr( + RM_ADDRESS, + DEFAULT_RM_ADDRESS, + DEFAULT_RM_PORT).getHostName(); + } + return SecurityUtil.getServerPrincipal(principal, hostname); + } + + /** + * Create and add any filesystem delegation tokens with + * the RM(s) configured to be able to renew them. Returns null + * on an insecure cluster (i.e. harmless) + * @param conf configuration + * @param fs filesystem + * @param credentials credentials to update + * @return a list of all added tokens. + * @throws IOException + */ + public static Token<?>[] addRMRenewableFSDelegationTokens(Configuration conf, + FileSystem fs, + Credentials credentials) throws IOException { + Preconditions.checkArgument(conf != null); + Preconditions.checkArgument(credentials != null); + if (UserGroupInformation.isSecurityEnabled()) { + String tokenRenewer = CredentialUtils.getRMPrincipal(conf); + Token<? extends TokenIdentifier>[] tokens = null; + return fs.addDelegationTokens(tokenRenewer, credentials); + } + return null; + } + + /** + * Add an FS delegation token which can be renewed by the current user + * @param fs filesystem + * @param credentials credentials to update + * @throws IOException problems. + */ + public static void addSelfRenewableFSDelegationTokens( + FileSystem fs, + Credentials credentials) throws IOException { + Preconditions.checkArgument(fs != null); + Preconditions.checkArgument(credentials != null); + fs.addDelegationTokens( + UserGroupInformation.getLoginUser().getShortUserName(), + credentials); + } + + /** + * Filter a list of tokens from a set of credentials + * @param credentials credential source (a new credential set os re + * @param filter List of tokens to strip out + * @return a new, filtered, set of credentials + */ + public static Credentials filterTokens(Credentials credentials, + List<Text> filter) { + Credentials result = new Credentials(credentials); + Iterator<Token<? extends TokenIdentifier>> iter = + result.getAllTokens().iterator(); + while (iter.hasNext()) { + Token<? extends TokenIdentifier> token = iter.next(); + LOG.debug("Token {}", token.getKind()); + if (filter.contains(token.getKind())) { + LOG.debug("Filtering token {}", token.getKind()); + iter.remove(); + } + } + return result; + } + + public static String dumpTokens(Credentials credentials, String separator) { + Collection<Token<? extends TokenIdentifier>> allTokens + = credentials.getAllTokens(); + StringBuilder buffer = new StringBuilder(allTokens.size()* 128); + DateFormat df = DateFormat.getDateTimeInstance( + DateFormat.SHORT, DateFormat.SHORT); + for (Token<? extends TokenIdentifier> token : allTokens) { + buffer.append(toString(token)).append(separator); + } + return buffer.toString(); + } + + public static String toString(Token<? extends TokenIdentifier> token) { + DateFormat df = DateFormat.getDateTimeInstance( + DateFormat.SHORT, DateFormat.SHORT); + StringBuilder buffer = new StringBuilder(128); + buffer.append(token.toString()); + try { + TokenIdentifier ti = token.decodeIdentifier(); + buffer.append("; ").append(ti); + if (ti instanceof AbstractDelegationTokenIdentifier) { + AbstractDelegationTokenIdentifier dt + = (AbstractDelegationTokenIdentifier) ti; + buffer.append(" Issued: ") + .append(df.format(new Date(dt.getIssueDate()))); + buffer.append(" Max Date: ") + .append(df.format(new Date(dt.getMaxDate()))); + } + } catch (IOException e) { + LOG.debug("Failed to decode {}: {}", token, e, e); + } + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java index d4f2fd5..7515c1a 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java @@ -21,6 +21,7 @@ package org.apache.slider.server.appmaster; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.slider.common.SliderKeys; @@ -63,7 +64,7 @@ public class RoleLaunchService private final QueueAccess actionQueue; /** - * Provider bulding up the command + * Provider building up the command */ private final ProviderService provider; @@ -122,23 +123,26 @@ public class RoleLaunchService * @param container container target * @param role role * @param clusterSpec cluster spec to use for template + * @param credentials credentials to use */ public void launchRole(ContainerAssignment assignment, - AggregateConf clusterSpec) { + AggregateConf clusterSpec, + Credentials credentials) { RoleStatus role = assignment.role; String roleName = role.getName(); // prelaunch safety check Preconditions.checkArgument(provider.isSupportedRole(roleName)); RoleLaunchService.RoleLauncher launcher = new RoleLaunchService.RoleLauncher(assignment, - clusterSpec, + clusterSpec, clusterSpec.getResourceOperations().getOrAddComponent(roleName), - clusterSpec.getAppConfOperations().getOrAddComponent(roleName)); + clusterSpec.getAppConfOperations().getOrAddComponent(roleName), + credentials); execute(launcher); } /** - * Thread that runs on the AM to launch a region server. + * Thread that runs on the AM to launch a container */ private class RoleLauncher implements Runnable { @@ -150,13 +154,16 @@ public class RoleLaunchService private final MapOperations appComponent; private final AggregateConf instanceDefinition; public final ProviderRole role; + private final Credentials credentials; private Exception raisedException; public RoleLauncher(ContainerAssignment assignment, AggregateConf instanceDefinition, MapOperations resourceComponent, - MapOperations appComponent) { + MapOperations appComponent, + Credentials credentials) { this.assignment = assignment; + this.credentials = credentials; this.container = assignment.container; RoleStatus roleStatus = assignment.role; @@ -187,7 +194,7 @@ public class RoleLaunchService public void run() { try { ContainerLauncher containerLauncher = - new ContainerLauncher(getConfig(), fs, container); + new ContainerLauncher(getConfig(), fs, container, credentials); containerLauncher.setupUGI(); containerLauncher.putEnv(envVars); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 729d46e..82c9fb9 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -20,9 +20,6 @@ package org.apache.slider.server.appmaster; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.health.HealthCheckRegistry; -import com.codahale.metrics.jvm.GarbageCollectorMetricSet; -import com.codahale.metrics.jvm.MemoryUsageGaugeSet; -import com.codahale.metrics.jvm.ThreadStatesGaugeSet; import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; @@ -35,7 +32,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.http.HttpConfig; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils; @@ -110,6 +106,7 @@ import org.apache.slider.core.exceptions.BadConfigException; import org.apache.slider.core.exceptions.SliderException; import org.apache.slider.core.exceptions.SliderInternalStateException; import org.apache.slider.core.exceptions.TriggerClusterTeardownException; +import org.apache.slider.core.launch.CredentialUtils; import org.apache.slider.core.main.ExitCodeProvider; import org.apache.slider.core.main.LauncherExitCodes; import org.apache.slider.core.main.RunService; @@ -257,7 +254,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public NMClientAsync nmClientAsync; /** - * token blob + * Credentials for propagating down to launched containers */ private Credentials containerCredentials; @@ -698,7 +695,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /* * Extract the container ID. This is then - * turned into an (incompete) container + * turned into an (incomplete) container */ appMasterContainerID = ConverterUtils.toContainerId( SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name())); @@ -1117,35 +1114,29 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } } - private void processAMCredentials(SecurityConfiguration securityConfiguration) + /** + * Process the initial user to obtain the set of user + * supplied credentials (tokens were passed in by client). + * Removes the AM/RM token. + * If a keytab has been provided, also strip the HDFS delegation token. + * @param securityConfig slider security config + * @throws IOException + */ + private void processAMCredentials(SecurityConfiguration securityConfig) throws IOException { - // process the initial user to obtain the set of user - // supplied credentials (tokens were passed in by client). Remove AMRM - // token and HDFS delegation token, the latter because we will provide an - // up to date token for container launches (getContainerCredentials()). - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - Credentials credentials = currentUser.getCredentials(); - List<Text> filteredTokens = new ArrayList<>(); + + List<Text> filteredTokens = new ArrayList<>(2); filteredTokens.add(AMRMTokenIdentifier.KIND_NAME); - boolean keytabProvided = securityConfiguration.isKeytabProvided(); + boolean keytabProvided = securityConfig.isKeytabProvided(); log.info("Slider AM Security Mode: {}", keytabProvided ? "KEYTAB" : "TOKEN"); if (keytabProvided) { filteredTokens.add(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); } - Iterator<Token<? extends TokenIdentifier>> iter = - credentials.getAllTokens().iterator(); - while (iter.hasNext()) { - Token<? extends TokenIdentifier> token = iter.next(); - log.info("Token {}", token.getKind()); - if (filteredTokens.contains(token.getKind())) { - log.debug("Filtering token {} from AM tokens", token.getKind()); - iter.remove(); - } - } - // at this point this credentials map is probably clear, but leaving this - // code to allow for future tokens... - containerCredentials = credentials; + containerCredentials = CredentialUtils.filterTokens( + UserGroupInformation.getCurrentUser().getCredentials(), + filteredTokens); + log.info(CredentialUtils.dumpTokens(containerCredentials, "\n")); } /** @@ -1698,9 +1689,18 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //for each assignment: instantiate that role for (ContainerAssignment assignment : assignments) { - launchService.launchRole(assignment, getInstanceDefinition()); + try { + launchService.launchRole(assignment, getInstanceDefinition(), + buildContainerCredentials()); + } catch (IOException e) { + // Can be caused by failure to renew credentials with the remote + // service. If so, don't launch the application. Container is retained, + // though YARN will take it away after a timeout. + log.error("Failed to build credentials to launch container: {}", e, e); + + } } - + //for all the operations, exec them execute(operations); log.info("Diagnostics: {}", getContainerDiagnosticInfo()); @@ -2211,34 +2211,49 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // add current HDFS delegation token with an up to date token ByteBuffer tokens = getContainerCredentials(); + /* + ByteBuffer tokens = getContainerCredentials(); + if (tokens != null) { ctx.setTokens(tokens); } else { log.warn("No delegation tokens obtained and set for launch context"); } +*/ appState.containerStartSubmitted(container, instance); nmClientAsync.startContainerAsync(container, ctx); } + /** + * Build the credentials needed for containers. This will include + * getting new delegation tokens for HDFS if the AM is running + * with a keytab. + * @return a buffer of credentials + * @throws IOException + */ private ByteBuffer getContainerCredentials() throws IOException { // a delegation token can be retrieved from filesystem since // the login is via a keytab (see above) + Credentials credentials = buildContainerCredentials(); + return CredentialUtils.marshallCredentials(credentials); + } + + /** + * Build the credentials needed for containers. This will include + * getting new delegation tokens for HDFS if the AM is running + * with a keytab. + * @return a buffer of credentials + * @throws IOException + */ + + private Credentials buildContainerCredentials() throws IOException { Credentials credentials = new Credentials(containerCredentials); - ByteBuffer tokens = null; if (securityConfiguration.isKeytabProvided()) { - Token<? extends TokenIdentifier>[] hdfsTokens = - getClusterFS().getFileSystem().addDelegationTokens( - UserGroupInformation.getLoginUser().getShortUserName(), - credentials); + CredentialUtils.addSelfRenewableFSDelegationTokens( + getClusterFS().getFileSystem(), + credentials); } - if (!credentials.getAllTokens().isEmpty()) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - dob.close(); - tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - } - - return tokens; + return credentials; } @Override // NMClientAsync.CallbackHandler http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java index 60af770..eae9658 100644 --- a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java +++ b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java @@ -88,7 +88,7 @@ public class TestAppMasterLauncher { EasyMock.replay(mockYarnClient, appSubmissionContext, yarnClientApp); appMasterLauncher = new AppMasterLauncher("cl1", SliderKeys.APP_TYPE, null, - null, mockYarnClient, false, null, options, tags); + null, mockYarnClient, false, null, options, tags, null); // Verify the include/exclude patterns String expectedInclude = "slider*.txt|agent.out"; @@ -109,7 +109,7 @@ public class TestAppMasterLauncher { EasyMock.replay(mockYarnClient, appSubmissionContext, yarnClientApp); appMasterLauncher = new AppMasterLauncher("cl1", SliderKeys.APP_TYPE, null, - null, mockYarnClient, false, null, options, tags); + null, mockYarnClient, false, null, options, tags, null); // Verify the include/exclude patterns String expectedInclude = isOldApi ? "" : ".*"; @@ -128,7 +128,7 @@ public class TestAppMasterLauncher { EasyMock.replay(mockYarnClient, appSubmissionContext, yarnClientApp); appMasterLauncher = new AppMasterLauncher("cl1", SliderKeys.APP_TYPE, null, - null, mockYarnClient, false, null, options, tags); + null, mockYarnClient, false, null, options, tags, null); // Verify the include/exclude patterns String expectedInclude = isOldApi ? "" : ".*"; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java index cc64cab..a8f6b26 100644 --- a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java +++ b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java @@ -64,7 +64,7 @@ public class TestAppMasterLauncherWithAmReset { EasyMock.replay(mockYarnClient, yarnClientApp); appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null, - null, mockYarnClient, false, null, options, tags); + null, mockYarnClient, false, null, options, tags, null); ApplicationSubmissionContext ctx = appMasterLauncher.application .getApplicationSubmissionContext(); @@ -80,7 +80,7 @@ public class TestAppMasterLauncherWithAmReset { EasyMock.replay(mockYarnClient, yarnClientApp); appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null, - null, mockYarnClient, false, null, options, tags); + null, mockYarnClient, false, null, options, tags, null); ApplicationSubmissionContext ctx = appMasterLauncher.application .getApplicationSubmissionContext();