This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e427ce0d572 HIVE-26569: Support renewal and recreation of LLAP_TOKENs 
(#3626) (Laszlo Bodor reviewed by Ayush Saxena)
e427ce0d572 is described below

commit e427ce0d572c9adf6f194693a1b3ba85f246f3b7
Author: Bodor Laszlo <[email protected]>
AuthorDate: Thu Dec 8 12:17:36 2022 +0100

    HIVE-26569: Support renewal and recreation of LLAP_TOKENs (#3626) (Laszlo 
Bodor reviewed by Ayush Saxena)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +-
 .../hadoop/hive/llap/security/LlapTokenClient.java |  91 ++++++++-
 .../llap/security/LlapTokenLocalClientImpl.java    |  14 +-
 .../hive/llap/tez/LlapProtocolClientProxy.java     |  39 ++++
 .../apache/hadoop/hive/llap/AsyncPbRpcProxy.java   |  55 ++++--
 .../impl/LlapManagementProtocolClientImpl.java     |  18 +-
 .../hive/llap/security/LlapServerSecurityInfo.java |   4 +-
 .../hadoop/hive/llap/security/SecretManager.java   |  17 +-
 .../llap/daemon/impl/DefaultLlapTokenManager.java  | 215 +++++++++++++++++++++
 .../hive/llap/daemon/impl/DummyTokenManager.java   |  30 +++
 .../hadoop/hive/llap/daemon/impl/LlapDaemon.java   |  15 +-
 .../llap/daemon/impl/LlapProtocolServerImpl.java   |  11 +-
 .../hive/llap/daemon/impl/LlapTokenManager.java    |  32 +++
 .../hive/llap/tezplugins/LlapTaskCommunicator.java |  95 ++++-----
 14 files changed, 548 insertions(+), 93 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 24b9d0eb2b6..44452b970e5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5060,10 +5060,13 @@ public class HiveConf extends Configuration {
         "cluster by setting it to true or 'except_llap_owner' (the latter 
returns such tokens\n" +
         "to everyone except the user LLAP cluster is authenticating under)."),
 
-    // Hadoop DelegationTokenManager default is 1 week.
     
LLAP_DELEGATION_TOKEN_LIFETIME("hive.llap.daemon.delegation.token.lifetime", 
"14d",
          new TimeValidator(TimeUnit.SECONDS),
         "LLAP delegation token lifetime, in seconds if specified without a 
unit."),
+    
LLAP_DELEGATION_TOKEN_RENEW_INTERVAL("hive.llap.daemon.delegation.token.renew.interval",
 "1d",
+        new TimeValidator(TimeUnit.SECONDS),
+        "LLAP delegation token renew interval, in seconds if specified without 
a unit."
+            + "Tokens are typically renewed in the LlapDaemons by 
LlapTokenManager currently."),
     LLAP_MANAGEMENT_RPC_PORT("hive.llap.management.rpc.port", 15004,
         "RPC port for LLAP daemon management service."),
     LLAP_WEB_AUTO_AUTH("hive.llap.auto.auth", false,
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 3208e21269d..e153e470c2e 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -19,11 +19,12 @@
 package org.apache.hadoop.hive.llap.security;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import javax.net.SocketFactory;
@@ -31,16 +32,18 @@ import javax.net.SocketFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.slf4j.Logger;
@@ -60,6 +63,7 @@ public class LlapTokenClient {
   private Collection<LlapServiceInstance> lastKnownInstances;
   private LlapManagementProtocolClientImpl client;
   private LlapServiceInstance clientInstance;
+  private Token<LlapTokenIdentifier> currentToken;
 
   public LlapTokenClient(Configuration conf) {
     this.conf = conf;
@@ -89,6 +93,7 @@ public class LlapTokenClient {
         LOG.error("Cannot get a token, trying a different instance", ex);
         client = null;
         clientInstance = null;
+        currentToken = null;
       }
       if (llaps == null || !llaps.hasNext()) {
         if (hasRefreshed) { // Only refresh once.
@@ -101,9 +106,8 @@ public class LlapTokenClient {
     }
 
     Token<LlapTokenIdentifier> token = extractToken(tokenBytes);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": 
" + token);
-    }
+    LOG.info("Obtained a LLAP delegation token from {}: {}", clientInstance, 
token);
+
     return token;
   }
 
@@ -117,10 +121,72 @@ public class LlapTokenClient {
 
   private ByteString getTokenBytes(final String appId) throws IOException, 
ServiceException {
     assert clientInstance != null;
+    if (currentToken != null) {
+      try {
+        return fetchTokenWithCurrentToken(appId);
+      } catch (IOException | ServiceException e) {
+        LOG.warn("Exception while getting delegation token, trying with 
kerberos login", e);
+        return fetchTokenWithKerberosLogin(appId);
+      }
+    } else {
+      LOG.info("currentToken is null, let's try to fetch one with kerberos 
login");
+      return fetchTokenWithKerberosLogin(appId);
+    }
+  }
+
+  /**
+   * Tries to fetch token by LlapManagementProtocolClientImpl from a daemon on 
the cluster.
+   * Assumes that we already have a valid currentToken for communicating with 
the daemons.
+   */
+  private ByteString fetchTokenWithCurrentToken(final String appId) throws 
IOException, ServiceException {
+    UserGroupInformation ugi = getUgiWithCurrentToken();
+    return getTokenWithUgi(appId, ugi);
+  }
+
+  /**
+   * Tries to fetch token by LlapManagementProtocolClientImpl with a kerberos 
login.
+   * 1. Logs in to kerberos
+   * 2. Obtains an authenticated ugi
+   * 3. Fetches a delegation token from a daemon.
+   * This is supposed to be used if currentToken is not available or expired.
+   * Kerberos login is a valid use-case if there is no other way to 
authenticate and get a new token. It always
+   * means communication with the KDC, but if we use delegation token in at 
least ~90% of the time,
+   * KDC should not become the bottleneck.
+   */
+  private ByteString fetchTokenWithKerberosLogin(final String appId) throws 
IOException, ServiceException {
+    UserGroupInformation ugi = getUgiFromKerberosLogin();
+    return getTokenWithUgi(appId, ugi);
+  }
+
+  private UserGroupInformation getUgiWithCurrentToken() throws IOException {
+    String currentUser = 
UserGroupInformation.getCurrentUser().getShortUserName();
+    UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(currentUser);
+
+    String address = new URL(clientInstance.getServicesAddress()).getHost();
+    int port = clientInstance.getManagementPort();
+
+    InetSocketAddress socketAddr = NetUtils.createSocketAddrForHost(address, 
port);
+    LOG.debug("Setup token with {}:{}, socketAddr: {}", address, port, 
socketAddr);
+    SecurityUtil.setTokenService(currentToken, socketAddr);
+    ugi.addToken(currentToken);
+    return ugi;
+  }
+
+  private UserGroupInformation getUgiFromKerberosLogin() throws IOException {
+    String llapPrincipal = HiveConf.getVar(conf, 
ConfVars.LLAP_KERBEROS_PRINCIPAL),
+        llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+
+    LOG.info("Logging in using principal {} and keytab {}", llapPrincipal, 
llapKeytab);
+    return LlapUtil.loginWithKerberos(llapPrincipal, llapKeytab);
+  }
+
+  private ByteString getTokenWithUgi(final String appId, UserGroupInformation 
ugi) throws ServiceException {
     if (client == null) {
-      client = new LlapManagementProtocolClientImpl(conf, 
clientInstance.getHost(),
-          clientInstance.getManagementPort(), retryPolicy, socketFactory);
+      client = new LlapManagementProtocolClientImpl(conf, 
clientInstance.getHost(), clientInstance.getManagementPort(),
+          retryPolicy, socketFactory);
     }
+    client.withUgi(ugi);
+
     GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
     if (!StringUtils.isBlank(appId)) {
       req.setAppId(appId);
@@ -146,4 +212,13 @@ public class LlapTokenClient {
     return new ArrayList<LlapServiceInstance>(lastKnownInstances);
   }
 
+  /**
+   * An initial token can be set with this method. A typical path is
+   * LlapTaskCommunicator -- 
LlapProtocolClientProxy::initPeriodicTokenRefresh. If the LlapTaskCommunicator 
is
+   * instantiated with an initial token, it can be used for subsequent calls, even 
for fetching new tokens.
+   */
+  public LlapTokenClient withCurrentToken(Token<LlapTokenIdentifier> token) {
+    this.currentToken = token;
+    return this;
+  }
 }
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
index 423b733558c..d5f26d557cd 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.llap.security;
 
 import java.io.IOException;
 
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.Token;
 import org.slf4j.Logger;
@@ -36,17 +35,14 @@ public class LlapTokenLocalClientImpl implements 
LlapTokenLocalClient {
   }
 
   @Override
-  public Token<LlapTokenIdentifier> createToken(
-      String appId, String user, boolean isSignatureRequired) throws 
IOException {
+  public Token<LlapTokenIdentifier> createToken(String appId, String user, 
boolean isSignatureRequired)
+      throws IOException {
     try {
-      Token<LlapTokenIdentifier> token = secretManager.createLlapToken(
-          appId, user, isSignatureRequired);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Created a LLAP delegation token locally: " + token);
-      }
+      Token<LlapTokenIdentifier> token = secretManager.createLlapToken(appId, 
user, isSignatureRequired);
+      LOG.info("Created a LLAP delegation token locally: {}", token);
       return token;
     } catch (Exception ex) {
-      throw (ex instanceof IOException) ? (IOException)ex : new 
IOException(ex);
+      throw (ex instanceof IOException) ? (IOException) ex : new 
IOException(ex);
     }
   }
 
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
index 67025311e8f..142b63d3493 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
@@ -15,6 +15,8 @@
 package org.apache.hadoop.hive.llap.tez;
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.net.SocketFactory;
@@ -38,13 +40,21 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFra
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
 import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.security.LlapTokenClient;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LlapProtocolClientProxy
   extends AsyncPbRpcProxy<LlapProtocolBlockingPB, LlapTokenIdentifier> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapProtocolClientProxy.class);
+  private static final long LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS = 300;
+
+  protected final ScheduledExecutorService newTokenChecker = 
Executors.newScheduledThreadPool(1);
+  private LlapTokenClient tokenClient;
 
   public LlapProtocolClientProxy(
       int numThreads, Configuration conf, Token<LlapTokenIdentifier> 
llapToken) {
@@ -55,6 +65,7 @@ public class LlapProtocolClientProxy
             TimeUnit.MILLISECONDS),
         HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
             TimeUnit.MILLISECONDS), -1, HiveConf.getIntVar(conf, 
ConfVars.LLAP_MAX_CONCURRENT_REQUESTS_PER_NODE));
+    initPeriodicTokenRefresh(conf);
   }
 
   public void registerDag(RegisterDagRequestProto request, String host, int 
port,
@@ -94,6 +105,30 @@ public class LlapProtocolClientProxy
     queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback));
   }
 
+  protected void initPeriodicTokenRefresh(Configuration conf) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    LOG.info("Initializing periodic token refresh in AM, will run in every 
{}s",
+        LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS);
+    tokenClient = new LlapTokenClient(conf);
+
+    newTokenChecker.scheduleAtFixedRate(this::fetchToken, 0, 
LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS,
+        TimeUnit.SECONDS);
+  }
+
+  private synchronized void fetchToken() {
+    LOG.debug("Trying to fetch a new token...");
+    try {
+      Token<LlapTokenIdentifier> newToken =
+          tokenClient.withCurrentToken(new 
Token<LlapTokenIdentifier>(token)).getDelegationToken(null);
+      LOG.debug("Received new token: {}, old was: {}", newToken, token);
+      setToken(newToken);
+    } catch (Exception e) {
+      LOG.error("Caught exception while fetching token", e);
+    }
+  }
+
   private class RegisterDagCallable extends
       NodeCallableRequest<RegisterDagRequestProto, RegisterDagResponseProto> {
     protected RegisterDagCallable(LlapNodeId nodeId,
@@ -202,4 +237,8 @@ public class LlapProtocolClientProxy
   protected void shutdownProtocolImpl(LlapProtocolBlockingPB client) {
     // Nothing to do.
   }
+
+  public void refreshToken() {
+    fetchToken();
+  }
 }
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
index 66e40829683..f4dba123eac 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
@@ -41,6 +41,7 @@ import javax.net.SocketFactory;
 
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 // TODO: LlapNodeId is just a host+port pair; we could make this class more 
generic.
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -80,8 +81,8 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType 
extends TokenIdent
   private final SocketFactory socketFactory;
   private final ListeningExecutorService requestManagerExecutor;
   private volatile ListenableFuture<Void> requestManagerFuture;
-  private final Token<TokenType> token;
-  private final String tokenUser;
+  protected Token<TokenType> token;
+  protected String tokenUser;
 
   @VisibleForTesting
   public static class RequestManager implements Callable<Void> {
@@ -516,7 +517,36 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
     }
     this.hostProxies = cb.build();
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
-    this.token = token;
+
+    try {
+      setToken(token);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+        connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS);
+
+    this.requestManager = new RequestManager(numThreads, maxPerNode);
+    ExecutorService localExecutor = Executors.newFixedThreadPool(1,
+        new 
ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
+    this.requestManagerExecutor = 
MoreExecutors.listeningDecorator(localExecutor);
+
+    LOG.info("Setting up AsyncPbRpcProxy with" +
+        "numThreads=" + numThreads +
+        "retryTime(millis)=" + connectionTimeoutMs +
+        "retrySleep(millis)=" + retrySleepMs);
+  }
+
+  protected void setToken(Token<TokenType> newToken) throws IOException {
+    if (tokensAreEqual(newToken)) {
+      return;
+    }
+    LOG.info("Setting new token as it's not equal to the old one, new token 
is: {}", newToken);
+    // clear cache to make proxies use the new token
+    hostProxies.invalidateAll();
+
+    this.token = newToken;
     if (token != null) {
       String tokenUser = getTokenUser(token);
       if (tokenUser == null) {
@@ -531,19 +561,16 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
     } else {
       this.tokenUser = null;
     }
+  }
 
-    this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
-        connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS);
-
-    this.requestManager = new RequestManager(numThreads, maxPerNode);
-    ExecutorService localExecutor = Executors.newFixedThreadPool(1,
-        new 
ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
-    this.requestManagerExecutor = 
MoreExecutors.listeningDecorator(localExecutor);
+  protected boolean tokensAreEqual(Token<TokenType> otherToken) throws 
IOException {
+    int oldSeqNumber =
+        this.token == null ? -1 : ((LlapTokenIdentifier) 
this.token.decodeIdentifier()).getSequenceNumber();
+    int newSeqNumber =
+        otherToken == null ? -1 : ((LlapTokenIdentifier) 
otherToken.decodeIdentifier()).getSequenceNumber();
 
-    LOG.info("Setting up AsyncPbRpcProxy with" +
-        "numThreads=" + numThreads +
-        "retryTime(millis)=" + connectionTimeoutMs +
-        "retrySleep(millis)=" + retrySleepMs);
+    LOG.debug("Check token equality be sequenceNumber: {} <-> {}", 
oldSeqNumber, newSeqNumber);
+    return oldSeqNumber == newSeqNumber;
   }
 
   /**
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
 
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
index e811c0069a5..67fbde8e1ec 100644
--- 
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
+++ 
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
@@ -40,6 +40,8 @@ public class LlapManagementProtocolClientImpl implements 
LlapManagementProtocolP
   private final RetryPolicy retryPolicy;
   private final SocketFactory socketFactory;
   LlapManagementProtocolPB proxy;
+  private UserGroupInformation ugi;
+  private boolean ugiChanged = false;
 
 
   public LlapManagementProtocolClientImpl(Configuration conf, String hostname, 
int port,
@@ -56,17 +58,18 @@ public class LlapManagementProtocolClientImpl implements 
LlapManagementProtocolP
   }
 
   public LlapManagementProtocolPB getProxy() throws IOException {
-    if (proxy == null) {
+    if (proxy == null || ugiChanged) {
       proxy = createProxy();
+      ugiChanged = false;
     }
     return proxy;
   }
 
-  public LlapManagementProtocolPB createProxy() throws IOException {
+  private LlapManagementProtocolPB createProxy() throws IOException {
     RPC.setProtocolEngine(conf, LlapManagementProtocolPB.class, 
ProtobufRpcEngine.class);
     ProtocolProxy<LlapManagementProtocolPB> proxy =
         RPC.getProtocolProxy(LlapManagementProtocolPB.class, 0, serverAddr,
-            UserGroupInformation.getCurrentUser(), conf, socketFactory, 0, 
retryPolicy);
+            ugi == null ? UserGroupInformation.getCurrentUser() : ugi, conf, 
socketFactory, 0, retryPolicy);
     return proxy.getProxy();
   }
 
@@ -131,4 +134,13 @@ public class LlapManagementProtocolClientImpl implements 
LlapManagementProtocolP
     }
   }
 
+  /**
+   * Sets a UserGroupInformation instance to be used with this client (a new 
ugi might be for a new token).
+   * The field ugiChanged is for telling that e.g. new proxies have to be 
created.
+   */
+  public LlapManagementProtocolClientImpl withUgi(UserGroupInformation ugi) {
+    this.ugi = ugi;
+    this.ugiChanged = true;
+    return this;
+  }
 }
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
 
b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
similarity index 96%
rename from 
llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
rename to 
llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
index 158c264bc6f..73deeb76aed 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
+++ 
b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
@@ -57,8 +57,8 @@ public class LlapServerSecurityInfo extends SecurityInfo {
   @Override
   public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
     LOG.debug("Trying to get TokenInfo for {}", protocol);
-    // Tokens cannot be used for the management protocol (for now).
-    if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+    if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)
+        && !LlapManagementProtocolPB.class.isAssignableFrom(protocol)) return 
null;
     return new TokenInfo() {
       @Override
       public Class<? extends Annotation> annotationType() {
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index a2021a3a31e..78a1d4870bf 100644
--- 
a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ 
b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -165,13 +164,20 @@ public class SecretManager extends 
ZKDelegationTokenSecretManager<LlapTokenIdent
 
   private static LlapZkConf createLlapZkConf(
       Configuration conf, String llapPrincipal, String llapKeytab, String 
clusterId) {
-     // Override the default delegation token lifetime for LLAP.
-     // Also set all the necessary ZK settings to defaults and LLAP configs, 
if not set.
-     final Configuration zkConf = new Configuration(conf);
+    // Override the default delegation token lifetime for LLAP.
+    // Also set all the necessary ZK settings to defaults and LLAP configs, if 
not set.
+    final Configuration zkConf = new Configuration(conf);
     long tokenLifetime = HiveConf.getTimeVar(
         conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS);
     zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
-    zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
+
+    long tokenRenewInterval = HiveConf.getTimeVar(
+        conf, ConfVars.LLAP_DELEGATION_TOKEN_RENEW_INTERVAL, TimeUnit.SECONDS);
+    zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenRenewInterval);
+
+    LOG.info("SecretManager ZkConf created: tokenLifetime {}, 
tokenRenewInterval: {}", tokenLifetime,
+        tokenRenewInterval);
+
     try {
       zkConf.set(ZK_DTSM_ZK_KERBEROS_PRINCIPAL,
           SecurityUtil.getServerPrincipal(llapPrincipal, "0.0.0.0"));
@@ -248,7 +254,6 @@ public class SecretManager extends 
ZKDelegationTokenSecretManager<LlapTokenIdent
     }
     LlapTokenIdentifier llapId = new LlapTokenIdentifier(
         new Text(user), renewer, realUser, clusterId, appId, 
isSignatureRequired);
-    // TODO: note that the token is not renewable right now and will last for 
2 weeks by default.
     Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, 
this);
     LOG.info("Created LLAP token {}", token);
     return token;
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DefaultLlapTokenManager.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DefaultLlapTokenManager.java
new file mode 100644
index 00000000000..a81cbd6ba0d
--- /dev/null
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DefaultLlapTokenManager.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.SecretManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DefaultLlapTokenManager is for renewing and recreating LLAP tokens on a 
cluster.
+ *
+ * This class runs in the LLAP Daemon, but the whole process involves two 
components:
+ * AM(s): LlapTaskCommunicator -- LlapProtocolClientProxy
+ * Daemon(s): LlapTokenManager
+ *
+ * DefaultLlapTokenManager tracks llap token instances and handles them as:
+ * a) renews periodically when they get closer to expiry date
+ * b) recreates when they get close to max lifetime.
+ * This logic is needed because Hadoop does not have a unified way for all 
applications to do this,
+ * hence the LLAP_TOKEN specific implementation can be found in this class.
+ * The tokens are persisted by a zookeeper-based secretmanager 
(ZKDelegationTokenSecretManager), and the typical
+ * use-case can be like:
+ * 1. AM calls Daemons to obtain a token (LlapProtocolClientProxy), using an 
existing token, or by kerberos login
+ * 2. Daemon generates (if needed) and handles token which is then in its 
scope, and returns the token on request
+ * 3. on shutdown: Daemon cancels all tokens which were made by it
+ * 4. AM re-requests for a token from existing Daemons in case of 
communication failure (as the token is cancelled)
+ *
+ * This approach doesn't need a centralized component to handle tokens, as all 
daemons handle
+ * renewal/recreation of their tokens. The only known drawback is 4) above: if 
a Daemon stops/fails/crashes, AM should
+ * obtain a new token if it uses a token that was generated by that particular 
Daemon.
+ */
+public class DefaultLlapTokenManager implements LlapTokenManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DefaultLlapTokenManager.class);
+
+  private final ScheduledExecutorService tokenChecker = 
Executors.newScheduledThreadPool(1);
+  private final List<TokenWrapper> tokens = new ArrayList<>();
+  private SecretManager secretManager;
+  private String clusterUser;
+
+  /**
+   * Convenience wrapper for llap tokens, that decodes max lifetime only once 
on adding and maintains expiration time.
+   */
+  private class TokenWrapper {
+
+    private final Token<LlapTokenIdentifier> realToken;
+    private final long issueDate;
+    private final long maxDate;
+    private long expirationTime = 0;
+    private long renewalTime = 0;
+
+    public TokenWrapper(Token<LlapTokenIdentifier> token) {
+      this.realToken = token;
+      try {
+        renew(); // renew immediately in order to become aware of the 
expiration date
+        issueDate = token.decodeIdentifier().getIssueDate();
+        maxDate = token.decodeIdentifier().getMaxDate();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private void renew() {
+      LOG.info("Renewing token: " + realToken);
+      try {
+        expirationTime = secretManager.renewToken(realToken, clusterUser);
+        renewalTime = Time.now();
+        LOG.info("Renewed token: " + realToken);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public String toString() {
+      return realToken.toString();
+    }
+  }
+
+  public DefaultLlapTokenManager(Configuration conf, SecretManager 
secretManager) {
+    this.secretManager = secretManager;
+    try {
+      this.clusterUser = 
UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    LOG.info("Initializing periodic token refresh in daemon, will run in every 
{}s",
+        LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS);
+    tokenChecker.scheduleAtFixedRate(() -> {
+      LOG.debug("Checking tokens, count: {}", tokens.size());
+      ListIterator<TokenWrapper> tokensIt = tokens.listIterator();
+      while (tokensIt.hasNext()) {
+        TokenWrapper token = tokensIt.next();
+        if (needsRecreate(token)) {
+          /* If the token needs to be recreated, it's because maxDate is 
close. We can remove it here,
+           * so it won't be returned by this daemon anymore. The cancelToken 
represents the same in the ZK-based
+           * secretManager (removes the token), so additional cleanup is not 
needed.
+           */
+          try {
+            LOG.info("Cancelling token: {}", token.realToken);
+            secretManager.cancelToken(token.realToken, clusterUser);
+            tokensIt.remove();
+          } catch (IOException e) {
+            LOG.error("Error while cancelling token: {}", token.realToken, e);
+          }
+        } else if (needsRenew(token)) {
+          token.renew();
+        }
+      }
+    }, 0, LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS, TimeUnit.SECONDS);
+  }
+
+  private boolean needsRecreate(TokenWrapper token) {
+    long now = Time.now();
+    long tokenWholeLifeTimeMs = token.maxDate - token.issueDate;
+    long tokenRemainingLifeTimeMs = Math.max(token.maxDate - now, 0);
+    // a number which tells how close is maxDate, e.g. 0.1 means only 10% of 
the token lifetime left
+    double tokenRemainingLifeTimePercent = (double) tokenRemainingLifeTimeMs / 
tokenWholeLifeTimeMs;
+    boolean needsRecreate = tokenRemainingLifeTimePercent < 0.1;
+
+    LOG.debug(
+        "Token needsRecreate? {}, now: {}, maxDate: {}, issueDate: {}, 
tokenWholeLifeTime(ms): {}, "
+            + "tokenRemainingLifeTime(ms): {}, tokenRemainingLifeTimePercent: 
{}%",
+        needsRecreate, now, token.maxDate, token.issueDate, 
tokenWholeLifeTimeMs, tokenRemainingLifeTimeMs,
+        toPercentString(tokenRemainingLifeTimePercent));
+    return needsRecreate;
+  }
+
+  private boolean needsRenew(TokenWrapper token) {
+    long now = Time.now();
+    long tokenWholeValidityPeriodMs = token.expirationTime - token.renewalTime;
+    long tokenRemainingValidityPeriodMs = Math.max(token.expirationTime - now, 
0);
+    // a number which tells how close is token expiry time, e.g. 0.1 means 
only 10% of the token validity period left
+    double tokenRemainingValidityPeriodPercent = (double) 
tokenRemainingValidityPeriodMs / tokenWholeValidityPeriodMs;
+    boolean needsRenew = tokenRemainingValidityPeriodPercent < 0.1;
+
+    LOG.debug(
+        "Token needsRenew? {}, now: {}, expirationTime: {}, renewalTime: {}, 
tokenWholeValidityPeriod(ms): {},"
+            + " tokenRemainingValidityPeriod(ms): {}, 
tokenRelativeRemainingValidityPeriod: {}%",
+        needsRenew, now, token.expirationTime, token.renewalTime, 
tokenWholeValidityPeriodMs,
+        tokenRemainingValidityPeriodMs, 
toPercentString(tokenRemainingValidityPeriodPercent));
+    return needsRenew;
+  }
+
+  private String toPercentString(double dblNumber) {
+    return Double.toString((double)Math.round(dblNumber * 1000) / 10);
+  }
+
+  @Override
+  public Token<LlapTokenIdentifier> getToken(GetTokenRequestProto request, 
boolean isSigningRequired)
+      throws IOException {
+    Token<LlapTokenIdentifier> token = tokens.isEmpty() ? null : 
tokens.get(0).realToken;
+
+    if (token == null) {
+      token = generateToken(request, isSigningRequired);
+    } else {
+      LOG.debug("Returning already existing token: {}", token);
+    }
+
+    return token;
+  }
+
+  private Token<LlapTokenIdentifier> generateToken(GetTokenRequestProto 
request, boolean isSigningRequired)
+      throws IOException {
+    Token<LlapTokenIdentifier> token =
+        secretManager.createLlapToken(request.hasAppId() ? request.getAppId() 
: null, null, isSigningRequired);
+    tokens.add(new TokenWrapper(token));
+
+    LOG.info("Added new token: {}, #tokens: {}", token, tokens.size());
+    return token;
+  }
+
+  @Override
+  public void close() {
+    cancelTokens(tokens);
+  }
+
+  private void cancelTokens(List<TokenWrapper> tokensToCancel) {
+    ListIterator<TokenWrapper> tokensIt = tokensToCancel.listIterator();
+    while (tokensIt.hasNext()) {
+      TokenWrapper token = tokensIt.next();
+      try {
+        secretManager.cancelToken(token.realToken, clusterUser);
+        tokensIt.remove();
+      } catch (IOException e) {
+        LOG.warn("Cannot cancel token while shutting down (on IOException): " 
+ token + ", giving up", e);
+      }
+    }
+  }
+}
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DummyTokenManager.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DummyTokenManager.java
new file mode 100644
index 00000000000..3a24c876581
--- /dev/null
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DummyTokenManager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+public class DummyTokenManager implements LlapTokenManager {
+
+  @Override
+  public Token<LlapTokenIdentifier> getToken(GetTokenRequestProto request, 
boolean isSigningRequired) {
+    return null;
+  }
+
+  @Override
+  public void close() {
+  }
+}
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index d18f74b87ee..4f04793d33e 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapExtClientJwtHelper;
 import org.apache.hadoop.hive.llap.security.LlapUgiFactoryFactory;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.security.SecretManager;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
@@ -80,6 +81,7 @@ import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecke
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
@@ -122,6 +124,7 @@ public class LlapDaemon extends CompositeService implements 
ContainerRunner, Lla
   private final String[] localDirs;
   private final DaemonId daemonId;
   private final SocketFactory socketFactory;
+  private final LlapTokenManager llapTokenManager;
 
   // TODO Not the best way to share the address
   private final AtomicReference<InetSocketAddress> srvAddress = new 
AtomicReference<>(),
@@ -322,10 +325,14 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
     SecretManager sm = null;
     if (UserGroupInformation.isSecurityEnabled()) {
       sm = SecretManager.createSecretManager(daemonConf, 
daemonId.getClusterString());
+      this.llapTokenManager = new DefaultLlapTokenManager(daemonConf, sm);
+    } else {
+      this.llapTokenManager = new DummyTokenManager();
     }
+
     this.secretManager = sm;
-    this.server = new LlapProtocolServerImpl(secretManager,
-        numHandlers, this, srvAddress, mngAddress, srvPort, 
externalClientsRpcPort, mngPort, daemonId, metrics);
+    this.server = new LlapProtocolServerImpl(secretManager, numHandlers, this, 
srvAddress, mngAddress, srvPort,
+        externalClientsRpcPort, mngPort, daemonId, 
metrics).withTokenManager(this.llapTokenManager);
 
     UgiFactory fsUgiFactory = null;
     try {
@@ -528,6 +535,10 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
 
   public void shutdown() {
     LOG.info("LlapDaemon shutdown invoked");
+
+    // invalidate tokens
+    this.llapTokenManager.close();
+
     if (llapDaemonInfoBean != null) {
       try {
         MBeans.unregister(llapDaemonInfoBean);
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index 26edaf5ce69..7203a37b9ac 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -84,9 +84,9 @@ public class LlapProtocolServerImpl extends AbstractService
   private final SecretManager secretManager;
   private String clusterUser = null;
   private boolean isRestrictedToClusterUser = false;
-  private final DaemonId daemonId;
   private final LlapDaemonExecutorMetrics executorMetrics;
   private TokenRequiresSigning isSigningRequiredConfig = 
TokenRequiresSigning.TRUE;
+  private LlapTokenManager llapTokenManager = new DummyTokenManager();
 
   public LlapProtocolServerImpl(SecretManager secretManager, int numHandlers,
       ContainerRunner containerRunner, AtomicReference<InetSocketAddress> 
srvAddress,
@@ -102,7 +102,6 @@ public class LlapProtocolServerImpl extends AbstractService
     this.mngAddress = mngAddress;
     this.externalClientsRpcPort = externalClientsRpcPort;
     this.mngPort = mngPort;
-    this.daemonId = daemonId;
     this.executorMetrics = executorMetrics;
     LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() +
         " with port configured to: " + srvPort);
@@ -292,8 +291,7 @@ public class LlapProtocolServerImpl extends AbstractService
       callingUser = UserGroupInformation.getCurrentUser();
       // Determine if the user would need to sign fragments.
       boolean isSigningRequired = determineIfSigningIsRequired(callingUser);
-      token = secretManager.createLlapToken(
-          request.hasAppId() ? request.getAppId() : null, null, 
isSigningRequired);
+      token = llapTokenManager.getToken(request, isSigningRequired);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -395,4 +393,9 @@ public class LlapProtocolServerImpl extends AbstractService
     default: throw new AssertionError("Unknown value " + 
isSigningRequiredConfig);
     }
   }
+
+  public LlapProtocolServerImpl withTokenManager(LlapTokenManager 
llapTokenManager) {
+    this.llapTokenManager = llapTokenManager;
+    return this;
+  }
 }
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenManager.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenManager.java
new file mode 100644
index 00000000000..06b197b8dd5
--- /dev/null
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenManager.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * LlapTokenManager is for renewing and recreating LLAP tokens on a cluster.
+ * See further details in implementations.
+ */
+public interface LlapTokenManager {
+  long LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS = 300;
+
+  Token<LlapTokenIdentifier> getToken(GetTokenRequestProto request, boolean 
isSigningRequired) throws IOException;
+
+  void close();
+}
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index c5bb75b8ec6..1b23b6a5480 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -18,17 +18,14 @@ import org.apache.hadoop.hive.conf.Validator.RangeValidator;
 import 
org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService.NodeInfo;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.io.Writable;
 import 
org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray;
 import 
org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
 
 import java.io.IOException;
 import java.net.BindException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -42,7 +39,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -77,7 +73,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -91,7 +86,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
@@ -105,7 +99,6 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.DagInfo;
-import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.slf4j.Logger;
@@ -115,7 +108,6 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LlapTaskCommunicator.class);
 
-  private static final String RESOURCE_URI_STR = "/ws/v1/applicationhistory";
   private static final Joiner JOINER = Joiner.on("");
   private static final Joiner PATH_JOINER = Joiner.on("/");
   private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap;
@@ -130,7 +122,6 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
   private LlapProtocolClientProxy communicator;
   private long deleteDelayOnDagComplete;
   private final LlapTaskUmbilicalProtocol umbilical;
-  private final Token<LlapTokenIdentifier> token;
   private final String user;
   private String amHost;
   private String timelineServerUri;
@@ -155,17 +146,6 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
   public LlapTaskCommunicator(
       TaskCommunicatorContext taskCommunicatorContext) {
     super(taskCommunicatorContext);
-    Credentials credentials = taskCommunicatorContext.getAMCredentials();
-    if (credentials != null) {
-      @SuppressWarnings("unchecked")
-      Token<LlapTokenIdentifier> llapToken =
-          
(Token<LlapTokenIdentifier>)credentials.getToken(LlapTokenIdentifier.KIND_NAME);
-      this.token = llapToken;
-    } else {
-      this.token = null;
-    }
-    LOG.info("Task communicator with a token " + token);
-    Preconditions.checkState((token != null) == 
UserGroupInformation.isSecurityEnabled());
 
     // Not closing this at the moment at shutdown, since this could be a 
shared instance.
     serviceRegistry = LlapRegistryService.getClient(conf);
@@ -190,31 +170,48 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  private Token<LlapTokenIdentifier> getLlapToken() {
+    Token<LlapTokenIdentifier> token = null;
+    Credentials credentials = getContext().getAMCredentials();
+    if (credentials != null) {
+      token = (Token<LlapTokenIdentifier>) 
credentials.getToken(LlapTokenIdentifier.KIND_NAME);
+    }
+    // supposed to have a token only if security is enabled (any mismatch here 
implies a configuration problem)
+    Preconditions.checkState((token != null) == 
UserGroupInformation.isSecurityEnabled());
+    if (token != null) {
+      LOG.info("Task communicator with a token {}", token);
+    }
+    return token;
+  }
+
   void setScheduler(LlapTaskSchedulerService peer) {
     this.scheduler = peer;
   }
 
   private static final String LLAP_TOKEN_NAME = 
LlapTokenIdentifier.KIND_NAME.toString();
 
-  /**
-   * @return true iff the error is fatal and we should give up on everything.
-   */
-  private boolean processSendError(Throwable t) {
+  private void processSendError(Throwable t) {
     Throwable cause = t;
     while (cause != null) {
-      if (cause instanceof RetriableException) return false;
-      if (((cause instanceof InvalidToken && cause.getMessage() != null)
-          || (cause instanceof RemoteException && cause.getCause() == null
-              && cause.getMessage() != null && 
cause.getMessage().contains("InvalidToken")))
-          && cause.getMessage().contains(LLAP_TOKEN_NAME)) {
-        break;
+      if (isInvalidTokenError(cause)) {
+        handleInvalidToken();
+        return;
       }
       cause = cause.getCause();
     }
-    if (cause == null) return false;
-    LOG.error("Reporting fatal error - LLAP token appears to be invalid.", t);
-    getContext().reportError(ServicePluginErrorDefaults.OTHER_FATAL, 
cause.getMessage(), null);
-    return true;
+  }
+
+  private boolean isInvalidTokenError(Throwable cause) {
+    LOG.debug("Checking for invalid token error, cause: {}, cause.getCause(): 
{}", cause, cause.getCause());
+    /*
+     * The latest message discovered (while doing HIVE-26569) for a 
RemoteException is:
+     * "Current (LLAP_TOKEN; LLAP_TOKEN owner=hive/***, renewer=hive, 
realUser=, issueDate=1670317803579, maxDate=1671527403579,
+     * sequenceNumber=297, masterKeyId=296, cluster ***, app ID , signing 
false) can't be found in cache"
+     */
+    return ((cause instanceof InvalidToken && cause.getMessage() != null) || 
(cause instanceof RemoteException
+        && cause.getCause() == null && cause.getMessage() != null && 
cause.getMessage().contains(LLAP_TOKEN_NAME)
+        && (cause.getMessage().contains("InvalidToken") || 
cause.getMessage().contains("can't be found in cache"))));
   }
 
   @Override
@@ -344,7 +341,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
 
   @VisibleForTesting
   protected LlapProtocolClientProxy createLlapProtocolClientProxy(int 
numThreads, Configuration conf) {
-    return new LlapProtocolClientProxy(numThreads, conf, token);
+    return new LlapProtocolClientProxy(numThreads, conf, getLlapToken());
   }
 
   @Override
@@ -403,9 +400,8 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
                   + " appId=" + 
currentQueryIdentifierProto.getApplicationIdString()
                   + " dagId=" + currentQueryIdentifierProto.getDagIndex()
                   + " to node " + node.getHost());
-              if (!processSendError(t)) {
-                callback.setError(null, t);
-              }
+              processSendError(t);
+              callback.setError(null, t);
             }
           });
     } catch (IOException e) {
@@ -442,9 +438,8 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
           @Override
           public void indicateError(Throwable t) {
             LOG.warn("Failed to send update fragment request for {}", 
attemptId.toString());
-            if (!processSendError(t)) {
-              callback.setError(ctx, t);
-            }
+            processSendError(t);
+            callback.setError(ctx, t);
           }
         });
   }
@@ -551,6 +546,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
               ServiceException se = (ServiceException) t;
               t = se.getCause();
             }
