This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e427ce0d572 HIVE-26569: Support renewal and recreation of LLAP_TOKENs
(#3626) (Laszlo Bodor reviewed by Ayush Saxena)
e427ce0d572 is described below
commit e427ce0d572c9adf6f194693a1b3ba85f246f3b7
Author: Bodor Laszlo <[email protected]>
AuthorDate: Thu Dec 8 12:17:36 2022 +0100
HIVE-26569: Support renewal and recreation of LLAP_TOKENs (#3626) (Laszlo
Bodor reviewed by Ayush Saxena)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../hadoop/hive/llap/security/LlapTokenClient.java | 91 ++++++++-
.../llap/security/LlapTokenLocalClientImpl.java | 14 +-
.../hive/llap/tez/LlapProtocolClientProxy.java | 39 ++++
.../apache/hadoop/hive/llap/AsyncPbRpcProxy.java | 55 ++++--
.../impl/LlapManagementProtocolClientImpl.java | 18 +-
.../hive/llap/security/LlapServerSecurityInfo.java | 4 +-
.../hadoop/hive/llap/security/SecretManager.java | 17 +-
.../llap/daemon/impl/DefaultLlapTokenManager.java | 215 +++++++++++++++++++++
.../hive/llap/daemon/impl/DummyTokenManager.java | 30 +++
.../hadoop/hive/llap/daemon/impl/LlapDaemon.java | 15 +-
.../llap/daemon/impl/LlapProtocolServerImpl.java | 11 +-
.../hive/llap/daemon/impl/LlapTokenManager.java | 32 +++
.../hive/llap/tezplugins/LlapTaskCommunicator.java | 95 ++++-----
14 files changed, 548 insertions(+), 93 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 24b9d0eb2b6..44452b970e5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5060,10 +5060,13 @@ public class HiveConf extends Configuration {
"cluster by setting it to true or 'except_llap_owner' (the latter
returns such tokens\n" +
"to everyone except the user LLAP cluster is authenticating under)."),
- // Hadoop DelegationTokenManager default is 1 week.
LLAP_DELEGATION_TOKEN_LIFETIME("hive.llap.daemon.delegation.token.lifetime",
"14d",
new TimeValidator(TimeUnit.SECONDS),
"LLAP delegation token lifetime, in seconds if specified without a
unit."),
+
LLAP_DELEGATION_TOKEN_RENEW_INTERVAL("hive.llap.daemon.delegation.token.renew.interval",
"1d",
+ new TimeValidator(TimeUnit.SECONDS),
+ "LLAP delegation token renew interval, in seconds if specified without
a unit."
+ + "Tokens are typically renewed in the LlapDaemons by
LlapTokenManager currently."),
LLAP_MANAGEMENT_RPC_PORT("hive.llap.management.rpc.port", 15004,
"RPC port for LLAP daemon management service."),
LLAP_WEB_AUTO_AUTH("hive.llap.auto.auth", false,
diff --git
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 3208e21269d..e153e470c2e 100644
---
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -19,11 +19,12 @@
package org.apache.hadoop.hive.llap.security;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
@@ -31,16 +32,18 @@ import javax.net.SocketFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.registry.ServiceInstanceSet;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
@@ -60,6 +63,7 @@ public class LlapTokenClient {
private Collection<LlapServiceInstance> lastKnownInstances;
private LlapManagementProtocolClientImpl client;
private LlapServiceInstance clientInstance;
+ private Token<LlapTokenIdentifier> currentToken;
public LlapTokenClient(Configuration conf) {
this.conf = conf;
@@ -89,6 +93,7 @@ public class LlapTokenClient {
LOG.error("Cannot get a token, trying a different instance", ex);
client = null;
clientInstance = null;
+ currentToken = null;
}
if (llaps == null || !llaps.hasNext()) {
if (hasRefreshed) { // Only refresh once.
@@ -101,9 +106,8 @@ public class LlapTokenClient {
}
Token<LlapTokenIdentifier> token = extractToken(tokenBytes);
- if (LOG.isInfoEnabled()) {
- LOG.info("Obtained a LLAP delegation token from " + clientInstance + ":
" + token);
- }
+ LOG.info("Obtained a LLAP delegation token from {}: {}", clientInstance,
token);
+
return token;
}
@@ -117,10 +121,72 @@ public class LlapTokenClient {
private ByteString getTokenBytes(final String appId) throws IOException,
ServiceException {
assert clientInstance != null;
+ if (currentToken != null) {
+ try {
+ return fetchTokenWithCurrentToken(appId);
+ } catch (IOException | ServiceException e) {
+ LOG.warn("Exception while getting delegation token, trying with
kerberos login", e);
+ return fetchTokenWithKerberosLogin(appId);
+ }
+ } else {
+ LOG.info("currentToken is null, let's try to fetch one with kerberos
login");
+ return fetchTokenWithKerberosLogin(appId);
+ }
+ }
+
+ /**
+ * Tries to fetch token by LlapManagementProtocolClientImpl from a daemon on
the cluster.
+ * Assumes that we already have a valid currentToken for communicating with
the daemons.
+ */
+ private ByteString fetchTokenWithCurrentToken(final String appId) throws
IOException, ServiceException {
+ UserGroupInformation ugi = getUgiWithCurrentToken();
+ return getTokenWithUgi(appId, ugi);
+ }
+
+ /**
+ * Tries to fetch token by LlapManagementProtocolClientImpl with a kerberos
login.
+ * 1. Logs in to kerberos
+ * 2. Obtains an authenticated ugi
+ * 3. Fetches a delegation token from a daemon.
+ * This is supposed to be used if currentToken is not available or expired.
+ * Kerberos login is a valid use-case if there is no other way to
authenticate and get a new token. It always
+ * means communication with the KDC, but if we use a delegation token at
least ~90% of the time,
+ * the KDC should not become the bottleneck.
+ */
+ private ByteString fetchTokenWithKerberosLogin(final String appId) throws
IOException, ServiceException {
+ UserGroupInformation ugi = getUgiFromKerberosLogin();
+ return getTokenWithUgi(appId, ugi);
+ }
+
+ private UserGroupInformation getUgiWithCurrentToken() throws IOException {
+ String currentUser =
UserGroupInformation.getCurrentUser().getShortUserName();
+ UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(currentUser);
+
+ String address = new URL(clientInstance.getServicesAddress()).getHost();
+ int port = clientInstance.getManagementPort();
+
+ InetSocketAddress socketAddr = NetUtils.createSocketAddrForHost(address,
port);
+ LOG.debug("Setup token with {}:{}, socketAddr: {}", address, port,
socketAddr);
+ SecurityUtil.setTokenService(currentToken, socketAddr);
+ ugi.addToken(currentToken);
+ return ugi;
+ }
+
+ private UserGroupInformation getUgiFromKerberosLogin() throws IOException {
+ String llapPrincipal = HiveConf.getVar(conf,
ConfVars.LLAP_KERBEROS_PRINCIPAL),
+ llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+
+ LOG.info("Logging in using principal {} and keytab {}", llapPrincipal,
llapKeytab);
+ return LlapUtil.loginWithKerberos(llapPrincipal, llapKeytab);
+ }
+
+ private ByteString getTokenWithUgi(final String appId, UserGroupInformation
ugi) throws ServiceException {
if (client == null) {
- client = new LlapManagementProtocolClientImpl(conf,
clientInstance.getHost(),
- clientInstance.getManagementPort(), retryPolicy, socketFactory);
+ client = new LlapManagementProtocolClientImpl(conf,
clientInstance.getHost(), clientInstance.getManagementPort(),
+ retryPolicy, socketFactory);
}
+ client.withUgi(ugi);
+
GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
if (!StringUtils.isBlank(appId)) {
req.setAppId(appId);
@@ -146,4 +212,13 @@ public class LlapTokenClient {
return new ArrayList<LlapServiceInstance>(lastKnownInstances);
}
+ /**
+ * An initial token can be set with this method. A typical path is
+ * LlapTaskCommunicator --
LlapProtocolClientProxy::initPeriodicTokenRefresh. If the LlapTaskCommunicator
is
+ * instantiated with an initial token, it can be used for subsequent calls, even
for fetching new tokens.
+ */
+ public LlapTokenClient withCurrentToken(Token<LlapTokenIdentifier> token) {
+ this.currentToken = token;
+ return this;
+ }
}
diff --git
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
index 423b733558c..d5f26d557cd 100644
---
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
+++
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClientImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.llap.security;
import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
@@ -36,17 +35,14 @@ public class LlapTokenLocalClientImpl implements
LlapTokenLocalClient {
}
@Override
- public Token<LlapTokenIdentifier> createToken(
- String appId, String user, boolean isSignatureRequired) throws
IOException {
+ public Token<LlapTokenIdentifier> createToken(String appId, String user,
boolean isSignatureRequired)
+ throws IOException {
try {
- Token<LlapTokenIdentifier> token = secretManager.createLlapToken(
- appId, user, isSignatureRequired);
- if (LOG.isInfoEnabled()) {
- LOG.info("Created a LLAP delegation token locally: " + token);
- }
+ Token<LlapTokenIdentifier> token = secretManager.createLlapToken(appId,
user, isSignatureRequired);
+ LOG.info("Created a LLAP delegation token locally: {}", token);
return token;
} catch (Exception ex) {
- throw (ex instanceof IOException) ? (IOException)ex : new
IOException(ex);
+ throw (ex instanceof IOException) ? (IOException) ex : new
IOException(ex);
}
}
diff --git
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
index 67025311e8f..142b63d3493 100644
---
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
+++
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
@@ -15,6 +15,8 @@
package org.apache.hadoop.hive.llap.tez;
import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
@@ -38,13 +40,21 @@ import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFra
import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFragmentResponseProto;
import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.security.LlapTokenClient;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LlapProtocolClientProxy
extends AsyncPbRpcProxy<LlapProtocolBlockingPB, LlapTokenIdentifier> {
+ private static final Logger LOG =
LoggerFactory.getLogger(LlapProtocolClientProxy.class);
+ private static final long LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS = 300;
+
+ protected final ScheduledExecutorService newTokenChecker =
Executors.newScheduledThreadPool(1);
+ private LlapTokenClient tokenClient;
public LlapProtocolClientProxy(
int numThreads, Configuration conf, Token<LlapTokenIdentifier>
llapToken) {
@@ -55,6 +65,7 @@ public class LlapProtocolClientProxy
TimeUnit.MILLISECONDS),
HiveConf.getTimeVar(conf,
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
TimeUnit.MILLISECONDS), -1, HiveConf.getIntVar(conf,
ConfVars.LLAP_MAX_CONCURRENT_REQUESTS_PER_NODE));
+ initPeriodicTokenRefresh(conf);
}
public void registerDag(RegisterDagRequestProto request, String host, int
port,
@@ -94,6 +105,30 @@ public class LlapProtocolClientProxy
queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback));
}
+ protected void initPeriodicTokenRefresh(Configuration conf) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ LOG.info("Initializing periodic token refresh in AM, will run in every
{}s",
+ LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS);
+ tokenClient = new LlapTokenClient(conf);
+
+ newTokenChecker.scheduleAtFixedRate(this::fetchToken, 0,
LLAP_TOKEN_REFRESH_INTERVAL_IN_AM_SECONDS,
+ TimeUnit.SECONDS);
+ }
+
+ private synchronized void fetchToken() {
+ LOG.debug("Trying to fetch a new token...");
+ try {
+ Token<LlapTokenIdentifier> newToken =
+ tokenClient.withCurrentToken(new
Token<LlapTokenIdentifier>(token)).getDelegationToken(null);
+ LOG.debug("Received new token: {}, old was: {}", newToken, token);
+ setToken(newToken);
+ } catch (Exception e) {
+ LOG.error("Caught exception while fetching token", e);
+ }
+ }
+
private class RegisterDagCallable extends
NodeCallableRequest<RegisterDagRequestProto, RegisterDagResponseProto> {
protected RegisterDagCallable(LlapNodeId nodeId,
@@ -202,4 +237,8 @@ public class LlapProtocolClientProxy
protected void shutdownProtocolImpl(LlapProtocolBlockingPB client) {
// Nothing to do.
}
+
+ public void refreshToken() {
+ fetchToken();
+ }
}
diff --git
a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
index 66e40829683..f4dba123eac 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
@@ -41,6 +41,7 @@ import javax.net.SocketFactory;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
// TODO: LlapNodeId is just a host+port pair; we could make this class more
generic.
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -80,8 +81,8 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType
extends TokenIdent
private final SocketFactory socketFactory;
private final ListeningExecutorService requestManagerExecutor;
private volatile ListenableFuture<Void> requestManagerFuture;
- private final Token<TokenType> token;
- private final String tokenUser;
+ protected Token<TokenType> token;
+ protected String tokenUser;
@VisibleForTesting
public static class RequestManager implements Callable<Void> {
@@ -516,7 +517,36 @@ public abstract class AsyncPbRpcProxy<ProtocolType,
TokenType extends TokenIdent
}
this.hostProxies = cb.build();
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
- this.token = token;
+
+ try {
+ setToken(token);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+ connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS);
+
+ this.requestManager = new RequestManager(numThreads, maxPerNode);
+ ExecutorService localExecutor = Executors.newFixedThreadPool(1,
+ new
ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
+ this.requestManagerExecutor =
MoreExecutors.listeningDecorator(localExecutor);
+
+ LOG.info("Setting up AsyncPbRpcProxy with" +
+ "numThreads=" + numThreads +
+ "retryTime(millis)=" + connectionTimeoutMs +
+ "retrySleep(millis)=" + retrySleepMs);
+ }
+
+ protected void setToken(Token<TokenType> newToken) throws IOException {
+ if (tokensAreEqual(newToken)) {
+ return;
+ }
+ LOG.info("Setting new token as it's not equal to the old one, new token
is: {}", newToken);
+ // clear cache to make proxies use the new token
+ hostProxies.invalidateAll();
+
+ this.token = newToken;
if (token != null) {
String tokenUser = getTokenUser(token);
if (tokenUser == null) {
@@ -531,19 +561,16 @@ public abstract class AsyncPbRpcProxy<ProtocolType,
TokenType extends TokenIdent
} else {
this.tokenUser = null;
}
+ }
- this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
- connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS);
-
- this.requestManager = new RequestManager(numThreads, maxPerNode);
- ExecutorService localExecutor = Executors.newFixedThreadPool(1,
- new
ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
- this.requestManagerExecutor =
MoreExecutors.listeningDecorator(localExecutor);
+ protected boolean tokensAreEqual(Token<TokenType> otherToken) throws
IOException {
+ int oldSeqNumber =
+ this.token == null ? -1 : ((LlapTokenIdentifier)
this.token.decodeIdentifier()).getSequenceNumber();
+ int newSeqNumber =
+ otherToken == null ? -1 : ((LlapTokenIdentifier)
otherToken.decodeIdentifier()).getSequenceNumber();
- LOG.info("Setting up AsyncPbRpcProxy with" +
- "numThreads=" + numThreads +
- "retryTime(millis)=" + connectionTimeoutMs +
- "retrySleep(millis)=" + retrySleepMs);
+ LOG.debug("Check token equality be sequenceNumber: {} <-> {}",
oldSeqNumber, newSeqNumber);
+ return oldSeqNumber == newSeqNumber;
}
/**
diff --git
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
index e811c0069a5..67fbde8e1ec 100644
---
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
+++
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
@@ -40,6 +40,8 @@ public class LlapManagementProtocolClientImpl implements
LlapManagementProtocolP
private final RetryPolicy retryPolicy;
private final SocketFactory socketFactory;
LlapManagementProtocolPB proxy;
+ private UserGroupInformation ugi;
+ private boolean ugiChanged = false;
public LlapManagementProtocolClientImpl(Configuration conf, String hostname,
int port,
@@ -56,17 +58,18 @@ public class LlapManagementProtocolClientImpl implements
LlapManagementProtocolP
}
public LlapManagementProtocolPB getProxy() throws IOException {
- if (proxy == null) {
+ if (proxy == null || ugiChanged) {
proxy = createProxy();
+ ugiChanged = false;
}
return proxy;
}
- public LlapManagementProtocolPB createProxy() throws IOException {
+ private LlapManagementProtocolPB createProxy() throws IOException {
RPC.setProtocolEngine(conf, LlapManagementProtocolPB.class,
ProtobufRpcEngine.class);
ProtocolProxy<LlapManagementProtocolPB> proxy =
RPC.getProtocolProxy(LlapManagementProtocolPB.class, 0, serverAddr,
- UserGroupInformation.getCurrentUser(), conf, socketFactory, 0,
retryPolicy);
+ ugi == null ? UserGroupInformation.getCurrentUser() : ugi, conf,
socketFactory, 0, retryPolicy);
return proxy.getProxy();
}
@@ -131,4 +134,13 @@ public class LlapManagementProtocolClientImpl implements
LlapManagementProtocolP
}
}
+ /**
+ * Sets a UserGroupInformation instance to be used with this client (a new
ugi might be for a new token).
+ * The field ugiChanged is for telling that e.g. new proxies have to be
created.
+ */
+ public LlapManagementProtocolClientImpl withUgi(UserGroupInformation ugi) {
+ this.ugi = ugi;
+ this.ugiChanged = true;
+ return this;
+ }
}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
similarity index 96%
rename from
llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
rename to
llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
index 158c264bc6f..73deeb76aed 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
+++
b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
@@ -57,8 +57,8 @@ public class LlapServerSecurityInfo extends SecurityInfo {
@Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
LOG.debug("Trying to get TokenInfo for {}", protocol);
- // Tokens cannot be used for the management protocol (for now).
- if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+ if (!LlapProtocolBlockingPB.class.isAssignableFrom(protocol)
+ && !LlapManagementProtocolPB.class.isAssignableFrom(protocol)) return
null;
return new TokenInfo() {
@Override
public Class<? extends Annotation> annotationType() {
diff --git
a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index a2021a3a31e..78a1d4870bf 100644
---
a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++
b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -165,13 +164,20 @@ public class SecretManager extends
ZKDelegationTokenSecretManager<LlapTokenIdent
private static LlapZkConf createLlapZkConf(
Configuration conf, String llapPrincipal, String llapKeytab, String
clusterId) {
- // Override the default delegation token lifetime for LLAP.
- // Also set all the necessary ZK settings to defaults and LLAP configs,
if not set.
- final Configuration zkConf = new Configuration(conf);
+ // Override the default delegation token lifetime for LLAP.
+ // Also set all the necessary ZK settings to defaults and LLAP configs, if
not set.
+ final Configuration zkConf = new Configuration(conf);
long tokenLifetime = HiveConf.getTimeVar(
conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS);
zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
- zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
+
+ long tokenRenewInterval = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DELEGATION_TOKEN_RENEW_INTERVAL, TimeUnit.SECONDS);
+ zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenRenewInterval);
+
+ LOG.info("SecretManager ZkConf created: tokenLifetime {},
tokenRenewInterval: {}", tokenLifetime,
+ tokenRenewInterval);
+
try {
zkConf.set(ZK_DTSM_ZK_KERBEROS_PRINCIPAL,
SecurityUtil.getServerPrincipal(llapPrincipal, "0.0.0.0"));
@@ -248,7 +254,6 @@ public class SecretManager extends
ZKDelegationTokenSecretManager<LlapTokenIdent
}
LlapTokenIdentifier llapId = new LlapTokenIdentifier(
new Text(user), renewer, realUser, clusterId, appId,
isSignatureRequired);
- // TODO: note that the token is not renewable right now and will last for
2 weeks by default.
Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId,
this);
LOG.info("Created LLAP token {}", token);
return token;
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DefaultLlapTokenManager.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DefaultLlapTokenManager.java
new file mode 100644
index 00000000000..a81cbd6ba0d
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DefaultLlapTokenManager.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.SecretManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DefaultLlapTokenManager is for renewing and recreating LLAP tokens on a
cluster.
+ *
+ * This class runs in the LLAP Daemon, but the whole process involves two
components:
+ * AM(s): LlapTaskCommunicator -- LlapProtocolClientProxy
+ * Daemon(s): LlapTokenManager
+ *
+ * DefaultLlapTokenManager tracks llap token instances and handles them as:
+ * a) renews periodically when they get closer to expiry date
+ * b) recreates when they get close to max lifetime.
+ * This logic is needed because Hadoop does not have a unified way for all
applications to do this,
+ * hence the LLAP_TOKEN specific implementation can be found in this class.
+ * The tokens are persisted by a zookeeper-based secretmanager
(ZKDelegationTokenSecretManager), and the typical
+ * use-case can be like:
+ * 1. AM calls Daemons to obtain a token (LlapProtocolClientProxy), using an
existing token, or by kerberos login
+ * 2. Daemon generates (if needed) and handles token which is then in its
scope, and returns the token on request
+ * 3. on shutdown: Daemon cancels all tokens which were made by it
+ * 4. AM re-requests for a token from existing Daemons in case of
communication failure (as the token is cancelled)
+ *
+ * This approach doesn't need a centralized component to handle tokens, as all
daemons handle
+ * renewal/recreation of their tokens. The only known drawback is 4) above: if
a Daemon stops/fails/crashes, AM should
+ * obtain a new token if it uses a token that was generated by that particular
Daemon.
+ */
+public class DefaultLlapTokenManager implements LlapTokenManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultLlapTokenManager.class);
+
+ private final ScheduledExecutorService tokenChecker =
Executors.newScheduledThreadPool(1);
+ private final List<TokenWrapper> tokens = new ArrayList<>();
+ private SecretManager secretManager;
+ private String clusterUser;
+
+ /**
+ * Convenience wrapper for llap tokens, that decodes max lifetime only once
on adding and maintains expiration time.
+ */
+ private class TokenWrapper {
+
+ private final Token<LlapTokenIdentifier> realToken;
+ private final long issueDate;
+ private final long maxDate;
+ private long expirationTime = 0;
+ private long renewalTime = 0;
+
+ public TokenWrapper(Token<LlapTokenIdentifier> token) {
+ this.realToken = token;
+ try {
+ renew(); // renew immediately in order to become aware of the
expiration date
+ issueDate = token.decodeIdentifier().getIssueDate();
+ maxDate = token.decodeIdentifier().getMaxDate();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void renew() {
+ LOG.info("Renewing token: " + realToken);
+ try {
+ expirationTime = secretManager.renewToken(realToken, clusterUser);
+ renewalTime = Time.now();
+ LOG.info("Renewed token: " + realToken);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String toString() {
+ return realToken.toString();
+ }
+ }
+
+ public DefaultLlapTokenManager(Configuration conf, SecretManager
secretManager) {
+ this.secretManager = secretManager;
+ try {
+ this.clusterUser =
UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ LOG.info("Initializing periodic token refresh in daemon, will run in every
{}s",
+ LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS);
+ tokenChecker.scheduleAtFixedRate(() -> {
+ LOG.debug("Checking tokens, count: {}", tokens.size());
+ ListIterator<TokenWrapper> tokensIt = tokens.listIterator();
+ while (tokensIt.hasNext()) {
+ TokenWrapper token = tokensIt.next();
+ if (needsRecreate(token)) {
+ /* If the token needs to be recreated, it's because maxDate is
close. We can remove it here,
+ * so it won't be returned by this daemon anymore. The cancelToken
represents the same in the ZK-based
+ * secretManager (removes the token), so additional cleanup is not
needed.
+ */
+ try {
+ LOG.info("Cancelling token: {}", token.realToken);
+ secretManager.cancelToken(token.realToken, clusterUser);
+ tokensIt.remove();
+ } catch (IOException e) {
+ LOG.error("Error while cancelling token: {}", token.realToken, e);
+ }
+ } else if (needsRenew(token)) {
+ token.renew();
+ }
+ }
+ }, 0, LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS, TimeUnit.SECONDS);
+ }
+
+ private boolean needsRecreate(TokenWrapper token) {
+ long now = Time.now();
+ long tokenWholeLifeTimeMs = token.maxDate - token.issueDate;
+ long tokenRemainingLifeTimeMs = Math.max(token.maxDate - now, 0);
+ // a number which tells how close is maxDate, e.g. 0.1 means only 10% of
the token lifetime left
+ double tokenRemainingLifeTimePercent = (double) tokenRemainingLifeTimeMs /
tokenWholeLifeTimeMs;
+ boolean needsRecreate = tokenRemainingLifeTimePercent < 0.1;
+
+ LOG.debug(
+ "Token needsRecreate? {}, now: {}, maxDate: {}, issueDate: {},
tokenWholeLifeTime(ms): {}, "
+ + "tokenRemainingLifeTime(ms): {}, tokenRemainingLifeTimePercent:
{}%",
+ needsRecreate, now, token.maxDate, token.issueDate,
tokenWholeLifeTimeMs, tokenRemainingLifeTimeMs,
+ toPercentString(tokenRemainingLifeTimePercent));
+ return needsRecreate;
+ }
+
+ private boolean needsRenew(TokenWrapper token) {
+ long now = Time.now();
+ long tokenWholeValidityPeriodMs = token.expirationTime - token.renewalTime;
+ long tokenRemainingValidityPeriodMs = Math.max(token.expirationTime - now,
0);
+ // a number which tells how close is token expiry time, e.g. 0.1 means
only 10% of the token validity period left
+ double tokenRemainingValidityPeriodPercent = (double)
tokenRemainingValidityPeriodMs / tokenWholeValidityPeriodMs;
+ boolean needsRenew = tokenRemainingValidityPeriodPercent < 0.1;
+
+ LOG.debug(
+ "Token needsRenew? {}, now: {}, expirationTime: {}, renewalTime: {},
tokenWholeValidityPeriod(ms): {},"
+ + " tokenRemainingValidityPeriod(ms): {},
tokenRelativeRemainingValidityPeriod: {}%",
+ needsRenew, now, token.expirationTime, token.renewalTime,
tokenWholeValidityPeriodMs,
+ tokenRemainingValidityPeriodMs,
toPercentString(tokenRemainingValidityPeriodPercent));
+ return needsRenew;
+ }
+
+ private String toPercentString(double dblNumber) {
+ return Double.toString((double)Math.round(dblNumber * 1000) / 10);
+ }
+
+ @Override
+ public Token<LlapTokenIdentifier> getToken(GetTokenRequestProto request,
boolean isSigningRequired)
+ throws IOException {
+ Token<LlapTokenIdentifier> token = tokens.isEmpty() ? null :
tokens.get(0).realToken;
+
+ if (token == null) {
+ token = generateToken(request, isSigningRequired);
+ } else {
+ LOG.debug("Returning already existing token: {}", token);
+ }
+
+ return token;
+ }
+
+ private Token<LlapTokenIdentifier> generateToken(GetTokenRequestProto
request, boolean isSigningRequired)
+ throws IOException {
+ Token<LlapTokenIdentifier> token =
+ secretManager.createLlapToken(request.hasAppId() ? request.getAppId()
: null, null, isSigningRequired);
+ tokens.add(new TokenWrapper(token));
+
+ LOG.info("Added new token: {}, #tokens: {}", token, tokens.size());
+ return token;
+ }
+
+ @Override
+ public void close() {
+ cancelTokens(tokens);
+ }
+
+ private void cancelTokens(List<TokenWrapper> tokensToCancel) {
+ ListIterator<TokenWrapper> tokensIt = tokensToCancel.listIterator();
+ while (tokensIt.hasNext()) {
+ TokenWrapper token = tokensIt.next();
+ try {
+ secretManager.cancelToken(token.realToken, clusterUser);
+ tokensIt.remove();
+ } catch (IOException e) {
+ LOG.warn("Cannot cancel token while shutting down (on IOException): "
+ token + ", giving up", e);
+ }
+ }
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DummyTokenManager.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DummyTokenManager.java
new file mode 100644
index 00000000000..3a24c876581
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DummyTokenManager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+public class DummyTokenManager implements LlapTokenManager { // no-op LlapTokenManager; LlapDaemon installs it when Hadoop security is not enabled
+
+ @Override
+ public Token<LlapTokenIdentifier> getToken(GetTokenRequestProto request,
boolean isSigningRequired) {
+ return null; // never issues a token; both arguments are ignored
+ }
+
+ @Override
+ public void close() { // holds no tokens, so there is nothing to cancel
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index d18f74b87ee..4f04793d33e 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapExtClientJwtHelper;
import org.apache.hadoop.hive.llap.security.LlapUgiFactoryFactory;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.security.SecretManager;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
@@ -80,6 +81,7 @@ import
org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecke
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
@@ -122,6 +124,7 @@ public class LlapDaemon extends CompositeService implements
ContainerRunner, Lla
private final String[] localDirs;
private final DaemonId daemonId;
private final SocketFactory socketFactory;
+ private final LlapTokenManager llapTokenManager;
// TODO Not the best way to share the address
private final AtomicReference<InetSocketAddress> srvAddress = new
AtomicReference<>(),
@@ -322,10 +325,14 @@ public class LlapDaemon extends CompositeService
implements ContainerRunner, Lla
SecretManager sm = null;
if (UserGroupInformation.isSecurityEnabled()) {
sm = SecretManager.createSecretManager(daemonConf,
daemonId.getClusterString());
+ this.llapTokenManager = new DefaultLlapTokenManager(daemonConf, sm);
+ } else {
+ this.llapTokenManager = new DummyTokenManager();
}
+
this.secretManager = sm;
- this.server = new LlapProtocolServerImpl(secretManager,
- numHandlers, this, srvAddress, mngAddress, srvPort,
externalClientsRpcPort, mngPort, daemonId, metrics);
+ this.server = new LlapProtocolServerImpl(secretManager, numHandlers, this,
srvAddress, mngAddress, srvPort,
+ externalClientsRpcPort, mngPort, daemonId,
metrics).withTokenManager(this.llapTokenManager);
UgiFactory fsUgiFactory = null;
try {
@@ -528,6 +535,10 @@ public class LlapDaemon extends CompositeService
implements ContainerRunner, Lla
public void shutdown() {
LOG.info("LlapDaemon shutdown invoked");
+
+ // invalidate tokens
+ this.llapTokenManager.close();
+
if (llapDaemonInfoBean != null) {
try {
MBeans.unregister(llapDaemonInfoBean);
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index 26edaf5ce69..7203a37b9ac 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -84,9 +84,9 @@ public class LlapProtocolServerImpl extends AbstractService
private final SecretManager secretManager;
private String clusterUser = null;
private boolean isRestrictedToClusterUser = false;
- private final DaemonId daemonId;
private final LlapDaemonExecutorMetrics executorMetrics;
private TokenRequiresSigning isSigningRequiredConfig =
TokenRequiresSigning.TRUE;
+ private LlapTokenManager llapTokenManager = new DummyTokenManager();
public LlapProtocolServerImpl(SecretManager secretManager, int numHandlers,
ContainerRunner containerRunner, AtomicReference<InetSocketAddress>
srvAddress,
@@ -102,7 +102,6 @@ public class LlapProtocolServerImpl extends AbstractService
this.mngAddress = mngAddress;
this.externalClientsRpcPort = externalClientsRpcPort;
this.mngPort = mngPort;
- this.daemonId = daemonId;
this.executorMetrics = executorMetrics;
LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() +
" with port configured to: " + srvPort);
@@ -292,8 +291,7 @@ public class LlapProtocolServerImpl extends AbstractService
callingUser = UserGroupInformation.getCurrentUser();
// Determine if the user would need to sign fragments.
boolean isSigningRequired = determineIfSigningIsRequired(callingUser);
- token = secretManager.createLlapToken(
- request.hasAppId() ? request.getAppId() : null, null,
isSigningRequired);
+ token = llapTokenManager.getToken(request, isSigningRequired);
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -395,4 +393,9 @@ public class LlapProtocolServerImpl extends AbstractService
default: throw new AssertionError("Unknown value " +
isSigningRequiredConfig);
}
}
+
+ public LlapProtocolServerImpl withTokenManager(LlapTokenManager
llapTokenManager) { // replaces the token manager used by this server (the field defaults to a DummyTokenManager)
+ this.llapTokenManager = llapTokenManager;
+ return this; // returns this instance so the call can be chained directly onto the constructor
+ }
}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenManager.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenManager.java
new file mode 100644
index 00000000000..06b197b8dd5
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenManager.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+
+import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * LlapTokenManager is for renewing and recreating LLAP tokens on a cluster.
+ * See further details in implementations.
+ */
+public interface LlapTokenManager {
+ long LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS = 300; // 300 seconds = 5 minutes; presumably how often an implementation re-checks its tokens -- confirm in the implementation
+
+ Token<LlapTokenIdentifier> getToken(GetTokenRequestProto request, boolean
isSigningRequired) throws IOException; // supplies the LLAP token handed back for a get-token request
+
+ void close(); // called from LlapDaemon.shutdown() to invalidate tokens
+}
diff --git
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index c5bb75b8ec6..1b23b6a5480 100644
---
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -18,17 +18,14 @@ import org.apache.hadoop.hive.conf.Validator.RangeValidator;
import
org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService.NodeInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.io.Writable;
import
org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray;
import
org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
import java.io.IOException;
import java.net.BindException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -42,7 +39,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
@@ -77,7 +73,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -91,7 +86,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
@@ -105,7 +99,6 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.DagInfo;
-import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.slf4j.Logger;
@@ -115,7 +108,6 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
private static final Logger LOG =
LoggerFactory.getLogger(LlapTaskCommunicator.class);
- private static final String RESOURCE_URI_STR = "/ws/v1/applicationhistory";
private static final Joiner JOINER = Joiner.on("");
private static final Joiner PATH_JOINER = Joiner.on("/");
private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap;
@@ -130,7 +122,6 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
private LlapProtocolClientProxy communicator;
private long deleteDelayOnDagComplete;
private final LlapTaskUmbilicalProtocol umbilical;
- private final Token<LlapTokenIdentifier> token;
private final String user;
private String amHost;
private String timelineServerUri;
@@ -155,17 +146,6 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
public LlapTaskCommunicator(
TaskCommunicatorContext taskCommunicatorContext) {
super(taskCommunicatorContext);
- Credentials credentials = taskCommunicatorContext.getAMCredentials();
- if (credentials != null) {
- @SuppressWarnings("unchecked")
- Token<LlapTokenIdentifier> llapToken =
-
(Token<LlapTokenIdentifier>)credentials.getToken(LlapTokenIdentifier.KIND_NAME);
- this.token = llapToken;
- } else {
- this.token = null;
- }
- LOG.info("Task communicator with a token " + token);
- Preconditions.checkState((token != null) ==
UserGroupInformation.isSecurityEnabled());
// Not closing this at the moment at shutdown, since this could be a
shared instance.
serviceRegistry = LlapRegistryService.getClient(conf);
@@ -190,31 +170,48 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
}
}
+ @SuppressWarnings("unchecked")
+ private Token<LlapTokenIdentifier> getLlapToken() { // looks up the LLAP token in the AM credentials; a null return means security is disabled
+ Token<LlapTokenIdentifier> token = null;
+ Credentials credentials = getContext().getAMCredentials();
+ if (credentials != null) {
+ token = (Token<LlapTokenIdentifier>)
credentials.getToken(LlapTokenIdentifier.KIND_NAME); // unchecked cast, covered by the @SuppressWarnings on this method
+ }
+ // supposed to have a token only if security is enabled (any mismatch here
implies a configuration problem)
+ Preconditions.checkState((token != null) ==
UserGroupInformation.isSecurityEnabled()); // checkState throws IllegalStateException on a mismatch
+ if (token != null) {
+ LOG.info("Task communicator with a token {}", token);
+ }
+ return token;
+ }
+
void setScheduler(LlapTaskSchedulerService peer) {
this.scheduler = peer;
}
private static final String LLAP_TOKEN_NAME =
LlapTokenIdentifier.KIND_NAME.toString();
- /**
- * @return true iff the error is fatal and we should give up on everything.
- */
- private boolean processSendError(Throwable t) {
+ private void processSendError(Throwable t) {
Throwable cause = t;
while (cause != null) {
- if (cause instanceof RetriableException) return false;
- if (((cause instanceof InvalidToken && cause.getMessage() != null)
- || (cause instanceof RemoteException && cause.getCause() == null
- && cause.getMessage() != null &&
cause.getMessage().contains("InvalidToken")))
- && cause.getMessage().contains(LLAP_TOKEN_NAME)) {
- break;
+ if (isInvalidTokenError(cause)) {
+ handleInvalidToken();
+ return;
}
cause = cause.getCause();
}
- if (cause == null) return false;
- LOG.error("Reporting fatal error - LLAP token appears to be invalid.", t);
- getContext().reportError(ServicePluginErrorDefaults.OTHER_FATAL,
cause.getMessage(), null);
- return true;
+ }
+
+ private boolean isInvalidTokenError(Throwable cause) { // true if 'cause' itself looks like an invalid LLAP token error; the cause chain is not walked here
+ LOG.debug("Checking for invalid token error, cause: {}, cause.getCause():
{}", cause, cause.getCause());
+ /*
+ * The latest message discovered (while doing HIVE-26569) for a
RemoteException is:
+ * "Current (LLAP_TOKEN; LLAP_TOKEN owner=hive/***, renewer=hive,
realUser=, issueDate=1670317803579, maxDate=1671527403579,
+ * sequenceNumber=297, masterKeyId=296, cluster ***, app ID , signing
false) can't be found in cache"
+ */
+ return ((cause instanceof InvalidToken && cause.getMessage() != null) ||
(cause instanceof RemoteException
+ && cause.getCause() == null && cause.getMessage() != null &&
cause.getMessage().contains(LLAP_TOKEN_NAME)
+ && (cause.getMessage().contains("InvalidToken") ||
cause.getMessage().contains("can't be found in cache")))); // NOTE(review): an InvalidToken with any non-null message matches; only the RemoteException branch checks for LLAP_TOKEN_NAME -- confirm this is intended
}
@Override
@@ -344,7 +341,7 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
@VisibleForTesting
protected LlapProtocolClientProxy createLlapProtocolClientProxy(int
numThreads, Configuration conf) {
- return new LlapProtocolClientProxy(numThreads, conf, token);
+ return new LlapProtocolClientProxy(numThreads, conf, getLlapToken());
}
@Override
@@ -403,9 +400,8 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
+ " appId=" +
currentQueryIdentifierProto.getApplicationIdString()
+ " dagId=" + currentQueryIdentifierProto.getDagIndex()
+ " to node " + node.getHost());
- if (!processSendError(t)) {
- callback.setError(null, t);
- }
+ processSendError(t);
+ callback.setError(null, t);
}
});
} catch (IOException e) {
@@ -442,9 +438,8 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
@Override
public void indicateError(Throwable t) {
LOG.warn("Failed to send update fragment request for {}",
attemptId.toString());
- if (!processSendError(t)) {
- callback.setError(ctx, t);
- }
+ processSendError(t);
+ callback.setError(ctx, t);
}
});
}
@@ -551,6 +546,7 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
ServiceException se = (ServiceException) t;
t = se.getCause();
}
+
if (t instanceof RemoteException) {
// All others from the remote service cause the task to FAIL.
LOG.info(
@@ -566,7 +562,7 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
LOG.info(
"Unable to run task: " + taskSpec.getTaskAttemptID() + "
on containerId: " +
containerId + ", Communication Error");
- processSendError(originalError);
+ processSendError(originalError);
getContext().taskKilled(taskSpec.getTaskAttemptID(),
TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication
Error");
} else {
@@ -574,7 +570,7 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
LOG.info(
"Failed to run task: " + taskSpec.getTaskAttemptID() + "
on containerId: " +
containerId, t);
- processSendError(originalError);
+ processSendError(originalError);
getContext()
.taskFailed(taskSpec.getTaskAttemptID(),
TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER,
t.getMessage());
@@ -584,6 +580,17 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
});
}
+ /**
+ * This is a best-effort action in case of an invalid LLAP_TOKEN. The
communicator (LlapProtocolClientProxy) periodically
+ * refreshes tokens, but there is a relatively small time window (compared
to the whole token maxLifetime)
+ * in which the token has been cancelled but a new one has not yet been fetched from the daemons. By the
time we detect this problem, the current
+ * task attempt has already failed, but usually there are at least 3 task
attempts before failing the task and dag, therefore
+ * fetching an LLAP_TOKEN synchronously here definitely solves the invalid
token problem (as the next task attempt will use it).
+ */
+ private void handleInvalidToken() {
+ this.communicator.refreshToken(); // delegates the refresh to the LlapProtocolClientProxy held in 'communicator'
+ }
+
@Override
public void unregisterRunningTaskAttempt(final TezTaskAttemptID
taskAttemptId,
TaskAttemptEndReason endReason,
@@ -623,7 +630,7 @@ public class LlapTaskCommunicator extends
TezTaskCommunicatorImpl {
public void indicateError(Throwable t) {
LOG.warn("Failed to send terminate fragment request for {}",
taskAttemptId.toString());
- processSendError(t);
+ processSendError(t);
}
});
} else {