Repository: oozie Updated Branches: refs/heads/master df0e3c24c -> 12ea195dd
OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/12ea195d Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/12ea195d Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/12ea195d Branch: refs/heads/master Commit: 12ea195dd935d64c9a9dbe447acd0a76abef3c8e Parents: df0e3c2 Author: Peter Bacsko <[email protected]> Authored: Thu Aug 31 15:29:58 2017 +0200 Committer: Peter Bacsko <[email protected]> Committed: Thu Aug 31 15:29:58 2017 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 128 ++++++++++++++----- .../oozie/service/HadoopAccessorService.java | 54 ++++++-- .../action/hadoop/TestJavaActionExecutor.java | 11 +- release-log.txt | 1 + 4 files changed, 142 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/12ea195d/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index dc17950..bca79aa 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; @@ -962,7 +963,7 @@ public class JavaActionExecutor extends ActionExecutor { // Setting the credential properties in launcher conf Configuration credentialsConf = null; - HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, + Map<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, action, actionConf); Credentials credentials = null; if (credentialsProperties != null) { @@ -991,6 +992,11 @@ public class JavaActionExecutor extends ActionExecutor { LOG.debug("Creating yarnClient for action {0}", action.getId()); yarnClient = createYarnClient(context, launcherJobConf); + if (UserGroupInformation.isSecurityEnabled()) { + credentials = ensureCredentials(credentials); + acquireHDFSDelegationToken(actionFs, credentialsConf, credentials); + } + if (alreadyRunning && !isUserRetry) { try { ApplicationId appId = ConverterUtils.toApplicationId(launcherId); @@ -1066,6 +1072,56 @@ public class JavaActionExecutor extends ActionExecutor { } } + private Credentials ensureCredentials(final Credentials credentials) { + if (credentials == null) { + LOG.debug("No credentials present, creating a new one."); + return new Credentials(); + } + + return credentials; + } + + /** + * In a secure environment, when both HDFS HA and log aggregation are turned on, {@link JavaActionExecutor} is not able to call + * {@link YarnClient#submitApplication} since {@code HDFS_DELEGATION_TOKEN} is missing. + * + * @param actionFs the {@link FileSystem} to get the delegation token from + * @param credentialsConf the {@link Configuration} to extract the YARN renewer + * @param credentials the {@link Credentials} where the delegation token is stored + * @throws IOException + * @throws ActionExecutorException when security is enabled, but either {@code credentials} are empty, or + * {@code serverPrincipal} is empty, or HDFS delegation token is not present within {@code actionFs} + */ + private void acquireHDFSDelegationToken(final FileSystem actionFs, + final Configuration credentialsConf, + final Credentials credentials) + throws IOException, ActionExecutorException { + LOG.debug("Security is enabled, checking credentials to acquire HDFS delegation token."); + + final HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class); + final String servicePrincipal = hadoopAccessorService.getServicePrincipal(credentialsConf); + final String serverPrincipal = hadoopAccessorService.getServerPrincipal( + credentialsConf, + servicePrincipal); + if (serverPrincipal == null) { + final String errorTemplate = "No server principal present, won't get HDFS delegation token. [servicePrincipal={0}]"; + LOG.error(errorTemplate, servicePrincipal); + throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022", errorTemplate, servicePrincipal); + } + + LOG.debug("Server principal present, getting HDFS delegation token. [serverPrincipal={0}]", serverPrincipal); + final Token hdfsDelegationToken = actionFs.getDelegationToken(serverPrincipal); + if (hdfsDelegationToken == null) { + final String errorTemplate = "No HDFS delegation token present, won't set credentials. [serverPrincipal={0}]"; + LOG.error(errorTemplate, serverPrincipal); + throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022", errorTemplate, serverPrincipal); + } + + LOG.debug("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]", + hdfsDelegationToken); + credentials.addToken(new Text(hdfsDelegationToken.getService().toString()), hdfsDelegationToken); + } + private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf, String user, Context context, Configuration actionConf, String actionName, Credentials credentials) @@ -1162,45 +1218,51 @@ public class JavaActionExecutor extends ActionExecutor { return appContext; } - protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, - WorkflowAction action, Configuration actionConf) throws Exception { - HashMap<String, CredentialsProperties> credPropertiesMap = null; - if (context != null && action != null) { - if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) { - XConfiguration wfJobConf = getWorkflowConf(context); - if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) || - !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) { - credPropertiesMap = getActionCredentialsProperties(context, action); - if (!credPropertiesMap.isEmpty()) { - for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { - if (entry.getValue() != null) { - CredentialsProperties prop = entry.getValue(); - LOG.debug("Credential Properties set for action : " + action.getId()); - for (Entry<String, String> propEntry : prop.getProperties().entrySet()) { - String key = propEntry.getKey(); - String value = propEntry.getValue(); - actionConf.set(key, value); - LOG.debug("property : '" + key + "', value : '" + value + "'"); - } - } - } - } else { - LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); - } - } else { - LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); + Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context context, + final WorkflowAction action, + final Configuration actionConf) throws Exception { + if (context == null || action == null) { + LOG.warn("context or action is null"); + return null; + } + + if (Boolean.TRUE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && !UserGroupInformation.isSecurityEnabled()) { + LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); + return null; + } + + final XConfiguration wfJobConf = getWorkflowConf(context); + if (!Boolean.FALSE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && + wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP)) && + !UserGroupInformation.isSecurityEnabled()) { + LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); + return null; + } + + final Map<String, CredentialsProperties> credPropertiesMap = getActionCredentialsProperties(context, action); + if (credPropertiesMap.isEmpty()) { + LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); + return credPropertiesMap; + } + + for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { + if (entry.getValue() != null) { + final CredentialsProperties prop = entry.getValue(); + LOG.debug("Credential Properties set for action : " + action.getId()); + for (final Entry<String, String> propEntry : prop.getProperties().entrySet()) { + final String key = propEntry.getKey(); + final String value = propEntry.getValue(); + actionConf.set(key, value); + LOG.debug("property : '" + key + "', value : '" + value + "'"); } - } else { - LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); } - } else { - LOG.warn("context or action is null"); } + return credPropertiesMap; } protected void setCredentialTokens(Credentials credentials, Configuration jobconf, Context context, WorkflowAction action, - HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception { + Map<String, CredentialsProperties> credPropertiesMap) throws Exception { if (context != null && action != null && credPropertiesMap != null) { // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin http://git-wip-us.apache.org/repos/asf/oozie/blob/12ea195d/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java index b507c79..187cee2 100644 --- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java +++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java @@ -18,6 +18,8 @@ package org.apache.oozie.service; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; @@ -661,23 +663,12 @@ public class HadoopAccessorService implements Service { Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException { // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have // support for renewing/cancelling tokens - String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL)); + final String servicePrincipal = getServicePrincipal(jobConf); Text renewer; if (servicePrincipal != null) { // secure cluster renewer = mrTokenRenewers.get(servicePrincipal); if (renewer == null) { - // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal() - String target = jobConf.get(HADOOP_YARN_RM); - try { - String addr = NetUtils.createSocketAddr(target).getHostName(); - renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr)); - LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target - + ",Renewer=" + renewer); - } - catch (IllegalArgumentException iae) { - renewer = new Text(servicePrincipal.split("[/@]")[0]); - LOG.info("Delegation Token Renewer for " + servicePrincipal + " is " + renewer); - } + renewer = new Text(getServerPrincipal(jobConf, servicePrincipal)); mrTokenRenewers.put(servicePrincipal, renewer); } } @@ -687,6 +678,43 @@ public class HadoopAccessorService implements Service { return renewer; } + public String getServicePrincipal(final Configuration configuration) { + return configuration.get(RM_PRINCIPAL, configuration.get(JT_PRINCIPAL)); + } + + /** + * Mimic {@link org.apache.hadoop.mapred.Master#getMasterPrincipal}, get Kerberos principal for use as delegation token renewer. + * + * @param configuration the {@link Configuration} containing the YARN RM address + * @param servicePrincipal the configured service principal + * @return the server principal originating from the host name and the service principal + * @throws IOException when something goes wrong finding out the local address inside + * {@link SecurityUtil#getServerPrincipal(String, String)} + */ + public String getServerPrincipal(final Configuration configuration, final String servicePrincipal) throws IOException { + Preconditions.checkNotNull(configuration, "configuration has to be filled"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(servicePrincipal), "servicePrincipal has to be filled"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(configuration.get(HADOOP_YARN_RM)), + String.format("configuration entry %s has to be filled", HADOOP_YARN_RM)); + + String serverPrincipal; + final String target = configuration.get(HADOOP_YARN_RM); + + try { + final String addr = NetUtils.createSocketAddr(target).getHostName(); + serverPrincipal = SecurityUtil.getServerPrincipal(servicePrincipal, addr); + LOG.info("Delegation Token Renewer details: Principal={0},Target={1}", serverPrincipal, target); + } + catch (final IllegalArgumentException iae) { + LOG.warn("An error happened while trying to get server principal. Getting it from service principal anyway.", iae); + + serverPrincipal = servicePrincipal.split("[/@]")[0]; + LOG.info("Delegation Token Renewer for {0} is {1}", target, serverPrincipal); + } + + return serverPrincipal; + } + public void addFileToClassPath(String user, final Path file, final Configuration conf) throws IOException { ParamChecker.notEmpty(user, "user"); http://git-wip-us.apache.org/repos/asf/oozie/blob/12ea195d/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index c51c64a..ce674ad 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -878,7 +878,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Configuration actionConf = ae.createBaseHadoopConf(context, actionXmlconf); // Setting the credential properties in launcher conf - HashMap<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context, + Map<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context, action, actionConf); assertNotNull(credProperties); @@ -951,8 +951,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Configuration actionConf = ae.createBaseHadoopConf(context, actionXmlconf); try { - // Setting the credential properties in launcher conf should fail - ae.setCredentialPropertyToActionConf(context, action, actionConf); + // Setting the credential properties in launcher conf should fail + ae.setCredentialPropertyToActionConf(context, action, actionConf); } catch (ActionExecutorException e) { assertEquals(e.getErrorCode(), "JA021"); @@ -991,8 +991,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Configuration actionConf = ae.createBaseHadoopConf(context, actionXmlconf); // should not throw JA021 exception - HashMap<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context, action, - actionConf); + ae.setCredentialPropertyToActionConf(context, action, actionConf); } public void testCredentialsSkip() throws Exception { @@ -1091,7 +1090,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { ConfigurationService.setBoolean("oozie.credentials.skip", skipSite); // Setting the credential properties in launcher conf - HashMap<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context, + Map<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context, action, actionConf); // Try to load the token without it being defined in oozie-site; should get an exception http://git-wip-us.apache.org/repos/asf/oozie/blob/12ea195d/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index b63f42d..e2311ea 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko) OOZIE-3026 fix openjpa enhancer stage during build for logging (dbdist13, andras.piros via pbacsko) OOZIE-2746 Several tests failure in TestV2ValidateServlet.java (Dongying Jiao via asasvari) OOZIE-3038 Make all Oozie JUnit tests pass on dist_test (andras.piros via gezapeti)
