[ https://issues.apache.org/jira/browse/APEXCORE-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473572#comment-16473572 ]

ASF GitHub Bot commented on APEXCORE-807:
-----------------------------------------

tweise closed pull request #594: APEXCORE-807 Added renewal of tokens before 
renewal expiry interval
URL: https://github.com/apache/apex-core/pull/594
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 6c640ee1b8..e17254140f 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -28,7 +28,6 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -52,13 +51,12 @@
 import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
-import org.apache.commons.io.FileUtils;
+import org.apache.apex.engine.security.TokenRenewer;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.CompositeService;
@@ -111,7 +109,6 @@
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
 import com.datatorrent.stram.security.StramDelegationTokenManager;
-import com.datatorrent.stram.security.StramUserLogin;
 import com.datatorrent.stram.util.ConfigUtils;
 import com.datatorrent.stram.util.SecurityUtils;
 import com.datatorrent.stram.webapp.AppInfo;
@@ -166,6 +163,7 @@
   private ApexPluginDispatcher apexPluginDispatcher;
   private final GroupingManager groupingManager = 
GroupingManager.getGroupingManagerInstance();
   private static final long REMOVE_CONTAINER_TIMEOUT = 
PropertiesHelper.getLong("org.apache.apex.nodemanager.containerKill.timeout", 
30 * 1000, 0, Long.MAX_VALUE);
+  private TokenRenewer tokenRenewer;
 
   public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
   {
@@ -693,19 +691,10 @@ public boolean run() throws Exception
   private void execute() throws YarnException, IOException
   {
     LOG.info("Starting ApplicationMaster");
-    final Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
-    LOG.info("number of tokens: {}", credentials.getAllTokens().size());
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<?> token = iter.next();
-      LOG.debug("token: {}", token);
-    }
     final Configuration conf = getConfig();
-    long tokenLifeTime = 
(long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * 
Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), 
dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
-    long expiryTime = System.currentTimeMillis() + tokenLifeTime;
-    LOG.debug(" expiry token time {}", tokenLifeTime);
-    String principal = dag.getValue(LogicalPlan.PRINCIPAL);
-    String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      tokenRenewer = new TokenRenewer(dag, true, conf, 
appAttemptID.getApplicationId().toString());
+    }
 
     // Register self with ResourceManager
     RegisterApplicationMasterResponse response = 
amRmClient.registerApplicationMaster(appMasterHostname, 0, 
appMasterTrackingUrl);
@@ -778,9 +767,8 @@ private void execute() throws YarnException, IOException
         loopCounter++;
         final long currentTimeMillis = System.currentTimeMillis();
 
-        if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= 
expiryTime && hdfsKeyTabFile != null) {
-          String applicationId = appAttemptID.getApplicationId().toString();
-          expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, 
FileUtils.getTempDirectoryPath(), applicationId, conf, principal, 
hdfsKeyTabFile, credentials, rmAddress, true);
+        if (tokenRenewer != null) {
+          tokenRenewer.checkAndRenew();
         }
 
         if (currentTimeMillis > nodeReportUpdateTime) {
diff --git 
a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java 
b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 2019f489f0..d3079d0fd6 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -608,13 +608,23 @@ public ApplicationId launchApp(AppFactory appConfig) 
throws Exception
     if (UserGroupInformation.isSecurityEnabled()) {
       long hdfsTokenMaxLifeTime = 
conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, 
conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, 
StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
       dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
+      LOG.debug("HDFS token life time {}", hdfsTokenMaxLifeTime);
+      long hdfsTokenRenewInterval = 
conf.getLong(StramClientUtils.DT_HDFS_TOKEN_RENEW_INTERVAL, 
conf.getLong(StramClientUtils.HDFS_TOKEN_RENEW_INTERVAL, 
StramClientUtils.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
+      dag.setAttribute(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL, 
hdfsTokenRenewInterval);
+      LOG.debug("HDFS token renew interval {}", hdfsTokenRenewInterval);
       long rmTokenMaxLifeTime = 
conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, 
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, 
YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
       dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
+      LOG.debug("RM token life time {}", rmTokenMaxLifeTime);
+      long rmTokenRenewInterval = 
conf.getLong(StramClientUtils.DT_RM_TOKEN_RENEW_INTERVAL, 
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
+      dag.setAttribute(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL, 
rmTokenRenewInterval);
+      LOG.debug("RM token renew interval {}", rmTokenRenewInterval);
       setTokenRefreshCredentials(dag, conf);
     }
     String tokenRefreshFactor = 
conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR);
     if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) {
-      dag.setAttribute(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR, 
Double.parseDouble(tokenRefreshFactor));
+      double refreshFactor = Double.parseDouble(tokenRefreshFactor);
+      dag.setAttribute(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR, 
refreshFactor);
+      LOG.debug("Token refresh anticipatory factor {}", refreshFactor);
     }
     StramClient client = new StramClient(conf, dag);
     try {
diff --git 
a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java 
b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index a310ee2a54..d4f190fb64 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -111,12 +111,16 @@
   public static final String SUBDIR_CONF = "conf";
   public static final long RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE = 10 * 
1000;
   public static final String DT_HDFS_TOKEN_MAX_LIFE_TIME = 
StreamingApplication.DT_PREFIX + "namenode.delegation.token.max-lifetime";
+  public static final String DT_HDFS_TOKEN_RENEW_INTERVAL = 
StreamingApplication.DT_PREFIX + "namenode.delegation.token.renew-interval";
   public static final String HDFS_TOKEN_MAX_LIFE_TIME = 
"dfs.namenode.delegation.token.max-lifetime";
+  public static final String HDFS_TOKEN_RENEW_INTERVAL = 
"dfs.namenode.delegation.token.renew-interval";
   public static final String DT_RM_TOKEN_MAX_LIFE_TIME = 
StreamingApplication.DT_PREFIX + 
"resourcemanager.delegation.token.max-lifetime";
+  public static final String DT_RM_TOKEN_RENEW_INTERVAL = 
StreamingApplication.DT_PREFIX + 
"resourcemanager.delegation.token.renew-interval";
   @Deprecated
   public static final String KEY_TAB_FILE = StramUserLogin.DT_AUTH_PREFIX + 
"store.keytab";
   public static final String TOKEN_ANTICIPATORY_REFRESH_FACTOR = 
StramUserLogin.DT_AUTH_PREFIX + "token.refresh.factor";
   public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7 * 24 * 60 
* 60 * 1000;
+  public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24 * 60 * 
60 * 1000;
   public static final String TOKEN_REFRESH_PRINCIPAL = 
StramUserLogin.DT_AUTH_PREFIX + "token.refresh.principal";
   public static final String TOKEN_REFRESH_KEYTAB = 
StramUserLogin.DT_AUTH_PREFIX + "token.refresh.keytab";
   /**
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java 
b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index f5aaf352c7..927ad6d76b 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -42,15 +42,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.security.TokenRenewer;
 import org.apache.apex.log.LogFileInformation;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.LogManager;
 
@@ -106,7 +104,6 @@
 import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
 import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor;
 import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
-import com.datatorrent.stram.security.StramUserLogin;
 import com.datatorrent.stram.stream.BufferServerPublisher;
 import com.datatorrent.stram.stream.BufferServerSubscriber;
 import com.datatorrent.stram.stream.FastPublisher;
@@ -164,6 +161,7 @@
   private final MBassador<ContainerEvent> eventBus; // event bus for 
publishing container events
   HashSet<Component<ContainerContext>> components;
   private RequestFactory requestFactory;
+  private TokenRenewer tokenRenewer;
 
   static {
     try {
@@ -608,22 +606,16 @@ public void heartbeatLoop() throws Exception
     logger.debug("Entering heartbeat loop (interval is {} ms)", 
this.heartbeatIntervalMillis);
     umbilical.log(containerId, "[" + containerId + "] Entering heartbeat 
loop..");
     final YarnConfiguration conf = new YarnConfiguration();
-    long tokenLifeTime = 
(long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) 
* containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
-    long expiryTime = System.currentTimeMillis();
-    final Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
-    String stackTrace = null;
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<?> token = iter.next();
-      logger.debug("token: {}", token);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      tokenRenewer = new TokenRenewer(containerContext, false, conf, 
containerId);
     }
-    String principal = containerContext.getValue(LogicalPlan.PRINCIPAL);
-    String hdfsKeyTabFile = 
containerContext.getValue(LogicalPlan.KEY_TAB_FILE);
+    String stackTrace = null;
     while (!exitHeartbeatLoop) {
 
-      if (UserGroupInformation.isSecurityEnabled() && 
System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
-        expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, 
FileUtils.getTempDirectoryPath(), containerId, conf, principal, hdfsKeyTabFile, 
credentials, null, false);
+      if (tokenRenewer != null) {
+        tokenRenewer.checkAndRenew();
       }
+
       synchronized (this.heartbeatTrigger) {
         try {
           this.heartbeatTrigger.wait(heartbeatIntervalMillis);
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index bf4b2cbac9..18a9a63574 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -161,7 +161,9 @@
    * Then it can be moved back to DAGContext.
    */
   public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new 
Attribute<>(false);
+  public static Attribute<Long> HDFS_TOKEN_RENEWAL_INTERVAL = new 
Attribute<>(86400000L);
   public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new 
Attribute<>(604800000L);
+  public static Attribute<Long> RM_TOKEN_RENEWAL_INTERVAL = new 
Attribute<>(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
   public static Attribute<Long> RM_TOKEN_LIFE_TIME = new 
Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
   public static Attribute<String> PRINCIPAL = new Attribute<>(null, 
StringCodec.String2String.getInstance());
   public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, 
StringCodec.String2String.getInstance());
diff --git 
a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java 
b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index 71eb8253fd..75229061ad 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -18,28 +18,16 @@
  */
 package com.datatorrent.stram.security;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import com.datatorrent.api.StreamingApplication;
 
-import com.datatorrent.stram.client.StramClientUtils;
-import com.datatorrent.stram.util.FSUtil;
-
 /**
  * <p>StramUserLogin class.</p>
  *
@@ -85,63 +73,6 @@ public static void authenticate(String principal, String 
keytab) throws IOExcept
     }
   }
 
-  public static long refreshTokens(long tokenLifeTime, String destinationDir, 
String destinationFile, final Configuration conf, String principal, String 
hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress 
rmAddress, final boolean renewRMToken) throws IOException
-  {
-    long expiryTime = System.currentTimeMillis() + tokenLifeTime;
-    //renew tokens
-    final String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
-    if (tokenRenewer == null || tokenRenewer.length() == 0) {
-      throw new IOException("Can't get Master Kerberos principal for the RM to 
use as renewer");
-    }
-
-    File keyTabFile;
-    try (FileSystem fs = FileSystem.newInstance(conf)) {
-      keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, 
destinationFile, hdfsKeyTabFile, conf);
-    }
-
-    if (principal == null) {
-      principal = UserGroupInformation.getCurrentUser().getUserName();
-    }
-    UserGroupInformation ugi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
keyTabFile.getAbsolutePath());
-    try {
-      ugi.doAs(new PrivilegedExceptionAction<Object>()
-      {
-        @Override
-        public Object run() throws Exception
-        {
-
-          Credentials creds = new Credentials();
-          try (FileSystem fs1 = FileSystem.newInstance(conf)) {
-            fs1.addDelegationTokens(tokenRenewer, creds);
-          }
-          if (renewRMToken) {
-            try (YarnClient yarnClient = 
StramClientUtils.createYarnClient(conf)) {
-              new StramClientUtils.ClientRMHelper(yarnClient, 
conf).addRMDelegationToken(tokenRenewer, creds);
-            }
-          }
-          credentials.addAll(creds);
-
-          return null;
-        }
-      });
-      UserGroupInformation.getCurrentUser().addCredentials(credentials);
-    } catch (InterruptedException e) {
-      LOG.error("Error while renewing tokens ", e);
-      expiryTime = System.currentTimeMillis();
-    } catch (IOException e) {
-      LOG.error("Error while renewing tokens ", e);
-      expiryTime = System.currentTimeMillis();
-    }
-    LOG.debug("number of tokens: {}", credentials.getAllTokens().size());
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<?> token = iter.next();
-      LOG.debug("updated token: {}", token);
-    }
-    keyTabFile.delete();
-    return expiryTime;
-  }
-
   public static String getPrincipal()
   {
     return principal;
diff --git 
a/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java 
b/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java
new file mode 100644
index 0000000000..cda7c98aed
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/security/TokenRenewer.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.client.StramClientUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.util.FSUtil;
+
+public class TokenRenewer
+{
+
+  // The constant is not available hence defining here. If in future it is 
available this can be removed
+  private static final Text HDFS_TOKEN_KIND = new 
Text("HDFS_DELEGATION_TOKEN");
+
+  private static final Logger logger = 
LoggerFactory.getLogger(TokenRenewer.class);
+
+  boolean renewRMToken;
+  Configuration conf;
+  String destinationFile;
+
+  long tokenLifeTime;
+  long tokenRenewalInterval;
+  String principal;
+  String hdfsKeyTabFile;
+  InetSocketAddress rmAddress;
+
+  long expiryTime;
+  long renewTime;
+  Credentials credentials;
+
+  public TokenRenewer(Context context, boolean renewRMToken, Configuration 
conf, String destinationFile) throws IOException
+  {
+    this.renewRMToken = renewRMToken;
+    this.destinationFile = destinationFile;
+    this.conf = conf;
+
+    if (renewRMToken) {
+      tokenLifeTime = 
(long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * 
Math.min(context.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), 
context.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
+      tokenRenewalInterval = 
(long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * 
Math.min(context.getValue(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL), 
context.getValue(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL)));
+      rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, 
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+    } else {
+      tokenLifeTime = 
(long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * 
context.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
+      tokenRenewalInterval = 
(long)(context.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * 
context.getValue(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL));
+    }
+
+    principal = context.getValue(LogicalPlan.PRINCIPAL);
+    hdfsKeyTabFile = context.getValue(LogicalPlan.KEY_TAB_FILE);
+
+    expiryTime = System.currentTimeMillis() + tokenLifeTime;
+    renewTime = expiryTime;
+
+    logger.debug("token life time {} renewal interval {}", tokenLifeTime, 
tokenRenewalInterval);
+    logger.debug("Token expiry time {} renew time {}", expiryTime, renewTime);
+
+    credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    // Check credentials are proper at RM
+    if (renewRMToken) {
+      renewTokens(false, true);
+    }
+  }
+
+  public void checkAndRenew() throws IOException
+  {
+    boolean renew = false;
+    boolean refresh = false;
+    long currentTimeMillis = System.currentTimeMillis();
+    if (currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
+      refresh = true;
+    } else if (currentTimeMillis >= renewTime) {
+      renew = true;
+    }
+    if (refresh || renew) {
+      long updateTime = renewTokens(refresh, false);
+      if (refresh) {
+        expiryTime = updateTime;
+        renewTime = currentTimeMillis + tokenRenewalInterval;
+        logger.debug("Token expiry time {} renew time {}", expiryTime, 
renewTime);
+      } else {
+        renewTime = updateTime;
+        logger.debug("Token renew time {}", renewTime);
+      }
+    }
+  }
+
+  private long renewTokens(final boolean refresh, boolean checkOnly) throws 
IOException
+  {
+    logger.info("{}", checkOnly ? "Checking renewal" : (refresh ? "Refreshing 
tokens" : "Renewing tokens"));
+    long expiryTime = System.currentTimeMillis() + (refresh ? tokenLifeTime : 
tokenRenewalInterval);
+
+    final String tokenRenewer = 
UserGroupInformation.getCurrentUser().getUserName();
+    logger.debug("Token renewer {}", tokenRenewer);
+
+    File keyTabFile = null;
+    try (FileSystem fs = FileSystem.newInstance(conf)) {
+      String destinationDir = FileUtils.getTempDirectoryPath();
+      keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, 
destinationFile, hdfsKeyTabFile, conf);
+
+      if (principal == null) {
+        //principal = UserGroupInformation.getCurrentUser().getUserName();
+        principal = UserGroupInformation.getLoginUser().getUserName();
+      }
+      logger.debug("Principal {}", principal);
+      UserGroupInformation ugi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
keyTabFile.getAbsolutePath());
+      if (!checkOnly) {
+        try {
+          UserGroupInformation currUGI = 
UserGroupInformation.createProxyUser(tokenRenewer, ugi);
+          currUGI.doAs(new PrivilegedExceptionAction<Object>()
+          {
+            @Override
+            public Object run() throws Exception
+            {
+
+              if (refresh) {
+                Credentials creds = new Credentials();
+                try (FileSystem fs1 = FileSystem.newInstance(conf)) {
+                  logger.info("Refreshing fs tokens");
+                  fs1.addDelegationTokens(tokenRenewer, creds);
+                  logger.info("Refreshed tokens");
+                }
+                if (renewRMToken) {
+                  try (YarnClient yarnClient = 
StramClientUtils.createYarnClient(conf)) {
+                    logger.info("Refreshing rm tokens");
+                    new StramClientUtils.ClientRMHelper(yarnClient, 
conf).addRMDelegationToken(tokenRenewer, creds);
+                    logger.info("Refreshed tokens");
+                  }
+                }
+                credentials.addAll(creds);
+              } else {
+                Collection<Token<? extends TokenIdentifier>> tokens = 
credentials.getAllTokens();
+                for (Token<? extends TokenIdentifier> token : tokens) {
+                  logger.debug("Token {}", token);
+                  if (token.getKind().equals(HDFS_TOKEN_KIND) || (renewRMToken 
&& token.getKind().equals(RMDelegationTokenIdentifier.KIND_NAME))) {
+                    logger.info("Renewing token {}", token.getKind());
+                    token.renew(conf);
+                    logger.info("Renewed token");
+                  }
+                }
+              }
+
+              return null;
+            }
+          });
+          UserGroupInformation.getCurrentUser().addCredentials(credentials);
+        } catch (InterruptedException e) {
+          logger.error("Error while renewing tokens ", e);
+          expiryTime = System.currentTimeMillis();
+        } catch (IOException e) {
+          logger.error("Error while renewing tokens ", e);
+          expiryTime = System.currentTimeMillis();
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("number of tokens: {}", 
credentials.getAllTokens().size());
+        Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+        while (iter.hasNext()) {
+          Token<?> token = iter.next();
+          logger.debug("updated token: {}", token);
+        }
+      }
+    } finally {
+      if (keyTabFile != null) {
+        keyTabFile.delete();
+      }
+    }
+    return expiryTime;
+  }
+
+}
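
For reference, a minimal sketch of how the new TokenRenewer is driven, following the call sites in StreamingAppMasterService and StreamingContainer above. The wrapper class and method names here are illustrative only; just TokenRenewer, its constructor and checkAndRenew() come from the patch.

import java.io.IOException;

import org.apache.apex.engine.security.TokenRenewer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import com.datatorrent.api.Context;

// Illustrative wrapper only; not part of the patch.
public class TokenRenewalSketch
{
  private TokenRenewer tokenRenewer;

  // Done once at startup, mirroring StreamingAppMasterService.execute() above.
  void setup(Context context, Configuration conf, String keytabCopyName) throws IOException
  {
    if (UserGroupInformation.isSecurityEnabled()) {
      // renewRMToken = true in the app master (HDFS and RM tokens); containers pass false (HDFS token only).
      tokenRenewer = new TokenRenewer(context, true, conf, keytabCopyName);
    }
  }

  // Called on every pass of the main/heartbeat loop.
  void onLoopIteration() throws IOException
  {
    if (tokenRenewer != null) {
      // Renews the delegation tokens once the renewal interval has elapsed and refreshes
      // them from the keytab when the token lifetime itself is about to expire.
      tokenRenewer.checkAndRenew();
    }
  }
}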


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> In secure mode containers are failing after one day and the application is 
> failing after seven days
> ---------------------------------------------------------------------------------------------------
>
>                 Key: APEXCORE-807
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-807
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>            Priority: Major
>
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>  token (HDFS_DELEGATION_TOKEN token nnnnnn for xxxxxx) can't be found in cache
>  at com.google.common.base.Throwables.propagate(Throwables.java:156)
>  at com.datatorrent.stram.engine.Node.reportStats(Node.java:489)
>  at com.datatorrent.stram.engine.GenericNode.reportStats(GenericNode.java:825)
>  at 
> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:184)
>  at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
>  at 
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1465)
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>  token (HDFS_DELEGATION_TOKEN token nnnnnn for xxxxxx) can't be found in cache
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>  at com.datatorrent.stram.engine.Node.reportStats(Node.java:482)
>  ... 4 more
> Caused by: java.lang.RuntimeException: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>  token (HDFS_DELEGATION_TOKEN token nnnnnn for xxxxxx) can't be found in cache
>  at com.google.common.base.Throwables.propagate(Throwables.java:156)
>  at 
> com.datatorrent.common.util.AsyncFSStorageAgent.copyToHDFS(AsyncFSStorageAgent.java:131)
>  at 
> com.datatorrent.common.util.AsyncFSStorageAgent.flush(AsyncFSStorageAgent.java:156)
>  at com.datatorrent.stram.engine.Node$CheckpointHandler.call(Node.java:706)
>  at com.datatorrent.stram.engine.Node$CheckpointHandler.call(Node.java:696)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>  token (HDFS_DELEGATION_TOKEN token nnnnnn for xxxxxx) can't be found in cache
>  at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
>  at org.apache.hadoop.ipc.Client.call(Client.java:1498)
>  at org.apache.hadoop.ipc.Client.call(Client.java:1398)
>  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
>  at com.sun.proxy.$Proxy10.create(Unknown Source)
>  at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:313)
>  at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
>  at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
>  at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
>  at com.sun.proxy.$Proxy11.create(Unknown Source)
>  at 
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1822)
>  at org.apache.hadoop.hdfs.DFSClient.primitiveCreate(DFSClient.java:1762)
>  at org.apache.hadoop.fs.Hdfs.createInternal(Hdfs.java:104)
>  at org.apache.hadoop.fs.Hdfs.createInternal(Hdfs.java:60)
>  at 
> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:585)
>  at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:688)
>  at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:684)
>  at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>  at org.apache.hadoop.fs.FileContext.create(FileContext.java:684)
>  at 
> com.datatorrent.common.util.AsyncFSStorageAgent.copyToHDFS(AsyncFSStorageAgent.java:119)
>  ... 9 more
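
A note on the timings: they line up with the stock Hadoop delegation-token defaults, which the patch above now carries explicitly in StramClientUtils. Assuming the cluster runs with those defaults, a delegation token becomes invalid if it is not renewed within the renew interval (one day) and cannot be renewed at all beyond the max lifetime (seven days):

// Defaults from the StramClientUtils hunk above, in milliseconds.
long renewInterval = 24 * 60 * 60 * 1000L;     //  86400000 ms = 1 day:  containers start failing
long maxLifetime   = 7 * 24 * 60 * 60 * 1000L; // 604800000 ms = 7 days: the application itself fails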



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