+
             if (t instanceof RemoteException) {
               // All others from the remote service cause the task to FAIL.
               LOG.info(
@@ -566,7 +562,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
                 LOG.info(
                     "Unable to run task: " + taskSpec.getTaskAttemptID() + " 
on containerId: " +
                         containerId + ", Communication Error");
-                 processSendError(originalError);
+                processSendError(originalError);
                 getContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication 
Error");
               } else {
@@ -574,7 +570,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
                 LOG.info(
                     "Failed to run task: " + taskSpec.getTaskAttemptID() + " 
on containerId: " +
                         containerId, t);
-                 processSendError(originalError);
+                processSendError(originalError);
                 getContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), 
TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER,
                         t.getMessage());
@@ -584,6 +580,17 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
         });
   }
 
+  /**
+   * This is a best effort action in case of an invalid LLAP_TOKEN. The 
communicator (LlapProtocolClientProxy) periodically
+   * refreshes tokens, but there is a relatively small time window (compared 
to the whole token maxLifetime),
+   * when the token is cancelled and it's not yet fetched from daemons. By the 
time we detect this problem, the current
+   * task attempt has already failed, but usually there are at least 3 task 
attempts before failing the task and dag, therefore
+   * fetching an LLAP_TOKEN synchronously here definitely solves the invalid 
token problem (as the next task attempt will use it).
+   */
+  private void handleInvalidToken() {
+    this.communicator.refreshToken();
+  }
+
   @Override
   public void unregisterRunningTaskAttempt(final TezTaskAttemptID 
taskAttemptId,
                                            TaskAttemptEndReason endReason,
@@ -623,7 +630,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
             public void indicateError(Throwable t) {
               LOG.warn("Failed to send terminate fragment request for {}",
                   taskAttemptId.toString());
-               processSendError(t);
+              processSendError(t);
             }
           });
     } else {

Reply via email to