Repository: hive Updated Branches: refs/heads/master 298644f66 -> 8c4b99a4e
HIVE-13449 : LLAP: HS2 should get the token directly, rather than from LLAP (Sergey Shelukhin, reviewed by Siddharth Seth and Lefty Leverenz) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8c4b99a4 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8c4b99a4 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8c4b99a4 Branch: refs/heads/master Commit: 8c4b99a4e49ea297f5a4f52a723f0697dc4ea272 Parents: 298644f Author: Sergey Shelukhin <[email protected]> Authored: Tue May 17 17:41:17 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Tue May 17 17:41:17 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 + llap-client/pom.xml | 11 +- .../hadoop/hive/llap/io/api/LlapProxy.java | 37 ----- .../llap/security/LlapTokenClientFactory.java | 160 ++++++++++++++++++ .../llap/security/LlapTokenLocalClient.java | 59 +++++++ .../org/apache/hadoop/hive/llap/DaemonId.java | 9 +- .../org/apache/hadoop/hive/llap/LlapUtil.java | 26 +++ .../impl/LlapManagementProtocolClientImpl.java | 3 +- .../hive/llap/security/LlapTokenProvider.java | 27 --- .../hive/llap/security/SecretManager.java | 162 ++++++++++++++++++ .../hive/llap/daemon/impl/LlapDaemon.java | 8 +- .../daemon/impl/LlapProtocolServerImpl.java | 26 +-- .../hive/llap/security/LlapSecurityHelper.java | 164 ------------------- .../hive/llap/security/SecretManager.java | 131 --------------- .../hive/ql/exec/tez/TezSessionState.java | 76 ++++++++- 15 files changed, 502 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 541af57..cbb3a72 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2704,6 +2704,11 @@ public class HiveConf extends Configuration { "RPC port for LLAP daemon management service."), LLAP_WEB_AUTO_AUTH("hive.llap.auto.auth", true, "Whether or not to set Hadoop configs to enable auth in LLAP web app."), + LLAP_CREATE_TOKEN_LOCALLY("hive.llap.create.token.locally", "hs2", + new StringSet("true", "hs2", "false"), + "Whether to create LLAP tokens locally, saving directly to ZooKeeper SecretManager.\n" + + "Requires one to have access to ZK paths; in other words, this should only be used in\n" + + "HiveServer2. By default, the value is 'hs2', which means exactly that."), LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5, "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"), http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-client/pom.xml b/llap-client/pom.xml index 4a75bbb..cbfdcd9 100644 --- a/llap-client/pom.xml +++ b/llap-client/pom.xml @@ -46,6 +46,11 @@ </dependency> <!-- inter-project --> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> @@ -81,12 +86,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>${commons-lang3.version}</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <version>${mockito-all.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java index 424769f..6c2618b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java @@ -21,23 +21,14 @@ import java.io.IOException; import java.lang.reflect.Constructor; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.security.LlapTokenProvider; - @SuppressWarnings("rawtypes") public class LlapProxy { private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl"; - private final static String TOKEN_CLASS = - "org.apache.hadoop.hive.llap.security.LlapSecurityHelper"; // Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O // singleton once (on daemon startup); the said singleton serves as the IO interface. private static LlapIo io = null; - private static LlapTokenProvider tokenProvider = null; - private static final Object tpInitLock = new Object(); - private static volatile boolean isTpInitDone = false; private static boolean isDaemon = false; @@ -77,34 +68,6 @@ public class LlapProxy { } } - public static LlapTokenProvider getOrInitTokenProvider(Configuration conf) { - if (isTpInitDone) return tokenProvider; - synchronized (tpInitLock) { - if (isTpInitDone) return tokenProvider; - try { - tokenProvider = createTokenProviderImpl(conf); - isTpInitDone = true; - } catch (IOException e) { - throw new RuntimeException("Cannot initialize token provider", e); - } - return tokenProvider; - } - } - - private static LlapTokenProvider createTokenProviderImpl(Configuration conf) throws IOException { - try { - @SuppressWarnings("unchecked") - Class<? extends LlapTokenProvider> clazz = - (Class<? extends LlapTokenProvider>)Class.forName(TOKEN_CLASS); - Constructor<? extends LlapTokenProvider> ctor = - clazz.getDeclaredConstructor(Configuration.class); - ctor.setAccessible(true); - return ctor.newInstance(conf); - } catch (Exception e) { - throw new RuntimeException("Failed to create token provider class", e); - } - } - public static void close() { if (io != null) { io.close(); http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java new file mode 100644 index 0000000..ebc91b1 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.net.SocketFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; + +public class LlapTokenClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClientFactory.class); + + private final LlapRegistryService registry; + private final SocketFactory socketFactory; + private final RetryPolicy retryPolicy; + private final Configuration conf; + private ServiceInstanceSet activeInstances; + private Collection<ServiceInstance> lastKnownInstances; + + public LlapTokenClientFactory(Configuration conf) { + this.conf = conf; + registry = new LlapRegistryService(false); + registry.init(conf); + socketFactory = NetUtils.getDefaultSocketFactory(conf); + retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( + 16000, 2000l, TimeUnit.MILLISECONDS); + } + + public interface Client { + Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException; + } + + public Client createClient() { + return new ClientImpl(); // Client is separate from factory mostly for thread-safety reasons. + } + + private class ClientImpl implements Client { + private LlapManagementProtocolClientImpl client; + private ServiceInstance clientInstance; + + @Override + public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException { + if (!UserGroupInformation.isSecurityEnabled()) return null; + Iterator<ServiceInstance> llaps = null; + if (clientInstance == null) { + assert client == null; + llaps = getLlapServices(false).iterator(); + clientInstance = llaps.next(); + } + + ByteString tokenBytes = null; + boolean hasRefreshed = false; + while (true) { + try { + tokenBytes = getTokenBytes(appId); + break; + } catch (IOException | ServiceException ex) { + LOG.error("Cannot get a token, trying a different instance", ex); + client = null; + clientInstance = null; + } + if (llaps == null || !llaps.hasNext()) { + if (hasRefreshed) { // Only refresh once. + throw new RuntimeException("Cannot find any LLAPs to get the token from"); + } + llaps = getLlapServices(true).iterator(); + hasRefreshed = true; + } + clientInstance = llaps.next(); + } + + Token<LlapTokenIdentifier> token = extractToken(tokenBytes); + if (LOG.isInfoEnabled()) { + LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token); + } + return token; + } + + private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException { + Token<LlapTokenIdentifier> token = new Token<>(); + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(tokenBytes.asReadOnlyByteBuffer()); + token.readFields(in); + return token; + } + + private ByteString getTokenBytes(final String appId) throws IOException, ServiceException { + assert clientInstance != null; + if (client == null) { + client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(), + clientInstance.getManagementPort(), retryPolicy, socketFactory); + } + GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder(); + if (!StringUtils.isBlank(appId)) { + req.setAppId(appId); + } + return client.getDelegationToken(null, req.build()).getToken(); + } + } + + /** Synchronized - LLAP registry and instance set are not thread safe. */ + private synchronized List<ServiceInstance> getLlapServices( + boolean doForceRefresh) throws IOException { + if (!doForceRefresh && lastKnownInstances != null) { + return new ArrayList<>(lastKnownInstances); + } + if (activeInstances == null) { + registry.start(); + activeInstances = registry.getInstances(); + } + Map<String, ServiceInstance> daemons = activeInstances.getAll(); + if (daemons == null || daemons.isEmpty()) { + throw new RuntimeException("No LLAPs found"); + } + lastKnownInstances = daemons.values(); + return new ArrayList<ServiceInstance>(lastKnownInstances); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java new file mode 100644 index 0000000..f10351b --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import java.io.IOException; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapTokenLocalClient { + private static final Logger LOG = LoggerFactory.getLogger(LlapTokenLocalClient.class); + private final SecretManager secretManager; + + public LlapTokenLocalClient(Configuration conf, String clusterId) { + secretManager = SecretManager.createSecretManager(conf, clusterId); + } + + public Token<LlapTokenIdentifier> createToken(String appId, String user) throws IOException { + try { + Token<LlapTokenIdentifier> token = secretManager.createLlapToken(appId, user); + if (LOG.isInfoEnabled()) { + LOG.info("Created a LLAP delegation token locally: " + token); + } + return token; + } catch (Exception ex) { + throw new IOException("Failed to create LLAP token locally. You might need to set " + + ConfVars.LLAP_CREATE_TOKEN_LOCALLY.varname + + " to false, or make sure you can access secure ZK paths.", ex); + } + } + + public void close() { + try { + secretManager.stopThreads(); + } catch (Exception ex) { + // Ignore. + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java index 18355e6..ea47330 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java @@ -32,7 +32,14 @@ public class DaemonId { } public String getClusterString() { - return userName + "_" + clusterName + "_" + appId; + return createClusterString(userName, clusterName); + } + + public static String createClusterString(String userName, String clusterName) { + // Note that this doesn't include appId. We assume that all the subsequent instances + // of the same user+cluster are logically the same, i.e. all the ZK paths will be reused, + // all the security tokens/etc. should transition between them, etc. + return userName + "_" + clusterName; } public String getApplicationId() { http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index ce03de0..9dcacea 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -13,14 +13,40 @@ */ package org.apache.hadoop.hive.llap; +import java.io.IOException; +import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LlapUtil { + private static final Logger LOG = LoggerFactory.getLogger(LlapUtil.class); + public static String getDaemonLocalDirList(Configuration conf) { String localDirList = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_WORK_DIRS); if (localDirList != null && !localDirList.isEmpty()) return localDirList; return conf.get("yarn.nodemanager.local-dirs"); } + + public static UserGroupInformation loginWithKerberos( + String principal, String keytabFile) throws IOException { + if (!UserGroupInformation.isSecurityEnabled()) return null; + if (principal.isEmpty() || keytabFile.isEmpty()) { + throw new RuntimeException("Kerberos principal and/or keytab are empty"); + } + LOG.info("Logging in as " + principal + " via " + keytabFile); + return UserGroupInformation.loginUserFromKeytabAndReturnUGI( + SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile); + } + + private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]"); + public static String generateClusterName(Configuration conf) { + String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); + return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java index cd11bdb..af760b1 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java @@ -65,8 +65,7 @@ public class LlapManagementProtocolClientImpl implements LlapManagementProtocolP RPC.setProtocolEngine(conf, LlapManagementProtocolPB.class, ProtobufRpcEngine.class); ProtocolProxy<LlapManagementProtocolPB> proxy = RPC.getProtocolProxy(LlapManagementProtocolPB.class, 0, serverAddr, - UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0, - retryPolicy); + UserGroupInformation.getCurrentUser(), conf, socketFactory, 0, retryPolicy); return proxy.getProxy(); } http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java deleted file mode 100644 index edf9b18..0000000 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.security; - -import java.io.IOException; - -import org.apache.hadoop.security.token.Token; - -public interface LlapTokenProvider { - Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java new file mode 100644 index 0000000..465b204 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java @@ -0,0 +1,162 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.security; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> { + private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class); + private final String clusterId; + + public SecretManager(Configuration conf, String clusterId) { + super(conf); + this.clusterId = clusterId; + checkForZKDTSMBug(conf); + } + + // Workaround for HADOOP-12659 - remove when Hadoop 2.7.X is no longer supported. + private void checkForZKDTSMBug(Configuration conf) { + // There's a bug in ZKDelegationTokenSecretManager ctor where seconds are not converted to ms. + long expectedRenewTimeSec = conf.getLong(DelegationTokenManager.RENEW_INTERVAL, -1); + LOG.info("Checking for tokenRenewInterval bug: " + expectedRenewTimeSec); + if (expectedRenewTimeSec == -1) return; // The default works, no bug. + java.lang.reflect.Field f = null; + try { + Class<?> c = org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.class; + f = c.getDeclaredField("tokenRenewInterval"); + f.setAccessible(true); + } catch (Throwable t) { + // Maybe someone removed the field; probably ok to ignore. + LOG.error("Failed to check for tokenRenewInterval bug, hoping for the best", t); + return; + } + try { + long realValue = f.getLong(this); + long expectedValue = expectedRenewTimeSec * 1000; + LOG.info("tokenRenewInterval is: " + realValue + " (expected " + expectedValue + ")"); + if (realValue == expectedRenewTimeSec) { + // Bug - the field has to be in ms, not sec. Override only if set precisely to sec. + f.setLong(this, expectedValue); + } + } catch (Exception ex) { + throw new RuntimeException("Failed to address tokenRenewInterval bug", ex); + } + } + + @Override + public LlapTokenIdentifier createIdentifier() { + return new LlapTokenIdentifier(); + } + + @Override + public LlapTokenIdentifier decodeTokenIdentifier( + Token<LlapTokenIdentifier> token) throws IOException { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier())); + LlapTokenIdentifier id = new LlapTokenIdentifier(); + id.readFields(dis); + dis.close(); + return id; + } + + public static SecretManager createSecretManager(final Configuration conf, String clusterId) { + String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), + llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); + return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId); + } + + + public static SecretManager createSecretManager(final Configuration conf, + String llapPrincipal, String llapKeytab, final String clusterId) { + // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways. + UserGroupInformation zkUgi = null; + String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal); + String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab); + try { + zkUgi = LlapUtil.loginWithKerberos(principal, keyTab); + } catch (IOException e) { + throw new RuntimeException(e); + } + // Override the default delegation token lifetime for LLAP. + // Also set all the necessary ZK settings to defaults and LLAP configs, if not set. + final Configuration zkConf = new Configuration(conf); + long tokenLifetime = HiveConf.getTimeVar( + conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS); + zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime); + zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime); + zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal); + zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab); + String zkPath = clusterId; + LOG.info("Using {} as ZK secret manager path", zkPath); + zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath); + setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl"); + setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING, + HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING)); + return zkUgi.doAs(new PrivilegedAction<SecretManager>() { + @Override + public SecretManager run() { + SecretManager zkSecretManager = new SecretManager(zkConf, clusterId); + try { + zkSecretManager.startThreads(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return zkSecretManager; + } + }); + } + + private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) { + if (zkConf.get(name) != null) return; + zkConf.set(name, value); + } + + public Token<LlapTokenIdentifier> createLlapToken(String appId, String user) throws IOException { + Text realUser = null, renewer = null; + if (user == null) { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + user = ugi.getUserName(); + if (ugi.getRealUser() != null) { + realUser = new Text(ugi.getRealUser().getUserName()); + } + renewer = new Text(ugi.getShortUserName()); + } else { + renewer = new Text(user); + } + LlapTokenIdentifier llapId = new LlapTokenIdentifier( + new Text(user), renewer, realUser, clusterId, appId); + // TODO: note that the token is not renewable right now and will last for 2 weeks by default. + Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, this); + if (LOG.isInfoEnabled()) { + LOG.info("Created LLAP token {}", token); + } + return token; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 5731b2c..de817e3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -106,12 +106,6 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla private final String[] localDirs; private final DaemonId daemonId; - private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]"); - private static String generateClusterName(Configuration conf) { - String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); - return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_"); - } - // TODO Not the best way to share the address private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(), mngAddress = new AtomicReference<>(); @@ -150,7 +144,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla String hostName = MetricsUtils.getHostName(); try { daemonId = new DaemonId(UserGroupInformation.getCurrentUser().getUserName(), - generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis()); + LlapUtil.generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis()); } catch (IOException ex) { throw new RuntimeException(ex); } http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index db8bfa6..b94fc2e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto; @@ -51,7 +52,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.hive.llap.security.LlapSecurityHelper; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.security.SecretManager; import org.apache.hadoop.service.AbstractService; @@ -150,12 +150,13 @@ public class LlapProtocolServerImpl extends AbstractService } String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); - zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, daemonId); + zkSecretManager = SecretManager.createSecretManager( + conf, llapPrincipal, llapKeytab, daemonId.getClusterString()); // Start the protocol server after properly authenticating with daemon keytab. UserGroupInformation daemonUgi = null; try { - daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab); + daemonUgi = LlapUtil.loginWithKerberos(llapPrincipal, llapKeytab); } catch (IOException e) { throw new RuntimeException(e); } @@ -265,9 +266,11 @@ public class LlapProtocolServerImpl extends AbstractService if (zkSecretManager == null) { throw new ServiceException("Operation not supported on unsecure cluster"); } - UserGroupInformation ugi; + UserGroupInformation ugi = null; + Token<LlapTokenIdentifier> token = null; try { ugi = UserGroupInformation.getCurrentUser(); + token = zkSecretManager.createLlapToken(request.getAppId(), null); } catch (IOException e) { throw new ServiceException(e); } @@ -278,20 +281,7 @@ public class LlapProtocolServerImpl extends AbstractService + " or adjust " + ConfVars.LLAP_MANAGEMENT_ACL.varname + " and " + ConfVars.LLAP_MANAGEMENT_ACL_DENY.varname + " to a more restrictive ACL."); } - String user = ugi.getUserName(); - Text owner = new Text(user); - Text realUser = null; - if (ugi.getRealUser() != null) { - realUser = new Text(ugi.getRealUser().getUserName()); - } - Text renewer = new Text(ugi.getShortUserName()); - LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser, - daemonId.getClusterString(), request.hasAppId() ? request.getAppId() : null); - // TODO: note that the token is not renewable right now and will last for 2 weeks by default. - Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager); - if (LOG.isInfoEnabled()) { - LOG.info("Created LLAP token " + token); - } + ByteArrayDataOutput out = ByteStreams.newDataOutput(); try { token.write(out); http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java deleted file mode 100644 index f958bc4..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.security; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.net.SocketFactory; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; -import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; - -/** Individual instances of this class are not thread safe. */ -public class LlapSecurityHelper implements LlapTokenProvider { - private static final Logger LOG = LoggerFactory.getLogger(LlapSecurityHelper.class); - - private UserGroupInformation llapUgi; - - private final LlapRegistryService registry; - private ServiceInstanceSet activeInstances; - private final Configuration conf; - private LlapManagementProtocolClientImpl client; - private ServiceInstance clientInstance; - - private final SocketFactory socketFactory; - private final RetryPolicy retryPolicy; - - public LlapSecurityHelper(Configuration conf) { - this.conf = conf; - registry = new LlapRegistryService(false); - registry.init(conf); - socketFactory = NetUtils.getDefaultSocketFactory(conf); - retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( - 16000, 2000l, TimeUnit.MILLISECONDS); - } - - public static UserGroupInformation loginWithKerberos( - String principal, String keytabFile) throws IOException { - if (!UserGroupInformation.isSecurityEnabled()) return null; - if (principal.isEmpty() || keytabFile.isEmpty()) { - throw new RuntimeException("Kerberos principal and/or keytab are empty"); - } - LOG.info("Logging in as " + principal + " via " + keytabFile); - UserGroupInformation.loginUserFromKeytab( - SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile); - return UserGroupInformation.getLoginUser(); - } - - @Override - public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException { - if (!UserGroupInformation.isSecurityEnabled()) return null; - if (llapUgi == null) { - llapUgi = UserGroupInformation.getCurrentUser(); - // We could have also added keytab support; right now client must do smth like kinit. - } - Iterator<ServiceInstance> llaps = null; - if (clientInstance == null) { - assert client == null; - llaps = getLlapServices(false); - clientInstance = llaps.next(); - } - - ByteString tokenBytes = null; - boolean hasRefreshed = false; - while (true) { - try { - tokenBytes = getTokenBytes(appId); - break; - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } catch (IOException ex) { - LOG.error("Cannot get a token, trying a different instance", ex); - client = null; - clientInstance = null; - } - if (llaps == null || !llaps.hasNext()) { - if (hasRefreshed) { // Only refresh once. - throw new RuntimeException("Cannot find any LLAPs to get the token from"); - } - llaps = getLlapServices(true); - hasRefreshed = true; - } - clientInstance = llaps.next(); - } - - // Stupid protobuf byte-buffer reinvention. - Token<LlapTokenIdentifier> token = new Token<>(); - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(tokenBytes.asReadOnlyByteBuffer()); - token.readFields(in); - if (LOG.isInfoEnabled()) { - LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token); - } - return token; - } - - private ByteString getTokenBytes( - final String appId) throws InterruptedException, IOException { - return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() { - @Override - public ByteString run() throws Exception { - assert clientInstance != null; - if (client == null) { - client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(), - clientInstance.getManagementPort(), retryPolicy, socketFactory); - } - // Client only connects on the first call, so this has to be done in doAs. - GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder(); - if (!StringUtils.isBlank(appId)) { - req.setAppId(appId); - } - return client.getDelegationToken(null, req.build()).getToken(); - } - }); - } - - private Iterator<ServiceInstance> getLlapServices(boolean doForceRefresh) throws IOException { - if (activeInstances == null) { - registry.start(); - activeInstances = registry.getInstances(); - } - Map<String, ServiceInstance> daemons = activeInstances.getAll(); - if (doForceRefresh || daemons == null || daemons.isEmpty()) { - daemons = activeInstances.getAll(); - if (daemons == null || daemons.isEmpty()) throw new RuntimeException("No LLAPs found"); - } - return daemons.values().iterator(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java deleted file mode 100644 index c54e726..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.security; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.security.PrivilegedAction; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.DaemonId; -import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager; -import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> { - private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class); - - public SecretManager(Configuration conf) { - super(conf); - checkForZKDTSMBug(conf); - } - - // Workaround for HADOOP-12659 - remove when Hadoop 2.7.X is no longer supported. - private void checkForZKDTSMBug(Configuration conf) { - // There's a bug in ZKDelegationTokenSecretManager ctor where seconds are not converted to ms. - long expectedRenewTimeSec = conf.getLong(DelegationTokenManager.RENEW_INTERVAL, -1); - LOG.info("Checking for tokenRenewInterval bug: " + expectedRenewTimeSec); - if (expectedRenewTimeSec == -1) return; // The default works, no bug. - java.lang.reflect.Field f = null; - try { - Class<?> c = org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.class; - f = c.getDeclaredField("tokenRenewInterval"); - f.setAccessible(true); - } catch (Throwable t) { - // Maybe someone removed the field; probably ok to ignore. - LOG.error("Failed to check for tokenRenewInterval bug, hoping for the best", t); - return; - } - try { - long realValue = f.getLong(this); - long expectedValue = expectedRenewTimeSec * 1000; - LOG.info("tokenRenewInterval is: " + realValue + " (expected " + expectedValue + ")"); - if (realValue == expectedRenewTimeSec) { - // Bug - the field has to be in ms, not sec. Override only if set precisely to sec. - f.setLong(this, expectedValue); - } - } catch (Exception ex) { - throw new RuntimeException("Failed to address tokenRenewInterval bug", ex); - } - } - - @Override - public LlapTokenIdentifier createIdentifier() { - return new LlapTokenIdentifier(); - } - - @Override - public LlapTokenIdentifier decodeTokenIdentifier( - Token<LlapTokenIdentifier> token) throws IOException { - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier())); - LlapTokenIdentifier id = new LlapTokenIdentifier(); - id.readFields(dis); - dis.close(); - return id; - } - - public static SecretManager createSecretManager( - final Configuration conf, String llapPrincipal, String llapKeytab, DaemonId daemonId) { - // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways. - UserGroupInformation zkUgi = null; - String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal); - String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab); - try { - zkUgi = LlapSecurityHelper.loginWithKerberos(principal, keyTab); - } catch (IOException e) { - throw new RuntimeException(e); - } - // Override the default delegation token lifetime for LLAP. - // Also set all the necessary ZK settings to defaults and LLAP configs, if not set. - final Configuration zkConf = new Configuration(conf); - long tokenLifetime = HiveConf.getTimeVar( - conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS); - zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime); - zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime); - zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal); - zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab); - String zkPath = daemonId.getClusterString(); - LOG.info("Using {} as ZK secret manager path", zkPath); - zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath); - setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl"); - setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING, - HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING)); - return zkUgi.doAs(new PrivilegedAction<SecretManager>() { - @Override - public SecretManager run() { - SecretManager zkSecretManager = new SecretManager(zkConf); - try { - zkSecretManager.startThreads(); - } catch (IOException e) { - throw new RuntimeException(e); - } - return zkSecretManager; - } - }); - } - - private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) { - if (zkConf.get(name) != null) return; - zkConf.set(name, value); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index fd6465a..c9b912b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -44,16 +44,19 @@ import javax.security.auth.login.LoginException; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.DaemonId; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl; -import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.hive.llap.security.LlapTokenClientFactory; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; -import org.apache.hadoop.hive.llap.security.LlapTokenProvider; +import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient; import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; @@ -82,6 +85,11 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + /** * Holds session state related to Tez */ @@ -274,14 +282,8 @@ public class TezSessionState { Credentials llapCredentials = null; if (llapMode) { if (UserGroupInformation.isSecurityEnabled()) { - LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf); - // For Tez, we don't use appId to distinguish the tokens; security scope is the user. - Token<LlapTokenIdentifier> token = tp.getDelegationToken(null); - if (LOG.isInfoEnabled()) { - LOG.info("Obtained a LLAP token: " + token); - } llapCredentials = new Credentials(); - llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, token); + llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig)); } UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); // we need plugins to handle llap and uber mode @@ -336,6 +338,62 @@ public class TezSessionState { } } + // Only cache ZK connections (ie local clients); these are presumed to be used in HS2. + // TODO: temporary before HIVE-13698. + private static final Cache<String, LlapTokenLocalClient> localClientCache = CacheBuilder + .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES) + .removalListener(new RemovalListener<String, LlapTokenLocalClient>() { + @Override + public void onRemoval(RemovalNotification<String, LlapTokenLocalClient> notification) { + if (notification.getValue() != null) { + notification.getValue().close(); + } + } + }).build(); + + private static Token<LlapTokenIdentifier> getLlapToken( + String user, final Configuration conf) throws IOException { + // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's + // no good place for that right now (HIVE-13698). + boolean useLocalTokenClient = isUsingLocalClient(conf); + Token<LlapTokenIdentifier> token = null; + // For Tez, we don't use appId to distinguish the tokens. + if (useLocalTokenClient) { + String clusterName = LlapUtil.generateClusterName(conf); + // This assumes that the LLAP cluster and session are both running under HS2 user. + final String clusterId = DaemonId.createClusterString(user, clusterName); + try { + token = localClientCache.get(clusterId, new Callable<LlapTokenLocalClient>() { + @Override + public LlapTokenLocalClient call() throws Exception { + return new LlapTokenLocalClient(conf, clusterId); + } + }).createToken(null, null); + } catch (ExecutionException e) { + throw new IOException(e); + } + } else { + token = new LlapTokenClientFactory(conf).createClient().getDelegationToken(null); + } + if (LOG.isInfoEnabled()) { + LOG.info("Obtained a LLAP token: " + token); + } + return token; + } + + private static boolean isUsingLocalClient(Configuration conf) { + String mode = HiveConf.getVar(conf, ConfVars.LLAP_CREATE_TOKEN_LOCALLY).toLowerCase(); + boolean isHs2Only = "hs2".equals(mode); + // We are initialized on first use inside TezSessionState::openInternal; assume the session + // should be available. + if (!isHs2Only) return "true".equals(mode); + SessionState session = SessionState.get(); + if (session == null && LOG.isInfoEnabled()) { + LOG.warn("There's no session to check if we are in HS2"); + } + return session != null && session.isHiveServerQuery(); + } + private TezClient startSessionAndContainers(TezClient session, HiveConf conf, Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig, boolean isOnThread) throws TezException, IOException {
