[ https://issues.apache.org/jira/browse/APEXCORE-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473572#comment-16473572 ]
ASF GitHub Bot commented on APEXCORE-807: ----------------------------------------- tweise closed pull request #594: APEXCORE-807 Added renewal of tokens before renewal expiry interval URL: https://github.com/apache/apex-core/pull/594 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 6c640ee1b8..e17254140f 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -28,7 +28,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -52,13 +51,12 @@ import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator; import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator; import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator; -import org.apache.commons.io.FileUtils; +import org.apache.apex.engine.security.TokenRenewer; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.CompositeService; @@ -111,7 +109,6 @@ import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.security.StramDelegationTokenIdentifier; import com.datatorrent.stram.security.StramDelegationTokenManager; -import com.datatorrent.stram.security.StramUserLogin; import com.datatorrent.stram.util.ConfigUtils; import com.datatorrent.stram.util.SecurityUtils; import com.datatorrent.stram.webapp.AppInfo; @@ -166,6 +163,7 @@ private ApexPluginDispatcher apexPluginDispatcher; private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance(); private static final long REMOVE_CONTAINER_TIMEOUT = PropertiesHelper.getLong("org.apache.apex.nodemanager.containerKill.timeout", 30 * 1000, 0, Long.MAX_VALUE); + private TokenRenewer tokenRenewer; public StreamingAppMasterService(ApplicationAttemptId appAttemptID) { @@ -693,19 +691,10 @@ public boolean run() throws Exception private void execute() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); - final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); - LOG.info("number of tokens: {}", credentials.getAllTokens().size()); - Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); - while (iter.hasNext()) { - Token<?> token = iter.next(); - LOG.debug("token: {}", token); - } final Configuration conf = getConfig(); - long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME))); - long expiryTime = System.currentTimeMillis() + tokenLifeTime; - LOG.debug(" expiry token time {}", tokenLifeTime); - String principal = dag.getValue(LogicalPlan.PRINCIPAL); - String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE); + if (UserGroupInformation.isSecurityEnabled()) { + tokenRenewer = new TokenRenewer(dag, true, conf, appAttemptID.getApplicationId().toString()); + } // Register self with ResourceManager RegisterApplicationMasterResponse response = amRmClient.registerApplicationMaster(appMasterHostname, 0, appMasterTrackingUrl); @@ -778,9 +767,8 @@ private void execute() throws YarnException, IOException loopCounter++; final long currentTimeMillis = System.currentTimeMillis(); - if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) { - String applicationId = appAttemptID.getApplicationId().toString(); - expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true); + if (tokenRenewer != null) { + tokenRenewer.checkAndRenew(); } if (currentTimeMillis > nodeReportUpdateTime) { diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 2019f489f0..d3079d0fd6 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -608,13 +608,23 @@ public ApplicationId launchApp(AppFactory appConfig) throws Exception if (UserGroupInformation.isSecurityEnabled()) { long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT)); dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime); + LOG.debug("HDFS token life time {}", hdfsTokenMaxLifeTime); + long hdfsTokenRenewInterval = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_RENEW_INTERVAL, conf.getLong(StramClientUtils.HDFS_TOKEN_RENEW_INTERVAL, StramClientUtils.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT)); + dag.setAttribute(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL, hdfsTokenRenewInterval); + LOG.debug("HDFS token renew interval {}", hdfsTokenRenewInterval); long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT)); dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime); + LOG.debug("RM token life time {}", rmTokenMaxLifeTime); + long rmTokenRenewInterval = conf.getLong(StramClientUtils.DT_RM_TOKEN_RENEW_INTERVAL, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY, YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT)); + dag.setAttribute(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL, rmTokenRenewInterval); + LOG.debug("RM token renew interval {}", rmTokenRenewInterval); setTokenRefreshCredentials(dag, conf); } String tokenRefreshFactor = conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR); if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) { - dag.setAttribute(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR, Double.parseDouble(tokenRefreshFactor)); + double refreshFactor = Double.parseDouble(tokenRefreshFactor); + dag.setAttribute(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR, refreshFactor); + LOG.debug("Token refresh anticipatory factor {}", refreshFactor); } StramClient client = new StramClient(conf, dag); try { diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index a310ee2a54..d4f190fb64 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -111,12 +111,16 @@ public static final String SUBDIR_CONF = "conf"; public static final long RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE = 10 * 1000; public static final String DT_HDFS_TOKEN_MAX_LIFE_TIME = StreamingApplication.DT_PREFIX + "namenode.delegation.token.max-lifetime"; + public static final String DT_HDFS_TOKEN_RENEW_INTERVAL = StreamingApplication.DT_PREFIX + "namenode.delegation.token.renew-interval"; public static final String HDFS_TOKEN_MAX_LIFE_TIME = "dfs.namenode.delegation.token.max-lifetime"; + public static final String HDFS_TOKEN_RENEW_INTERVAL = "dfs.namenode.delegation.token.renew-interval"; public static final String DT_RM_TOKEN_MAX_LIFE_TIME = StreamingApplication.DT_PREFIX + "resourcemanager.delegation.token.max-lifetime"; + public static final String DT_RM_TOKEN_RENEW_INTERVAL = StreamingApplication.DT_PREFIX + "resourcemanager.delegation.token.renew-interval"; @Deprecated public static final String KEY_TAB_FILE = StramUserLogin.DT_AUTH_PREFIX + "store.keytab"; public static final String TOKEN_ANTICIPATORY_REFRESH_FACTOR = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.factor"; public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60 * 1000; + public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24 * 60 * 60 * 1000; public static final String TOKEN_REFRESH_PRINCIPAL = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.principal"; public static final String TOKEN_REFRESH_KEYTAB = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.keytab"; /** diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index f5aaf352c7..927ad6d76b 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -42,15 +42,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.engine.security.TokenRenewer; import org.apache.apex.log.LogFileInformation; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.log4j.LogManager; @@ -106,7 +104,6 @@ import com.datatorrent.stram.plan.logical.Operators.PortContextPair; import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor; import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance; -import com.datatorrent.stram.security.StramUserLogin; import com.datatorrent.stram.stream.BufferServerPublisher; import com.datatorrent.stram.stream.BufferServerSubscriber; import com.datatorrent.stram.stream.FastPublisher; @@ -164,6 +161,7 @@ private final MBassador<ContainerEvent> eventBus; // event bus for publishing container events HashSet<Component<ContainerContext>> components; private RequestFactory requestFactory; + private TokenRenewer tokenRenewer; static { try { @@ -608,22 +606,16 @@ public void heartbeatLoop() throws Exception logger.debug("Entering heartbeat loop (interval is {} ms)", this.heartbeatIntervalMillis); umbilical.log(containerId, "[" + containerId + "] Entering heartbeat loop.."); final YarnConfiguration conf = new YarnConfiguration(); - long tokenLifeTime = (long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME)); - long expiryTime = System.currentTimeMillis(); - final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); - String stackTrace = null; - Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); - while (iter.hasNext()) { - Token<?> token = iter.next(); - logger.debug("token: {}", token); + if (UserGroupInformation.isSecurityEnabled()) { + tokenRenewer = new TokenRenewer(containerContext, false, conf, containerId); } - String principal = containerContext.getValue(LogicalPlan.PRINCIPAL); - String hdfsKeyTabFile = containerContext.getValue(LogicalPlan.KEY_TAB_FILE); + String stackTrace = null; while (!exitHeartbeatLoop) { - if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) { - expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, principal, hdfsKeyTabFile, credentials, null, false); + if (tokenRenewer != null) { + tokenRenewer.checkAndRenew(); } + synchronized (this.heartbeatTrigger) { try { this.heartbeatTrigger.wait(heartbeatIntervalMillis); diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index bf4b2cbac9..18a9a63574 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -161,7 +161,9 @@ * Then it can be moved back to DAGContext. */ public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false); + public static Attribute<Long> HDFS_TOKEN_RENEWAL_INTERVAL = new Attribute<>(86400000L); public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L); + public static Attribute<Long> RM_TOKEN_RENEWAL_INTERVAL = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); public static Attribute<String> PRINCIPAL = new Attribute<>(null, StringCodec.String2String.getInstance()); public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, StringCodec.String2String.getInstance()); diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java index 71eb8253fd..75229061ad 100644 --- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java +++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java @@ -18,28 +18,16 @@ */ package com.datatorrent.stram.security; -import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.datatorrent.api.StreamingApplication; -import com.datatorrent.stram.client.StramClientUtils; -import com.datatorrent.stram.util.FSUtil; - /** * <p>StramUserLogin class.</p> * @@ -85,63 +73,6 @@ public static void authenticate(String principal, String keytab) throws IOExcept } } - public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String principal, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException - { - long expiryTime = System.currentTimeMillis() + tokenLifeTime; - //renew tokens - final String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); - if (tokenRenewer == null || tokenRenewer.length() == 0) { - throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer"); - } - - File keyTabFile; - try (FileSystem fs = FileSystem.newInstance(conf)) { - keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf); - } - - if (principal == null) { - principal = UserGroupInformation.getCurrentUser().getUserName(); - } - UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath()); - try { - ugi.doAs(new PrivilegedExceptionAction<Object>() - { - @Override - public Object run() throws Exception - { - - Credentials creds = new Credentials(); - try (FileSystem fs1 = FileSystem.newInstance(conf)) { - fs1.addDelegationTokens(tokenRenewer, creds); - } - if (renewRMToken) { - try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) { - new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds); - } - } - credentials.addAll(creds); - - return null; - } - }); - UserGroupInformation.getCurrentUser().addCredentials(credentials); - } catch (InterruptedException e) { - LOG.error("Error while renewing tokens ", e); - expiryTime = System.currentTimeMillis(); - } catch (IOException e) { - LOG.error("Error while renewing tokens ", e); - expiryTime = System.currentTimeMillis(); - } - LOG.debug("number of tokens: {}", credentials.getAllTokens().size()); - Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); - while (iter.hasNext()) { - Token<?> token = iter.next(); - LOG.debug("updated token: {}", token); - } - keyTabFile.delete(); - return expiryTime; - } - public static String getPrincipal() { return principal; diff --git a/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java b/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java new file mode 100644 index 0000000000..cda7c98aed --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.security; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +import com.datatorrent.api.Context; +import com.datatorrent.stram.client.StramClientUtils; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.util.FSUtil; + +public class TokenRenewer +{ + + // The constant is not available hence defining here. If in future it is available this can be removed + private static final Text HDFS_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + + private static final Logger logger = LoggerFactory.getLogger(TokenRenewer.class); + + boolean renewRMToken; + Configuration conf; + String destinationFile; + + long tokenLifeTime; + long tokenRenewalInterval; + String principal; + String hdfsKeyTabFile; + InetSocketAddress rmAddress; + + long expiryTime; + long renewTime; + Credentials credentials; + + public TokenRenewer(Context context, boolean renewRMToken, Configuration conf, String destinationFile) throws IOException + { + this.renewRMToken = renewRMToken; + this.destinationFile = destinationFile; + this.conf = conf; + + if (renewRMToken) { + tokenLifeTime = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(context.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), context.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME))); + tokenRenewalInterval = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(context.getValue(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL), context.getValue(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL))); + rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); + } else { + tokenLifeTime = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * context.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME)); + tokenRenewalInterval = (long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * context.getValue(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL)); + } + + principal = context.getValue(LogicalPlan.PRINCIPAL); + hdfsKeyTabFile = context.getValue(LogicalPlan.KEY_TAB_FILE); + + expiryTime = System.currentTimeMillis() + tokenLifeTime; + renewTime = expiryTime; + + logger.debug("token life time {} renewal interval {}", tokenLifeTime, tokenRenewalInterval); + logger.debug("Token expiry time {} renew time {}", expiryTime, renewTime); + + credentials = UserGroupInformation.getCurrentUser().getCredentials(); + // Check credentials are proper at RM + if (renewRMToken) { + renewTokens(false, true); + } + } + + public void checkAndRenew() throws IOException + { + boolean renew = false; + boolean refresh = false; + long currentTimeMillis = System.currentTimeMillis(); + if (currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) { + refresh = true; + } else if (currentTimeMillis >= renewTime) { + renew = true; + } + if (refresh || renew) { + long updateTime = renewTokens(refresh, false); + if (refresh) { + expiryTime = updateTime; + renewTime = currentTimeMillis + tokenRenewalInterval; + logger.debug("Token expiry time {} renew time {}", expiryTime, renewTime); + } else { + renewTime = updateTime; + logger.debug("Token renew time {}", renewTime); + } + } + } + + private long renewTokens(final boolean refresh, boolean checkOnly) throws IOException + { + logger.info("{}", checkOnly ? "Checking renewal" : (refresh ? "Refreshing tokens" : "Renewing tokens")); + long expiryTime = System.currentTimeMillis() + (refresh ? tokenLifeTime : tokenRenewalInterval); + + final String tokenRenewer = UserGroupInformation.getCurrentUser().getUserName(); + logger.debug("Token renewer {}", tokenRenewer); + + File keyTabFile = null; + try (FileSystem fs = FileSystem.newInstance(conf)) { + String destinationDir = FileUtils.getTempDirectoryPath(); + keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf); + + if (principal == null) { + //principal = UserGroupInformation.getCurrentUser().getUserName(); + principal = UserGroupInformation.getLoginUser().getUserName(); + } + logger.debug("Principal {}", principal); + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath()); + if (!checkOnly) { + try { + UserGroupInformation currUGI = UserGroupInformation.createProxyUser(tokenRenewer, ugi); + currUGI.doAs(new PrivilegedExceptionAction<Object>() + { + @Override + public Object run() throws Exception + { + + if (refresh) { + Credentials creds = new Credentials(); + try (FileSystem fs1 = FileSystem.newInstance(conf)) { + logger.info("Refreshing fs tokens"); + fs1.addDelegationTokens(tokenRenewer, creds); + logger.info("Refreshed tokens"); + } + if (renewRMToken) { + try (YarnClient yarnClient = StramClientUtils.createYarnClient(conf)) { + logger.info("Refreshing rm tokens"); + new StramClientUtils.ClientRMHelper(yarnClient, conf).addRMDelegationToken(tokenRenewer, creds); + logger.info("Refreshed tokens"); + } + } + credentials.addAll(creds); + } else { + Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); + for (Token<? extends TokenIdentifier> token : tokens) { + logger.debug("Token {}", token); + if (token.getKind().equals(HDFS_TOKEN_KIND) || (renewRMToken && token.getKind().equals(RMDelegationTokenIdentifier.KIND_NAME))) { + logger.info("Renewing token {}", token.getKind()); + token.renew(conf); + logger.info("Renewed token"); + } + } + } + + return null; + } + }); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + } catch (InterruptedException e) { + logger.error("Error while renewing tokens ", e); + expiryTime = System.currentTimeMillis(); + } catch (IOException e) { + logger.error("Error while renewing tokens ", e); + expiryTime = System.currentTimeMillis(); + } + } + if (logger.isDebugEnabled()) { + logger.debug("number of tokens: {}", credentials.getAllTokens().size()); + Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + Token<?> token = iter.next(); + logger.debug("updated token: {}", token); + } + } + } finally { + if (keyTabFile != null) { + keyTabFile.delete(); + } + } + return expiryTime; + } + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > In secure mode containers are failing after one day and the application is > failing after seven days > --------------------------------------------------------------------------------------------------- > > Key: APEXCORE-807 > URL: https://issues.apache.org/jira/browse/APEXCORE-807 > Project: Apache Apex Core > Issue Type: Bug > Reporter: Pramod Immaneni > Assignee: Pramod Immaneni > Priority: Major > > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > java.lang.RuntimeException: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (HDFS_DELEGATION_TOKEN token nnnnnn for xxxxxx) can't be found in cache > at com.google.common.base.Throwables.propagate(Throwables.java:156) > at com.datatorrent.stram.engine.Node.reportStats(Node.java:489) > at com.datatorrent.stram.engine.GenericNode.reportStats(GenericNode.java:825) > at > com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:184) > at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397) > at > com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1465) > Caused by: java.util.concurrent.ExecutionException: > java.lang.RuntimeException: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (HDFS_DELEGATION_TOKEN token nnnnnn for xxxxxx) can't be found in cache > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at com.datatorrent.stram.engine.Node.reportStats(Node.java:482) > ... 4 more > Caused by: java.lang.RuntimeException: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (HDFS_DELEGATION_TOKEN token nnnnnn for xxxxxx) can't be found in cache > at com.google.common.base.Throwables.propagate(Throwables.java:156) > at > com.datatorrent.common.util.AsyncFSStorageAgent.copyToHDFS(AsyncFSStorageAgent.java:131) > at > com.datatorrent.common.util.AsyncFSStorageAgent.flush(AsyncFSStorageAgent.java:156) > at com.datatorrent.stram.engine.Node$CheckpointHandler.call(Node.java:706) > at com.datatorrent.stram.engine.Node$CheckpointHandler.call(Node.java:696) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (HDFS_DELEGATION_TOKEN token nnnnnn for xxxxxx) can't be found in cache > at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554) > at org.apache.hadoop.ipc.Client.call(Client.java:1498) > at org.apache.hadoop.ipc.Client.call(Client.java:1398) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233) > at com.sun.proxy.$Proxy10.create(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:313) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185) > at com.sun.proxy.$Proxy11.create(Unknown Source) > at > org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1822) > at org.apache.hadoop.hdfs.DFSClient.primitiveCreate(DFSClient.java:1762) > at org.apache.hadoop.fs.Hdfs.createInternal(Hdfs.java:104) > at org.apache.hadoop.fs.Hdfs.createInternal(Hdfs.java:60) > at > org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:585) > at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:688) > at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:684) > at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) > at org.apache.hadoop.fs.FileContext.create(FileContext.java:684) > at > com.datatorrent.common.util.AsyncFSStorageAgent.copyToHDFS(AsyncFSStorageAgent.java:119) > ... 9 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)