HIVE-13444 : LLAP: add HMAC signatures to LLAP; verify them on LLAP side (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9fe3dab7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9fe3dab7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9fe3dab7 Branch: refs/heads/branch-2.1 Commit: 9fe3dab7fe82d78435d9cd01f44f7a8e748f3420 Parents: 44a8f0a Author: Sergey Shelukhin <ser...@apache.org> Authored: Tue May 31 13:09:45 2016 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Tue May 31 13:24:05 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 11 +- .../hive/llap/security/LlapTokenClient.java | 148 ++++++++++++++ .../llap/security/LlapTokenClientFactory.java | 160 --------------- .../llap/security/LlapTokenLocalClient.java | 12 +- .../hadoop/hive/llap/security/LlapSigner.java | 41 ++++ .../hive/llap/security/LlapTokenIdentifier.java | 14 +- .../hive/llap/security/SecretManager.java | 96 ++++++--- .../llap/security/SigningSecretManager.java | 26 +++ .../llap/daemon/impl/ContainerRunnerImpl.java | 57 +++++- .../hive/llap/daemon/impl/LlapDaemon.java | 2 +- .../daemon/impl/LlapProtocolServerImpl.java | 59 ++++-- .../hive/llap/daemon/impl/LlapTokenChecker.java | 30 ++- .../hive/llap/daemon/impl/QueryTracker.java | 14 +- .../llap/daemon/impl/TaskExecutorService.java | 6 +- .../llap/daemon/impl/TaskRunnerCallable.java | 19 +- .../hive/llap/security/LlapSignerImpl.java | 60 ++++++ .../daemon/impl/TaskExecutorTestHelpers.java | 3 +- .../llap/daemon/impl/TestLlapTokenChecker.java | 8 +- .../TestFirstInFirstOutComparator.java | 31 --- .../hive/llap/security/TestLlapSignerImpl.java | 200 +++++++++++++++++++ .../hive/ql/exec/tez/TezSessionState.java | 26 +-- 21 files changed, 720 insertions(+), 303 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6a404bd..cad5d65 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2716,6 +2716,12 @@ public class HiveConf extends Configuration { LLAP_MANAGEMENT_ACL("hive.llap.management.acl", "*", "The ACL for LLAP daemon management."), LLAP_MANAGEMENT_ACL_DENY("hive.llap.management.acl.blocked", "", "The deny ACL for LLAP daemon management."), + LLAP_REMOTE_TOKEN_REQUIRES_SIGNING("hive.llap.remote.token.requires.signing", "true", + new StringSet("false", "except_llap_owner", "true"), + "Whether the token returned from LLAP management API should require fragment signing.\n" + + "True by default; can be disabled to allow CLI to get tokens from LLAP in a secure\n" + + "cluster by setting it to true or 'except_llap_owner' (the latter returns such tokens\n" + + "to everyone except the user LLAP cluster is authenticating under)."), // Hadoop DelegationTokenManager default is 1 week. LLAP_DELEGATION_TOKEN_LIFETIME("hive.llap.daemon.delegation.token.lifetime", "14d", @@ -2725,11 +2731,6 @@ public class HiveConf extends Configuration { "RPC port for LLAP daemon management service."), LLAP_WEB_AUTO_AUTH("hive.llap.auto.auth", false, "Whether or not to set Hadoop configs to enable auth in LLAP web app."), - LLAP_CREATE_TOKEN_LOCALLY("hive.llap.create.token.locally", "hs2", - new StringSet("true", "hs2", "false"), - "Whether to create LLAP tokens locally, saving directly to ZooKeeper SecretManager.\n" + - "Requires one to have access to ZK paths; in other words, this should only be used in\n" + - "HiveServer2. By default, the value is 'hs2', which means exactly that."), LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5, "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"), http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java new file mode 100644 index 0000000..921e050 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.net.SocketFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; + +public class LlapTokenClient { + private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClient.class); + + private final LlapRegistryService registry; + private final SocketFactory socketFactory; + private final RetryPolicy retryPolicy; + private final Configuration conf; + private ServiceInstanceSet activeInstances; + private Collection<ServiceInstance> lastKnownInstances; + private LlapManagementProtocolClientImpl client; + private ServiceInstance clientInstance; + + public LlapTokenClient(Configuration conf) { + this.conf = conf; + registry = new LlapRegistryService(false); + registry.init(conf); + socketFactory = NetUtils.getDefaultSocketFactory(conf); + retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( + 16000, 2000l, TimeUnit.MILLISECONDS); + } + + public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException { + if (!UserGroupInformation.isSecurityEnabled()) return null; + Iterator<ServiceInstance> llaps = null; + if (clientInstance == null) { + assert client == null; + llaps = getLlapServices(false).iterator(); + clientInstance = llaps.next(); + } + + ByteString tokenBytes = null; + boolean hasRefreshed = false; + while (true) { + try { + tokenBytes = getTokenBytes(appId); + break; + } catch (IOException | ServiceException ex) { + LOG.error("Cannot get a token, trying a different instance", ex); + client = null; + clientInstance = null; + } + if (llaps == null || !llaps.hasNext()) { + if (hasRefreshed) { // Only refresh once. + throw new RuntimeException("Cannot find any LLAPs to get the token from"); + } + llaps = getLlapServices(true).iterator(); + hasRefreshed = true; + } + clientInstance = llaps.next(); + } + + Token<LlapTokenIdentifier> token = extractToken(tokenBytes); + if (LOG.isInfoEnabled()) { + LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token); + } + return token; + } + + private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException { + Token<LlapTokenIdentifier> token = new Token<>(); + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(tokenBytes.asReadOnlyByteBuffer()); + token.readFields(in); + return token; + } + + private ByteString getTokenBytes(final String appId) throws IOException, ServiceException { + assert clientInstance != null; + if (client == null) { + client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(), + clientInstance.getManagementPort(), retryPolicy, socketFactory); + } + GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder(); + if (!StringUtils.isBlank(appId)) { + req.setAppId(appId); + } + return client.getDelegationToken(null, req.build()).getToken(); + } + + /** Synchronized - LLAP registry and instance set are not thread safe. */ + private synchronized List<ServiceInstance> getLlapServices( + boolean doForceRefresh) throws IOException { + if (!doForceRefresh && lastKnownInstances != null) { + return new ArrayList<>(lastKnownInstances); + } + if (activeInstances == null) { + registry.start(); + activeInstances = registry.getInstances(); + } + Map<String, ServiceInstance> daemons = activeInstances.getAll(); + if (daemons == null || daemons.isEmpty()) { + throw new RuntimeException("No LLAPs found"); + } + lastKnownInstances = daemons.values(); + return new ArrayList<ServiceInstance>(lastKnownInstances); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java deleted file mode 100644 index ebc91b1..0000000 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.security; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.net.SocketFactory; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; -import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; - -public class LlapTokenClientFactory { - private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClientFactory.class); - - private final LlapRegistryService registry; - private final SocketFactory socketFactory; - private final RetryPolicy retryPolicy; - private final Configuration conf; - private ServiceInstanceSet activeInstances; - private Collection<ServiceInstance> lastKnownInstances; - - public LlapTokenClientFactory(Configuration conf) { - this.conf = conf; - registry = new LlapRegistryService(false); - registry.init(conf); - socketFactory = NetUtils.getDefaultSocketFactory(conf); - retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( - 16000, 2000l, TimeUnit.MILLISECONDS); - } - - public interface Client { - Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException; - } - - public Client createClient() { - return new ClientImpl(); // Client is separate from factory mostly for thread-safety reasons. - } - - private class ClientImpl implements Client { - private LlapManagementProtocolClientImpl client; - private ServiceInstance clientInstance; - - @Override - public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException { - if (!UserGroupInformation.isSecurityEnabled()) return null; - Iterator<ServiceInstance> llaps = null; - if (clientInstance == null) { - assert client == null; - llaps = getLlapServices(false).iterator(); - clientInstance = llaps.next(); - } - - ByteString tokenBytes = null; - boolean hasRefreshed = false; - while (true) { - try { - tokenBytes = getTokenBytes(appId); - break; - } catch (IOException | ServiceException ex) { - LOG.error("Cannot get a token, trying a different instance", ex); - client = null; - clientInstance = null; - } - if (llaps == null || !llaps.hasNext()) { - if (hasRefreshed) { // Only refresh once. - throw new RuntimeException("Cannot find any LLAPs to get the token from"); - } - llaps = getLlapServices(true).iterator(); - hasRefreshed = true; - } - clientInstance = llaps.next(); - } - - Token<LlapTokenIdentifier> token = extractToken(tokenBytes); - if (LOG.isInfoEnabled()) { - LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token); - } - return token; - } - - private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException { - Token<LlapTokenIdentifier> token = new Token<>(); - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(tokenBytes.asReadOnlyByteBuffer()); - token.readFields(in); - return token; - } - - private ByteString getTokenBytes(final String appId) throws IOException, ServiceException { - assert clientInstance != null; - if (client == null) { - client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(), - clientInstance.getManagementPort(), retryPolicy, socketFactory); - } - GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder(); - if (!StringUtils.isBlank(appId)) { - req.setAppId(appId); - } - return client.getDelegationToken(null, req.build()).getToken(); - } - } - - /** Synchronized - LLAP registry and instance set are not thread safe. */ - private synchronized List<ServiceInstance> getLlapServices( - boolean doForceRefresh) throws IOException { - if (!doForceRefresh && lastKnownInstances != null) { - return new ArrayList<>(lastKnownInstances); - } - if (activeInstances == null) { - registry.start(); - activeInstances = registry.getInstances(); - } - Map<String, ServiceInstance> daemons = activeInstances.getAll(); - if (daemons == null || daemons.isEmpty()) { - throw new RuntimeException("No LLAPs found"); - } - lastKnownInstances = daemons.values(); - return new ArrayList<ServiceInstance>(lastKnownInstances); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java index f10351b..af889b6 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.security.token.Token; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,20 +31,21 @@ public class LlapTokenLocalClient { private final SecretManager secretManager; public LlapTokenLocalClient(Configuration conf, String clusterId) { + // TODO: create this centrally in HS2 case secretManager = SecretManager.createSecretManager(conf, clusterId); } - public Token<LlapTokenIdentifier> createToken(String appId, String user) throws IOException { + public Token<LlapTokenIdentifier> createToken( + String appId, String user, boolean isSignatureRequired) throws IOException { try { - Token<LlapTokenIdentifier> token = secretManager.createLlapToken(appId, user); + Token<LlapTokenIdentifier> token = secretManager.createLlapToken( + appId, user, isSignatureRequired); if (LOG.isInfoEnabled()) { LOG.info("Created a LLAP delegation token locally: " + token); } return token; } catch (Exception ex) { - throw new IOException("Failed to create LLAP token locally. You might need to set " - + ConfVars.LLAP_CREATE_TOKEN_LOCALLY.varname - + " to false, or make sure you can access secure ZK paths.", ex); + throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); } } http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java new file mode 100644 index 0000000..478a40a --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.security; + +import java.io.IOException; + +public interface LlapSigner { + /** An object signable by a signer. */ + public interface Signable { + /** Called by the signer to record key information as part of the message to be signed. */ + void setSignInfo(int masterKeyId, String user); + /** Called by the signer to get the serialized representation of the message to be signed. */ + byte[] serialize() throws IOException; + } + + /** Message with the signature. */ + public static final class SignedMessage { + public byte[] message, signature; + } + + /** Serializes and signs the message. */ + SignedMessage serializeAndSign(Signable message) throws IOException; + + void checkSignature(byte[] message, byte[] signature, int keyId); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java index 7c47f0b..08c141f 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java @@ -30,23 +30,24 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti import com.google.common.base.Preconditions; -/** For now, a LLAP token gives access to any LLAP server. */ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier { private static final String KIND = "LLAP_TOKEN"; public static final Text KIND_NAME = new Text(KIND); private String clusterId; private String appId; + private boolean isSigningRequired; public LlapTokenIdentifier() { super(); } public LlapTokenIdentifier(Text owner, Text renewer, Text realUser, - String clusterId, String appId) { + String clusterId, String appId, boolean isSigningRequired) { super(owner, renewer, realUser); Preconditions.checkNotNull(clusterId); this.clusterId = clusterId; this.appId = appId == null ? "" : appId; + this.isSigningRequired = isSigningRequired; } @Override @@ -54,6 +55,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier { super.write(out); out.writeUTF(clusterId); out.writeUTF(appId); + out.writeBoolean(isSigningRequired); } @Override @@ -62,6 +64,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier { clusterId = in.readUTF(); Preconditions.checkNotNull(clusterId); appId = in.readUTF(); + isSigningRequired = in.readBoolean(); appId = appId == null ? "" : appId; } @@ -78,10 +81,15 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier { return clusterId; } + public boolean isSigningRequired() { + return isSigningRequired; + } + @Override public int hashCode() { final int prime = 31; int result = prime * super.hashCode() + (StringUtils.isBlank(appId) ? 0 : appId.hashCode()); + result = prime * result + (isSigningRequired ? 1231 : 1237); return prime * result + ((clusterId == null) ? 0 : clusterId.hashCode()); } @@ -90,7 +98,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier { if (this == obj) return true; if (!(obj instanceof LlapTokenIdentifier) || !super.equals(obj)) return false; LlapTokenIdentifier other = (LlapTokenIdentifier) obj; - return (StringUtils.isBlank(appId) + return isSigningRequired == other.isSigningRequired && (StringUtils.isBlank(appId) ? StringUtils.isBlank(other.appId) : appId.equals(other.appId)) && (clusterId == null ? other.clusterId == null : clusterId.equals(other.clusterId)); } http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java index 465b204..5aa4b84 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java @@ -1,7 +1,11 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -27,12 +31,14 @@ import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> { +public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> + implements SigningSecretManager { private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class); private final String clusterId; @@ -86,43 +92,76 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent return id; } - public static SecretManager createSecretManager(final Configuration conf, String clusterId) { - String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), - llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); - return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId); + @Override + public synchronized DelegationKey getCurrentKey() { + return allKeys.get(getCurrentKeyId()); } + @Override + public byte[] signWithKey(byte[] message, DelegationKey key) { + return createPassword(message, key.getKey()); + } - public static SecretManager createSecretManager(final Configuration conf, - String llapPrincipal, String llapKeytab, final String clusterId) { - // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways. - UserGroupInformation zkUgi = null; - String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal); - String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab); - try { - zkUgi = LlapUtil.loginWithKerberos(principal, keyTab); - } catch (IOException e) { - throw new RuntimeException(e); + @Override + public byte[] signWithKey(byte[] message, int keyId) throws SecurityException { + DelegationKey key = getDelegationKey(keyId); + if (key == null) { + throw new SecurityException("The key ID " + keyId + " was not found"); } - // Override the default delegation token lifetime for LLAP. - // Also set all the necessary ZK settings to defaults and LLAP configs, if not set. - final Configuration zkConf = new Configuration(conf); + return createPassword(message, key.getKey()); + } + + static final class LlapZkConf { + public Configuration zkConf; + public UserGroupInformation zkUgi; + public LlapZkConf(Configuration zkConf, UserGroupInformation zkUgi) { + this.zkConf = zkConf; + this.zkUgi = zkUgi; + } + } + + private static LlapZkConf createLlapZkConf( + Configuration conf, String llapPrincipal, String llapKeytab, String clusterId) { + String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal); + String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab); + // Override the default delegation token lifetime for LLAP. + // Also set all the necessary ZK settings to defaults and LLAP configs, if not set. + final Configuration zkConf = new Configuration(conf); long tokenLifetime = HiveConf.getTimeVar( conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS); zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime); zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime); zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal); zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab); - String zkPath = clusterId; + String zkPath = "zkdtsm_" + clusterId; LOG.info("Using {} as ZK secret manager path", zkPath); - zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath); + zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, zkPath); setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl"); setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING, HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING)); - return zkUgi.doAs(new PrivilegedAction<SecretManager>() { + + UserGroupInformation zkUgi = null; + try { + zkUgi = LlapUtil.loginWithKerberos(principal, keyTab); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new LlapZkConf(zkConf, zkUgi); + } + + public static SecretManager createSecretManager(final Configuration conf, String clusterId) { + String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), + llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); + return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId); + } + + public static SecretManager createSecretManager( + final Configuration conf, String llapPrincipal, String llapKeytab, final String clusterId) { + final LlapZkConf c = createLlapZkConf(conf, llapPrincipal, llapKeytab, clusterId); + return c.zkUgi.doAs(new PrivilegedAction<SecretManager>() { @Override public SecretManager run() { - SecretManager zkSecretManager = new SecretManager(zkConf, clusterId); + SecretManager zkSecretManager = new SecretManager(c.zkConf, clusterId); try { zkSecretManager.startThreads(); } catch (IOException e) { @@ -138,7 +177,8 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent zkConf.set(name, value); } - public Token<LlapTokenIdentifier> createLlapToken(String appId, String user) throws IOException { + public Token<LlapTokenIdentifier> createLlapToken( + String appId, String user, boolean isSignatureRequired) throws IOException { Text realUser = null, renewer = null; if (user == null) { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); @@ -151,7 +191,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent renewer = new Text(user); } LlapTokenIdentifier llapId = new LlapTokenIdentifier( - new Text(user), renewer, realUser, clusterId, appId); + new Text(user), renewer, realUser, clusterId, appId, isSignatureRequired); // TODO: note that the token is not renewable right now and will last for 2 weeks by default. Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, this); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java new file mode 100644 index 0000000..067a98e --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.security; + +import org.apache.hadoop.security.token.delegation.DelegationKey; + +public interface SigningSecretManager { + DelegationKey getCurrentKey(); + byte[] signWithKey(byte[] message, DelegationKey key); + byte[] signWithKey(byte[] message, int keyId) throws SecurityException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 2524dc2..d439c07 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -27,11 +27,13 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; +import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; @@ -46,11 +48,14 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.security.LlapSignerImpl; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -68,7 +73,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -// TODO Convert this to a CompositeService +import com.google.protobuf.ByteString; + public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler { // TODO Setup a set of threads to process incoming requests. @@ -89,12 +95,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu private final TaskRunnerCallable.ConfParams confParams; private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl(); private final HadoopShim tezHadoopShim; + private final LlapSignerImpl signer; + private final String clusterId; public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort, AtomicReference<InetSocketAddress> localAddress, long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics, - AMReporter amReporter, ClassLoader classLoader, String clusterId) { + AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId) { super("ContainerRunnerImpl"); this.conf = conf; Preconditions.checkState(numExecutors > 0, @@ -102,7 +110,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu this.localAddress = localAddress; this.localShufflePort = localShufflePort; this.amReporter = amReporter; + this.signer = UserGroupInformation.isSecurityEnabled() + ? new LlapSignerImpl(conf, daemonId) : null; + this.clusterId = daemonId.getClusterString(); this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId); addIfService(queryTracker); String waitQueueSchedulerClassName = HiveConf.getVar( @@ -153,9 +164,22 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu @Override public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException { - // TODO: also support binary. Actually, we should figure out the binary stuff here and - // stop passing the protobuf around. We should pass around some plain objects/values. - SignableVertexSpec vertex = request.getWorkSpec().getVertex(); + VertexOrBinary vob = request.getWorkSpec(); + SignableVertexSpec vertex = vob.hasVertex() ? vob.getVertex() : null; + ByteString vertexBinary = vob.hasVertexBinary() ? vob.getVertexBinary() : null; + if (vertex != null && vertexBinary != null) { + throw new IOException( + "Vertex and vertexBinary in VertexOrBinary cannot be set at the same time"); + } + if (vertexBinary != null) { + vertex = SignableVertexSpec.parseFrom(vob.getVertexBinary()); + } + + LlapTokenInfo tokenInfo = LlapTokenChecker.getTokenInfo(clusterId); + if (tokenInfo.isSigningRequired) { + checkSignature(vertex, vertexBinary, request, tokenInfo.userName); + } + if (LOG.isInfoEnabled()) { LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex)); } @@ -166,6 +190,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu HistoryLogger.logFragmentStart(vId.getApplicationIdString(), request.getContainerIdString(), localAddress.get().getHostName(), vertex.getDagName(), vId.getDagId(), vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); + // This is the start of container-annotated logging. // TODO Reduce the length of this string. Way too verbose at the moment. NDC.push(fragmentIdString); @@ -194,7 +219,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu QueryFragmentInfo fragmentInfo = queryTracker.registerFragment( queryIdentifier, vId.getApplicationIdString(), vertex.getDagName(), dagIdentifier, vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), - vertex.getUser(), vertex, jobToken, fragmentIdString); + vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo); String[] localDirs = fragmentInfo.getLocalDirs(); Preconditions.checkNotNull(localDirs); @@ -208,7 +233,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, - this, tezHadoopShim, attemptId); + this, tezHadoopShim, attemptId, vertex); submissionState = executorService.schedule(callable); if (LOG.isInfoEnabled()) { @@ -233,6 +258,24 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu return responseBuilder.build(); } + private void checkSignature(SignableVertexSpec vertex, ByteString vertexBinary, + SubmitWorkRequestProto request, String tokenUserName) throws SecurityException, IOException { + if (!request.hasWorkSpecSignature()) { + throw new SecurityException("Unsigned fragment not allowed"); + } + if (vertexBinary == null) { + ByteString.Output os = ByteString.newOutput(); + vertex.writeTo(os); + vertexBinary = os.toByteString(); + } + signer.checkSignature(vertexBinary.toByteArray(), + request.getWorkSpecSignature().toByteArray(), (int)vertex.getSignatureKeyId()); + if (!vertex.hasUser() || !vertex.getUser().equals(tokenUserName)) { + throw new SecurityException("LLAP token is for " + tokenUserName + + " but the fragment is for " + (vertex.hasUser() ? vertex.getUser() : null)); + } + } + private static class LlapExecutionContext extends ExecutionContextImpl implements TezProcessor.Hook { private final QueryTracker queryTracker; http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 5ab7b3c..2faedcd 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -248,7 +248,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize, enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics, - amReporter, executorClassLoader, daemonId.getClusterString()); + amReporter, executorClassLoader, daemonId); addIfService(containerRunner); // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties. http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index b94fc2e..7ccd28f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; -import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -64,6 +63,9 @@ public class LlapProtocolServerImpl extends AbstractService implements LlapProtocolBlockingPB, LlapManagementProtocolPB { private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class); + private enum TokenRequiresSigning { + TRUE, FALSE, EXCEPT_OWNER + } private final int numHandlers; private final ContainerRunner containerRunner; @@ -71,8 +73,10 @@ public class LlapProtocolServerImpl extends AbstractService private RPC.Server server, mngServer; private final AtomicReference<InetSocketAddress> srvAddress, mngAddress; private SecretManager zkSecretManager; - private String restrictedToUser = null; + private String clusterUser = null; + private boolean isRestrictedToClusterUser = false; private final DaemonId daemonId; + private TokenRequiresSigning isSigningRequiredConfig = TokenRequiresSigning.TRUE; public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner, AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress, @@ -132,6 +136,7 @@ public class LlapProtocolServerImpl extends AbstractService @Override public void serviceStart() { final Configuration conf = getConfig(); + isSigningRequiredConfig = getSigningConfig(conf); final BlockingService daemonImpl = LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this); final BlockingService managementImpl = @@ -140,13 +145,14 @@ public class LlapProtocolServerImpl extends AbstractService startProtocolServers(conf, daemonImpl, managementImpl); return; } + try { + this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + throw new RuntimeException(e); + } if (isPermissiveManagementAcl(conf)) { LOG.warn("Management protocol has a '*' ACL."); - try { - this.restrictedToUser = UserGroupInformation.getCurrentUser().getShortUserName(); - } catch (IOException e) { - throw new RuntimeException(e); - } + isRestrictedToClusterUser = true; } String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); @@ -169,6 +175,20 @@ public class LlapProtocolServerImpl extends AbstractService }); } + private static TokenRequiresSigning getSigningConfig(final Configuration conf) { + String signSetting = HiveConf.getVar( + conf, ConfVars.LLAP_REMOTE_TOKEN_REQUIRES_SIGNING).toLowerCase(); + switch (signSetting) { + case "true": return TokenRequiresSigning.TRUE; + case "except_llap_owner": return TokenRequiresSigning.EXCEPT_OWNER; + case "false": return TokenRequiresSigning.FALSE; + default: { + throw new RuntimeException("Invalid value for " + + ConfVars.LLAP_REMOTE_TOKEN_REQUIRES_SIGNING.varname + ": " + signSetting); + } + } + } + private static boolean isPermissiveManagementAcl(Configuration conf) { return HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS) && AccessControlList.WILDCARD_ACL_VALUE.equals( @@ -266,17 +286,20 @@ public class LlapProtocolServerImpl extends AbstractService if (zkSecretManager == null) { throw new ServiceException("Operation not supported on unsecure cluster"); } - UserGroupInformation ugi = null; + UserGroupInformation callingUser = null; Token<LlapTokenIdentifier> token = null; try { - ugi = UserGroupInformation.getCurrentUser(); - token = zkSecretManager.createLlapToken(request.getAppId(), null); + callingUser = UserGroupInformation.getCurrentUser(); + // Determine if the user would need to sign fragments. + boolean isSigningRequired = determineIfSigningIsRequired(callingUser); + token = zkSecretManager.createLlapToken( + request.hasAppId() ? request.getAppId() : null, null, isSigningRequired); } catch (IOException e) { throw new ServiceException(e); } - if (restrictedToUser != null && !restrictedToUser.equals(ugi.getShortUserName())) { + if (isRestrictedToClusterUser && !clusterUser.equals(callingUser.getShortUserName())) { throw new ServiceException("Management protocol ACL is too permissive. The access has been" - + " automatically restricted to " + restrictedToUser + "; " + ugi.getShortUserName() + + " automatically restricted to " + clusterUser + "; " + callingUser.getShortUserName() + " is denied acccess. Please set " + ConfVars.LLAP_VALIDATE_ACLS.varname + " to false," + " or adjust " + ConfVars.LLAP_MANAGEMENT_ACL.varname + " and " + ConfVars.LLAP_MANAGEMENT_ACL_DENY.varname + " to a more restrictive ACL."); @@ -292,4 +315,16 @@ public class LlapProtocolServerImpl extends AbstractService GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build(); return response; } + + private boolean determineIfSigningIsRequired(UserGroupInformation callingUser) { + switch (isSigningRequiredConfig) { + case FALSE: return false; + case TRUE: return true; + // Note that this uses short user name without consideration for Kerberos realm. + // This seems to be the common approach (e.g. for HDFS permissions), but it may be + // better to consider the realm (although not the host, so not the full name). + case EXCEPT_OWNER: return !clusterUser.equals(callingUser.getShortUserName()); + default: throw new AssertionError("Unknown value " + isSigningRequiredConfig); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java index 04df929..24a7737 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java @@ -23,8 +23,6 @@ import java.util.List; import java.io.IOException; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.TokenIdentifier; @@ -34,8 +32,20 @@ import org.slf4j.LoggerFactory; public final class LlapTokenChecker { private static final Logger LOG = LoggerFactory.getLogger(LlapTokenChecker.class); - private static final ImmutablePair<String, String> NO_SECURITY = new ImmutablePair<>(null, null); - public static Pair<String, String> getTokenInfo(String clusterId) throws IOException { + public static final class LlapTokenInfo { + public final String userName; + public final String appId; + public final boolean isSigningRequired; + + public LlapTokenInfo(String userName, String appId, boolean isSigningRequired) { + this.userName = userName; + this.appId = appId; + this.isSigningRequired = isSigningRequired; + } + } + + private static final LlapTokenInfo NO_SECURITY = new LlapTokenInfo(null, null, false); + public static LlapTokenInfo getTokenInfo(String clusterId) throws IOException { if (!UserGroupInformation.isSecurityEnabled()) return NO_SECURITY; UserGroupInformation current = UserGroupInformation.getCurrentUser(); String kerberosName = current.hasKerberosCredentials() ? current.getShortUserName() : null; @@ -65,13 +75,14 @@ public final class LlapTokenChecker { } @VisibleForTesting - static Pair<String, String> getTokenInfoInternal( + static LlapTokenInfo getTokenInfoInternal( String kerberosName, List<LlapTokenIdentifier> tokens) { assert (tokens != null && !tokens.isEmpty()) || kerberosName != null; if (tokens == null) { - return new ImmutablePair<String, String>(kerberosName, null); + return new LlapTokenInfo(kerberosName, null, true); } String userName = kerberosName, appId = null; + boolean isSigningRequired = false; for (LlapTokenIdentifier llapId : tokens) { String newUserName = llapId.getRealUser().toString(); if (userName != null && !userName.equals(newUserName)) { @@ -88,9 +99,10 @@ public final class LlapTokenChecker { } appId = newAppId; } + isSigningRequired = isSigningRequired || llapId.isSigningRequired(); } assert userName != null; - return new ImmutablePair<String, String>(userName, appId); + return new LlapTokenInfo(userName, appId, isSigningRequired); } public static void checkPermissions( @@ -120,12 +132,12 @@ public final class LlapTokenChecker { } public static void checkPermissions( - Pair<String, String> prm, String userName, String appId, Object hint) { + LlapTokenInfo prm, String userName, String appId, Object hint) { if (userName == null) { assert StringUtils.isEmpty(appId); return; } - if (!checkTokenPermissions(userName, appId, prm.getLeft(), prm.getRight())) { + if (!checkTokenPermissions(userName, appId, prm.userName, prm.appId)) { throw new SecurityException("Unauthorized to access " + userName + ", " + appId.hashCode() + " (" + hint + ")"); } http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index c55436b..a965872 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -20,7 +20,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; @@ -119,7 +119,7 @@ public class QueryTracker extends AbstractService { QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken, - String fragmentIdString) throws IOException { + String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException { ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.readLock().lock(); try { @@ -132,16 +132,18 @@ public class QueryTracker extends AbstractService { } // TODO: for now, we get the secure username out of UGI... after signing, we can take it // out of the request provided that it's signed. - Pair<String, String> tokenInfo = LlapTokenChecker.getTokenInfo(clusterId); + if (tokenInfo == null) { + tokenInfo = LlapTokenChecker.getTokenInfo(clusterId); + } boolean isExistingQueryInfo = true; QueryInfo queryInfo = queryInfoMap.get(queryIdentifier); if (queryInfo == null) { - String tokenUser = tokenInfo.getLeft(), tokenAppId = tokenInfo.getRight(); if (UserGroupInformation.isSecurityEnabled()) { - Preconditions.checkNotNull(tokenUser); + Preconditions.checkNotNull(tokenInfo.userName); } queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user, - getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, tokenUser, tokenAppId); + getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, + tokenInfo.userName, tokenInfo.appId); QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); if (old != null) { queryInfo = old; http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index eac0e8f..1e302e8 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -628,9 +628,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta String state = reason == null ? "FAILED" : reason.name(); boolean removed = preemptionQueue.remove(taskWrapper); if (removed && isInfoEnabled) { - LOG.info(TaskRunnerCallable - .getTaskIdentifierString(taskWrapper.getTaskRunnerCallable().getRequest()) - + " request " + state + "! Removed from preemption list."); + TaskRunnerCallable trc = taskWrapper.getTaskRunnerCallable(); + LOG.info(TaskRunnerCallable.getTaskIdentifierString(trc.getRequest(), + trc.getVertexSpec()) + " request " + state + "! Removed from preemption list."); } if (metrics != null) { metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 74359fa..0d9882b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -124,7 +124,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { ConfParams confParams, LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, FragmentCompletionHandler fragmentCompleteHandler, - HadoopShim tezHadoopShim, TezTaskAttemptID attemptId) { + HadoopShim tezHadoopShim, TezTaskAttemptID attemptId, + SignableVertexSpec vertex) { this.request = request; this.fragmentInfo = fragmentInfo; this.conf = conf; @@ -135,8 +136,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { this.memoryAvailable = memoryAvailable; this.confParams = confParams; this.jobToken = TokenCache.getSessionToken(credentials); - // TODO: support binary spec here or above - this.vertex = request.getWorkSpec().getVertex(); + this.vertex = vertex; this.taskSpec = Converters.getTaskSpecfromProto( vertex, request.getFragmentNumber(), request.getAttemptNumber(), attemptId); this.amReporter = amReporter; @@ -389,7 +389,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } public TaskRunnerCallback getCallback() { - return new TaskRunnerCallback(request, this); + return new TaskRunnerCallback(request, vertex, this); } public SubmitWorkRequestProto getRequest() { @@ -399,11 +399,13 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { final class TaskRunnerCallback implements FutureCallback<TaskRunner2Result> { private final SubmitWorkRequestProto request; + private final SignableVertexSpec vertex; private final TaskRunnerCallable taskRunnerCallable; - TaskRunnerCallback(SubmitWorkRequestProto request, + TaskRunnerCallback(SubmitWorkRequestProto request, SignableVertexSpec vertex, TaskRunnerCallable taskRunnerCallable) { this.request = request; + this.vertex = vertex; this.taskRunnerCallable = taskRunnerCallable; } @@ -463,7 +465,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { @Override public void onFailure(Throwable t) { - LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t); + LOG.error("TezTaskRunner execution failed for : " + + getTaskIdentifierString(request, vertex), t); isCompleted.set(true); fragmentCompletionHanler.fragmentComplete(fragmentInfo); // TODO HIVE-10236 Report a fatal error over the umbilical @@ -494,10 +497,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } public static String getTaskIdentifierString( - SubmitWorkRequestProto request) { + SubmitWorkRequestProto request, SignableVertexSpec vertex) { StringBuilder sb = new StringBuilder(); - // TODO: also support the binary version - SignableVertexSpec vertex = request.getWorkSpec().getVertex(); sb.append("AppId=").append(vertex.getVertexIdentifier().getApplicationIdString()) .append(", containerId=").append(request.getContainerIdString()) .append(", Dag=").append(vertex.getDagName()) http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java new file mode 100644 index 0000000..4174593 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.security; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.DaemonId; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.DelegationKey; + +import com.google.common.annotations.VisibleForTesting; + +public class LlapSignerImpl implements LlapSigner { + private final SigningSecretManager secretManager; + + public LlapSignerImpl(Configuration conf, DaemonId daemonId) { + // TODO: create this centrally in HS2 case + secretManager = SecretManager.createSecretManager(conf, daemonId.getClusterString()); + } + + @VisibleForTesting + public LlapSignerImpl(SigningSecretManager sm) { + secretManager = sm; + } + + @Override + public SignedMessage serializeAndSign(Signable message) throws IOException { + SignedMessage result = new SignedMessage(); + DelegationKey key = secretManager.getCurrentKey(); + message.setSignInfo(key.getKeyId(), UserGroupInformation.getCurrentUser().getUserName()); + result.message = message.serialize(); + result.signature = secretManager.signWithKey(result.message, key); + return result; + } + + @Override + public void checkSignature(byte[] message, byte[] signature, int keyId) + throws SecurityException { + byte[] expectedSignature = secretManager.signWithKey(message, keyId); + if (Arrays.equals(signature, expectedSignature)) return; + throw new SecurityException("Message signature does not match"); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index e0f0676..96d626a 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -150,7 +150,8 @@ public class TaskExecutorTestHelpers { new ExecutionContextImpl("localhost"), null, new Credentials(), 0, mock(AMReporter.class), null, mock( LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class), new DefaultHadoopShim(), null); + FragmentCompletionHandler.class), new DefaultHadoopShim(), null, + requestProto.getWorkSpec().getVertex()); this.workTime = workTime; this.canFinish = canFinish; } http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java index aaaa762..d4ded23 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java @@ -65,7 +65,7 @@ public class TestLlapTokenChecker { private List<LlapTokenIdentifier> createTokens(String... args) { List<LlapTokenIdentifier> tokens = new ArrayList<>(); for (int i = 0; i < args.length; i += 2) { - tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1])); + tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1], false)); } return tokens; } @@ -89,8 +89,8 @@ public class TestLlapTokenChecker { } } - private void check(Pair<String, String> p, String user, String appId) { - assertEquals(user, p.getLeft()); - assertEquals(appId, p.getRight()); + private void check(LlapTokenChecker.LlapTokenInfo p, String user, String appId) { + assertEquals(user, p.userName); + assertEquals(appId, p.appId); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index a250882..ac48a3a 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -53,37 +53,6 @@ public class TestFirstInFirstOutComparator { private static Configuration conf; private static Credentials cred = new Credentials(); - private static class MockRequest extends TaskRunnerCallable { - private int workTime; - private boolean canFinish; - - public MockRequest(SubmitWorkRequestProto requestProto, - boolean canFinish, int workTime) { - super(requestProto, mock(QueryFragmentInfo.class), conf, - new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, - mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class), new DefaultHadoopShim(), null); - this.workTime = workTime; - this.canFinish = canFinish; - } - - @Override - protected TaskRunner2Result callInternal() { - System.out.println(super.getRequestId() + " is executing.."); - try { - Thread.sleep(workTime); - } catch (InterruptedException e) { - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false); - } - return new TaskRunner2Result(EndReason.SUCCESS, null, null, false); - } - - @Override - public boolean canFinish() { - return canFinish; - } - } - @Before public void setup() { conf = new Configuration(); http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java new file mode 100644 index 0000000..0420225 --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.security; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.apache.hadoop.hive.llap.security.LlapSigner.Signable; +import org.apache.hadoop.hive.llap.security.LlapSigner.SignedMessage; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestLlapSignerImpl { + private static final Logger LOG = LoggerFactory.getLogger(TestLlapSignerImpl.class); + + @Test(timeout = 10000) + public void testSigning() throws Exception { + FakeSecretManager fsm = new FakeSecretManager(); + fsm.startThreads(); + + // Make sure the signature works. + LlapSignerImpl signer = new LlapSignerImpl(fsm); + byte theByte = 1; + TestSignable in = new TestSignable(theByte); + TestSignable in2 = new TestSignable(++theByte); + SignedMessage sm2 = signer.serializeAndSign(in2); + SignedMessage sm = signer.serializeAndSign(in); + TestSignable out = TestSignable.deserialize(sm.message); + TestSignable out2 = TestSignable.deserialize(sm2.message); + assertEquals(in, out); + assertEquals(in2, out2); + signer.checkSignature(sm.message, sm.signature, out.masterKeyId); + signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId); + + // Make sure the broken signature doesn't work. + try { + signer.checkSignature(sm.message, sm2.signature, out.masterKeyId); + fail("Didn't throw"); + } catch (SecurityException ex) { + // Expected. + } + + int index = sm.signature.length / 2; + sm.signature[index] = (byte)(sm.signature[index] + 1); + try { + signer.checkSignature(sm.message, sm.signature, out.masterKeyId); + fail("Didn't throw"); + } catch (SecurityException ex) { + // Expected. + } + sm.signature[index] = (byte)(sm.signature[index] - 1); + + // Adding keys is PITA - there's no way to plug into timed rolling; just create a new fsm. + DelegationKey dk = fsm.getCurrentKey(); + fsm.stopThreads(); + fsm = new FakeSecretManager(); + fsm.addKey(dk); + fsm.startThreads(); + signer = new LlapSignerImpl(fsm); + // Sign in2 with a different key. + sm2 = signer.serializeAndSign(in2); + out2 = TestSignable.deserialize(sm2.message); + assertNotEquals(out.masterKeyId, out2.masterKeyId); + assertEquals(in2, out2); + signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId); + signer.checkSignature(sm.message, sm.signature, out.masterKeyId); + // Make sure the key ID mismatch causes error. + try { + signer.checkSignature(sm2.message, sm2.signature, out.masterKeyId); + fail("Didn't throw"); + } catch (SecurityException ex) { + // Expected. + } + + // The same for rolling the key; re-create the fsm with only the key #2. + dk = fsm.getCurrentKey(); + fsm.stopThreads(); + + fsm = new FakeSecretManager(); + fsm.addKey(dk); + fsm.startThreads(); + signer = new LlapSignerImpl(fsm); + signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId); + // The key is missing - shouldn't be able to verify. + try { + signer.checkSignature(sm.message, sm.signature, out.masterKeyId); + fail("Didn't throw"); + } catch (SecurityException ex) { + // Expected. + } + fsm.stopThreads(); + } + + private static class TestSignable implements Signable { + public int masterKeyId; + public byte index; + + public TestSignable(byte i) { + index = i; + } + + public TestSignable(int keyId, byte b) { + masterKeyId = keyId; + index = b; + } + + @Override + public void setSignInfo(int masterKeyId, String user) { + this.masterKeyId = masterKeyId; + } + + @Override + public byte[] serialize() throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(5); + dob.writeInt(masterKeyId); + dob.write(index); + byte[] b = dob.getData(); + dob.close(); + return b; + } + + public static TestSignable deserialize(byte[] bytes) throws IOException { + DataInputBuffer db = new DataInputBuffer(); + db.reset(bytes, bytes.length); + int keyId = db.readInt(); + byte b = db.readByte(); + db.close(); + return new TestSignable(keyId, b); + } + + @Override + public int hashCode() { + return 31 * index + masterKeyId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof TestSignable)) return false; + TestSignable other = (TestSignable) obj; + return (index == other.index) && (masterKeyId == other.masterKeyId); + } + } + + private static class FakeSecretManager + extends AbstractDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> + implements SigningSecretManager { + + public FakeSecretManager() { + // The keys instantly expire and are rolled. + super(10000000, 10000000, 10000000, 10000000); + } + + @Override + public DelegationKey getCurrentKey() { + return getDelegationKey(getCurrentKeyId()); + } + + @Override + public byte[] signWithKey(byte[] message, DelegationKey key) { + return createPassword(message, key.getKey()); + } + + @Override + public byte[] signWithKey(byte[] message, int keyId) throws SecurityException { + DelegationKey key = getDelegationKey(keyId); + if (key == null) { + throw new SecurityException("The key ID " + keyId + " was not found"); + } + return createPassword(message, key.getKey()); + } + + @Override + public AbstractDelegationTokenIdentifier createIdentifier() { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index c9b912b..d04cfa3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -54,7 +54,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl; -import org.apache.hadoop.hive.llap.security.LlapTokenClientFactory; +import org.apache.hadoop.hive.llap.security.LlapTokenClient; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient; import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; @@ -355,10 +355,12 @@ public class TezSessionState { String user, final Configuration conf) throws IOException { // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's // no good place for that right now (HIVE-13698). - boolean useLocalTokenClient = isUsingLocalClient(conf); + SessionState session = SessionState.get(); + boolean isInHs2 = session != null && session.isHiveServerQuery(); Token<LlapTokenIdentifier> token = null; // For Tez, we don't use appId to distinguish the tokens. - if (useLocalTokenClient) { + if (isInHs2) { + // We are in HS2, get the token locally. String clusterName = LlapUtil.generateClusterName(conf); // This assumes that the LLAP cluster and session are both running under HS2 user. final String clusterId = DaemonId.createClusterString(user, clusterName); @@ -368,12 +370,13 @@ public class TezSessionState { public LlapTokenLocalClient call() throws Exception { return new LlapTokenLocalClient(conf, clusterId); } - }).createToken(null, null); + }).createToken(null, null, false); // Signature is not required for Tez. } catch (ExecutionException e) { throw new IOException(e); } } else { - token = new LlapTokenClientFactory(conf).createClient().getDelegationToken(null); + // We are not in HS2; always create a new client for now. + token = new LlapTokenClient(conf).getDelegationToken(null); } if (LOG.isInfoEnabled()) { LOG.info("Obtained a LLAP token: " + token); @@ -381,19 +384,6 @@ public class TezSessionState { return token; } - private static boolean isUsingLocalClient(Configuration conf) { - String mode = HiveConf.getVar(conf, ConfVars.LLAP_CREATE_TOKEN_LOCALLY).toLowerCase(); - boolean isHs2Only = "hs2".equals(mode); - // We are initialized on first use inside TezSessionState::openInternal; assume the session - // should be available. - if (!isHs2Only) return "true".equals(mode); - SessionState session = SessionState.get(); - if (session == null && LOG.isInfoEnabled()) { - LOG.warn("There's no session to check if we are in HS2"); - } - return session != null && session.isHiveServerQuery(); - } - private TezClient startSessionAndContainers(TezClient session, HiveConf conf, Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig, boolean isOnThread) throws TezException, IOException {