Repository: hive Updated Branches: refs/heads/llap bc75d72b8 -> e05790973
HIVE-13445 : LLAP: token should encode application and cluster ids (Sergey Shelukhin, reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/868e5e14 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/868e5e14 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/868e5e14 Branch: refs/heads/llap Commit: 868e5e141856ce75af48d854d9e3eb13372d11f4 Parents: b621827 Author: Sergey Shelukhin <[email protected]> Authored: Tue May 3 12:01:32 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Tue May 3 13:38:03 2016 -0700 ---------------------------------------------------------------------- .../daemon/rpc/LlapDaemonProtocolProtos.java | 209 +++++++++++++++++-- .../org/apache/hadoop/hive/llap/DaemonId.java | 41 ++++ .../hive/llap/security/LlapTokenIdentifier.java | 39 +++- .../hive/llap/security/LlapTokenProvider.java | 2 +- .../src/protobuf/LlapDaemonProtocol.proto | 1 + .../hive/llap/daemon/ContainerRunner.java | 9 +- .../llap/daemon/impl/ContainerRunnerImpl.java | 47 +++-- .../hive/llap/daemon/impl/LlapDaemon.java | 52 ++++- .../daemon/impl/LlapProtocolServerImpl.java | 41 ++-- .../hive/llap/daemon/impl/LlapTokenChecker.java | 137 ++++++++++++ .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 17 +- .../hive/llap/daemon/impl/QueryTracker.java | 85 +++++--- .../hadoop/hive/llap/daemon/impl/Scheduler.java | 2 + .../llap/daemon/impl/TaskExecutorService.java | 9 + .../hive/llap/security/LlapSecurityHelper.java | 15 +- .../hive/llap/security/SecretManager.java | 19 +- .../hive/llap/daemon/MiniLlapCluster.java | 2 +- .../daemon/impl/TaskExecutorTestHelpers.java | 2 +- .../impl/TestLlapDaemonProtocolServerImpl.java | 2 +- .../llap/daemon/impl/TestLlapTokenChecker.java | 96 +++++++++ .../hive/ql/exec/tez/TezSessionState.java | 3 +- 21 files changed, 702 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java ---------------------------------------------------------------------- diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index 4ab7b32..820f6be 100644 --- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -12821,6 +12821,21 @@ public final class LlapDaemonProtocolProtos { public interface GetTokenRequestProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { + + // optional string app_id = 1; + /** + * <code>optional string app_id = 1;</code> + */ + boolean hasAppId(); + /** + * <code>optional string app_id = 1;</code> + */ + java.lang.String getAppId(); + /** + * <code>optional string app_id = 1;</code> + */ + com.google.protobuf.ByteString + getAppIdBytes(); } /** * Protobuf type {@code GetTokenRequestProto} @@ -12855,6 +12870,7 @@ public final class LlapDaemonProtocolProtos { com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { initFields(); + int mutable_bitField0_ = 0; com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); try { @@ -12872,6 +12888,11 @@ public final class LlapDaemonProtocolProtos { } break; } + case 10: { + bitField0_ |= 0x00000001; + appId_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -12911,7 +12932,52 @@ public final class LlapDaemonProtocolProtos { return PARSER; } + private int bitField0_; + // optional string app_id = 1; + public static final 
int APP_ID_FIELD_NUMBER = 1; + private java.lang.Object appId_; + /** + * <code>optional string app_id = 1;</code> + */ + public boolean hasAppId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <code>optional string app_id = 1;</code> + */ + public java.lang.String getAppId() { + java.lang.Object ref = appId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + appId_ = s; + } + return s; + } + } + /** + * <code>optional string app_id = 1;</code> + */ + public com.google.protobuf.ByteString + getAppIdBytes() { + java.lang.Object ref = appId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + appId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { + appId_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -12925,6 +12991,9 @@ public final class LlapDaemonProtocolProtos { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getAppIdBytes()); + } getUnknownFields().writeTo(output); } @@ -12934,6 +13003,10 @@ public final class LlapDaemonProtocolProtos { if (size != -1) return size; size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getAppIdBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -12957,6 +13030,11 @@ public final class LlapDaemonProtocolProtos { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other = 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) obj; boolean result = true; + result = result && (hasAppId() == other.hasAppId()); + if (hasAppId()) { + result = result && getAppId() + .equals(other.getAppId()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -12970,6 +13048,10 @@ public final class LlapDaemonProtocolProtos { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasAppId()) { + hash = (37 * hash) + APP_ID_FIELD_NUMBER; + hash = (53 * hash) + getAppId().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -13079,6 +13161,8 @@ public final class LlapDaemonProtocolProtos { public Builder clear() { super.clear(); + appId_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); return this; } @@ -13105,6 +13189,13 @@ public final class LlapDaemonProtocolProtos { public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto buildPartial() { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.appId_ = appId_; + result.bitField0_ = to_bitField0_; onBuilt(); return result; } @@ -13120,6 +13211,11 @@ public final class LlapDaemonProtocolProtos { public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other) { if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance()) return this; + if (other.hasAppId()) { + bitField0_ |= 0x00000001; + appId_ = other.appId_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -13145,6 +13241,81 @@ 
public final class LlapDaemonProtocolProtos { } return this; } + private int bitField0_; + + // optional string app_id = 1; + private java.lang.Object appId_ = ""; + /** + * <code>optional string app_id = 1;</code> + */ + public boolean hasAppId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <code>optional string app_id = 1;</code> + */ + public java.lang.String getAppId() { + java.lang.Object ref = appId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + appId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * <code>optional string app_id = 1;</code> + */ + public com.google.protobuf.ByteString + getAppIdBytes() { + java.lang.Object ref = appId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + appId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>optional string app_id = 1;</code> + */ + public Builder setAppId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + appId_ = value; + onChanged(); + return this; + } + /** + * <code>optional string app_id = 1;</code> + */ + public Builder clearAppId() { + bitField0_ = (bitField0_ & ~0x00000001); + appId_ = getDefaultInstance().getAppId(); + onChanged(); + return this; + } + /** + * <code>optional string app_id = 1;</code> + */ + public Builder setAppIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + appId_ = value; + onChanged(); + return this; + } // @@protoc_insertion_point(builder_scope:GetTokenRequestProto) } @@ -14414,24 +14585,24 @@ public final class LlapDaemonProtocolProtos { "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" + 
".QueryIdentifierProto\022\"\n\032fragment_identi" + "fier_string\030\002 \001(\t\" \n\036TerminateFragmentRe" + - "sponseProto\"\026\n\024GetTokenRequestProto\"&\n\025G", - "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S" + - "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" + - "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" + - "TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316" + - "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" + - "bmitWorkRequestProto\032\030.SubmitWorkRespons" + - "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" + - "teUpdatedRequestProto\032 .SourceStateUpdat" + - "edResponseProto\022H\n\rqueryComplete\022\032.Query" + - "CompleteRequestProto\032\033.QueryCompleteResp", - "onseProto\022T\n\021terminateFragment\022\036.Termina" + - "teFragmentRequestProto\032\037.TerminateFragme" + - "ntResponseProto2]\n\026LlapManagementProtoco" + - "l\022C\n\022getDelegationToken\022\025.GetTokenReques" + - "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" + - "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" + - "emonProtocolProtos\210\001\001\240\001\001" + "sponseProto\"&\n\024GetTokenRequestProto\022\016\n\006a", + "pp_id\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005" + + "token\030\001 \001(\014*2\n\020SourceStateProto\022\017\n\013S_SUC" + + "CEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024SubmissionSta" + + "teProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rE" + + "VICTED_OTHER\020\0032\316\002\n\022LlapDaemonProtocol\022?\n" + + "\nsubmitWork\022\027.SubmitWorkRequestProto\032\030.S" + + "ubmitWorkResponseProto\022W\n\022sourceStateUpd" + + "ated\022\037.SourceStateUpdatedRequestProto\032 ." 
+ + "SourceStateUpdatedResponseProto\022H\n\rquery" + + "Complete\022\032.QueryCompleteRequestProto\032\033.Q", + "ueryCompleteResponseProto\022T\n\021terminateFr" + + "agment\022\036.TerminateFragmentRequestProto\032\037" + + ".TerminateFragmentResponseProto2]\n\026LlapM" + + "anagementProtocol\022C\n\022getDelegationToken\022" + + "\025.GetTokenRequestProto\032\026.GetTokenRespons" + + "eProtoBH\n&org.apache.hadoop.hive.llap.da" + + "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -14533,7 +14704,7 @@ public final class LlapDaemonProtocolProtos { internal_static_GetTokenRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_GetTokenRequestProto_descriptor, - new java.lang.String[] { }); + new java.lang.String[] { "AppId", }); internal_static_GetTokenResponseProto_descriptor = getDescriptor().getMessageTypes().get(16); internal_static_GetTokenResponseProto_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java new file mode 100644 index 0000000..18355e6 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +public class DaemonId { + private final String userName; + private final String clusterName; + private final String appId; + private final String hostName; + private final long startTime; + + public DaemonId(String userName, String clusterName, String hostName, String appId, + long startTime) { + this.userName = userName; + this.clusterName = clusterName; + this.appId = appId; + this.hostName = hostName; + this.startTime = startTime; + // TODO: we could also get an unique number per daemon. + } + + public String getClusterString() { + return userName + "_" + clusterName + "_" + appId; + } + + public String getApplicationId() { + return appId; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java index 23980d0..e28eddd 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java @@ -22,6 +22,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Text; 
import org.apache.hadoop.security.token.Token; @@ -31,25 +32,32 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier { private static final String KIND = "LLAP_TOKEN"; public static final Text KIND_NAME = new Text(KIND); + private String clusterId; + private String appId; public LlapTokenIdentifier() { super(); } - public LlapTokenIdentifier(Text owner, Text renewer, Text realUser) { + public LlapTokenIdentifier(Text owner, Text renewer, Text realUser, + String clusterId, String appId) { super(owner, renewer, realUser); + this.clusterId = clusterId; + this.appId = appId == null ? "" : appId; } @Override public void write(DataOutput out) throws IOException { super.write(out); - // Nothing right now. + out.writeUTF(clusterId); + out.writeUTF(appId); } @Override public void readFields(DataInput in) throws IOException { super.readFields(in); - // Nothing right now. + clusterId = in.readUTF(); + appId = in.readUTF(); } @Override @@ -57,21 +65,34 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier { return KIND_NAME; } + public String getAppId() { + return appId; + } + + public String getClusterId() { + return clusterId; + } + @Override public int hashCode() { - // Nothing else right now. - return super.hashCode(); + final int prime = 31; + int result = prime * super.hashCode() + ((appId == null) ? 0 : appId.hashCode()); + return prime * result + ((clusterId == null) ? 0 : clusterId.hashCode()); } @Override - public boolean equals(Object other) { - // Nothing else right now. - return super.equals(other); + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof LlapTokenIdentifier) || !super.equals(obj)) return false; + LlapTokenIdentifier other = (LlapTokenIdentifier) obj; + return (appId == null ? other.appId == null : appId.equals(other.appId)) + && (clusterId == null ? 
other.clusterId == null : clusterId.equals(other.clusterId)); } @Override public String toString() { - return KIND + "; " + super.toString(); + return KIND + "; " + super.toString() + ", cluster " + clusterId + ", app secret hash " + + (StringUtils.isBlank(appId) ? 0 : appId.hashCode()); } @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java index 2e99a28..edf9b18 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java @@ -23,5 +23,5 @@ import java.io.IOException; import org.apache.hadoop.security.token.Token; public interface LlapTokenProvider { - Token<LlapTokenIdentifier> getDelegationToken() throws IOException; + Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index 944c96c..5cdc02e 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -130,6 +130,7 @@ message TerminateFragmentResponseProto { } message GetTokenRequestProto { + optional string app_id = 1; } message GetTokenResponseProto { http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java 
---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java index fc29371..c346aed 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java @@ -29,9 +29,12 @@ public interface ContainerRunner { SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException; - SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request); + SourceStateUpdatedResponseProto sourceStateUpdated( + SourceStateUpdatedRequestProto request) throws IOException; - QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request); + QueryCompleteResponseProto queryComplete( + QueryCompleteRequestProto request) throws IOException; - TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request); + TerminateFragmentResponseProto terminateFragment( + TerminateFragmentRequestProto request) throws IOException; } http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 3d45c7a..78b37f7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -92,7 +92,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort, 
AtomicReference<InetSocketAddress> localAddress, long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics, - AMReporter amReporter, ClassLoader classLoader) { + AMReporter amReporter, ClassLoader classLoader, String clusterId) { super("ContainerRunnerImpl"); this.conf = conf; Preconditions.checkState(numExecutors > 0, @@ -101,7 +101,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu this.localShufflePort = localShufflePort; this.amReporter = amReporter; - this.queryTracker = new QueryTracker(conf, localDirsBase); + this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId); addIfService(queryTracker); String waitQueueSchedulerClassName = HiveConf.getVar( conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); @@ -175,7 +175,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu fragmentSpec.getFragmentIdentifierString()); int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); - QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier); + QueryIdentifier queryIdentifier = new QueryIdentifier( + request.getApplicationIdString(), dagIdentifier); Credentials credentials = new Credentials(); DataInputBuffer dib = new DataInputBuffer(); @@ -193,6 +194,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec(), jobToken); + String[] localDirs = fragmentInfo.getLocalDirs(); Preconditions.checkNotNull(localDirs); if (LOG.isDebugEnabled()) { @@ -200,7 +202,6 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. 
// Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()), new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, @@ -248,24 +249,23 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu @Override public SourceStateUpdatedResponseProto sourceStateUpdated( - SourceStateUpdatedRequestProto request) { + SourceStateUpdatedRequestProto request) throws IOException { LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); - queryTracker.registerSourceStateChange( - new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), - request.getQueryIdentifier().getDagIdentifier()), request.getSrcName(), - request.getState()); + QueryIdentifier queryId = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), + request.getQueryIdentifier().getDagIdentifier()); + queryTracker.registerSourceStateChange(queryId, request.getSrcName(), request.getState()); return SourceStateUpdatedResponseProto.getDefaultInstance(); } @Override - public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) { + public QueryCompleteResponseProto queryComplete( + QueryCompleteRequestProto request) throws IOException { QueryIdentifier queryIdentifier = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), request.getQueryIdentifier().getDagIdentifier()); LOG.info("Processing queryComplete notification for {}", queryIdentifier); - List<QueryFragmentInfo> knownFragments = - queryTracker - .queryComplete(queryIdentifier, request.getDeleteDelay()); + List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete( + queryIdentifier, request.getDeleteDelay(), false); LOG.info("DBG: Pending fragment count for completed query {} = {}", 
queryIdentifier, knownFragments.size()); for (QueryFragmentInfo fragmentInfo : knownFragments) { @@ -277,9 +277,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } @Override - public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) { - LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString()); - executorService.killFragment(request.getFragmentIdentifierString()); + public TerminateFragmentResponseProto terminateFragment( + TerminateFragmentRequestProto request) throws IOException { + String fragmentId = request.getFragmentIdentifierString(); + LOG.info("DBG: Received terminateFragment request for {}", fragmentId); + // TODO: ideally, QueryTracker should have fragment-to-query mapping. + QueryIdentifier queryId = executorService.findQueryByFragment(fragmentId); + // checkPermissions returns false if query is not found, throws on failure. + if (queryId != null && queryTracker.checkPermissionsForQuery(queryId)) { + executorService.killFragment(fragmentId); + } return TerminateFragmentResponseProto.getDefaultInstance(); } @@ -355,8 +362,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu @Override public void queryFailed(QueryIdentifier queryIdentifier) { LOG.info("Processing query failed notification for {}", queryIdentifier); - List<QueryFragmentInfo> knownFragments = - queryTracker.queryComplete(queryIdentifier, -1); + List<QueryFragmentInfo> knownFragments; + try { + knownFragments = queryTracker.queryComplete(queryIdentifier, -1, true); + } catch (IOException e) { + throw new RuntimeException(e); // Should never happen here, no permission check. 
+ } LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier, knownFragments.size()); for (QueryFragmentInfo fragmentInfo : knownFragments) { http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 63cb16b..d23a44a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -26,12 +26,14 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; import javax.management.ObjectName; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; @@ -57,11 +59,13 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker; import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.util.ConverterUtils; import 
org.apache.hive.common.util.ShutdownHookManager; import org.apache.logging.log4j.core.config.Configurator; import org.slf4j.Logger; @@ -97,6 +101,13 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla private final int numExecutors; private final long maxJvmMemory; private final String[] localDirs; + private final DaemonId daemonId; + + private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]"); + private static String generateClusterName(Configuration conf) { + String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); + return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_"); + } // TODO Not the best way to share the address private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(), @@ -105,11 +116,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort, - int mngPort, int shufflePort, int webPort) { + int mngPort, int shufflePort, int webPort, String appName) { super("LlapDaemon"); initializeLogging(); - printAsciiArt(); Preconditions.checkArgument(numExecutors > 0); @@ -129,6 +139,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla "LLAP service hosts startswith '@' but hive.zookeeper.quorum is not set." 
+ " hive.zookeeper.quorum must be set."); } + String hostName = MetricsUtils.getHostName(); + try { + daemonId = new DaemonId(UserGroupInformation.getCurrentUser().getUserName(), + generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + this.maxJvmMemory = getTotalHeapSize(); this.llapIoEnabled = ioEnabled; @@ -193,7 +211,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla LlapMetricsSystem.initialize("LlapDaemon"); this.pauseMonitor = new JvmPauseMonitor(daemonConf); pauseMonitor.start(); - String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName(); + String displayName = "LlapDaemonExecutorMetrics-" + hostName; String sessionId = MetricsUtils.getUUID(); daemonConf.set("llap.daemon.metrics.sessionid", sessionId); String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf, @@ -223,11 +241,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf); this.server = new LlapProtocolServerImpl( - numHandlers, this, srvAddress, mngAddress, srvPort, mngPort); + numHandlers, this, srvAddress, mngAddress, srvPort, mngPort, daemonId); this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize, enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics, - amReporter, executorClassLoader); + amReporter, executorClassLoader, daemonId.getClusterString()); addIfService(containerRunner); // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties. 
@@ -377,10 +395,18 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration(); String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()); + String appName = null; if (containerIdStr != null && !containerIdStr.isEmpty()) { daemonConf.set(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname, containerIdStr); + appName = ConverterUtils.toContainerId(containerIdStr) + .getApplicationAttemptId().getApplicationId().toString(); } else { daemonConf.unset(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); + // Note, we assume production LLAP always runs under YARN. + LOG.error("Cannot find " + ApplicationConstants.Environment.CONTAINER_ID.toString() + + "; LLAP tokens may grant access to subsequent instances of the cluster with" + + " the same name"); + appName = null; } int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); @@ -400,7 +426,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT); boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true); llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, - isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort); + isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, + appName); LOG.info("Adding shutdown hook for LlapDaemon"); ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1); @@ -420,24 +447,27 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } @Override - public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws - IOException { + public SubmitWorkResponseProto submitWork( + SubmitWorkRequestProto request) throws IOException { numSubmissions.incrementAndGet(); 
return containerRunner.submitWork(request); } @Override - public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) { + public SourceStateUpdatedResponseProto sourceStateUpdated( + SourceStateUpdatedRequestProto request) throws IOException { return containerRunner.sourceStateUpdated(request); } @Override - public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) { + public QueryCompleteResponseProto queryComplete( + QueryCompleteRequestProto request) throws IOException { return containerRunner.queryComplete(request); } @Override - public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) { + public TerminateFragmentResponseProto terminateFragment( + TerminateFragmentRequestProto request) throws IOException { return containerRunner.terminateFragment(request); } http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index dae1a3a..db8bfa6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto; @@ -71,13 +72,11 @@ public class LlapProtocolServerImpl extends AbstractService private final AtomicReference<InetSocketAddress> srvAddress, mngAddress; private SecretManager zkSecretManager; private String restrictedToUser = null; + private final DaemonId daemonId; - public LlapProtocolServerImpl(int numHandlers, - ContainerRunner containerRunner, - AtomicReference<InetSocketAddress> srvAddress, - AtomicReference<InetSocketAddress> mngAddress, - int srvPort, - int mngPort) { + public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner, + AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress, + int srvPort, int mngPort, DaemonId daemonId) { super("LlapDaemonProtocolServerImpl"); this.numHandlers = numHandlers; this.containerRunner = containerRunner; @@ -85,14 +84,14 @@ public class LlapProtocolServerImpl extends AbstractService this.srvPort = srvPort; this.mngAddress = mngAddress; this.mngPort = mngPort; + this.daemonId = daemonId; LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() + " with port configured to: " + srvPort); } @Override public SubmitWorkResponseProto submitWork(RpcController controller, - SubmitWorkRequestProto request) throws - ServiceException { + SubmitWorkRequestProto request) throws ServiceException { try { return containerRunner.submitWork(request); } catch (IOException e) { @@ -103,20 +102,31 @@ public class LlapProtocolServerImpl extends AbstractService @Override public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, SourceStateUpdatedRequestProto request) throws ServiceException { - return containerRunner.sourceStateUpdated(request); + try { + return containerRunner.sourceStateUpdated(request); + } catch (IOException e) { + throw new ServiceException(e); + } } @Override public QueryCompleteResponseProto queryComplete(RpcController controller, 
QueryCompleteRequestProto request) throws ServiceException { - return containerRunner.queryComplete(request); + try { + return containerRunner.queryComplete(request); + } catch (IOException e) { + throw new ServiceException(e); + } } @Override public TerminateFragmentResponseProto terminateFragment( - RpcController controller, - TerminateFragmentRequestProto request) throws ServiceException { - return containerRunner.terminateFragment(request); + RpcController controller, TerminateFragmentRequestProto request) throws ServiceException { + try { + return containerRunner.terminateFragment(request); + } catch (IOException e) { + throw new ServiceException(e); + } } @Override @@ -140,7 +150,7 @@ public class LlapProtocolServerImpl extends AbstractService } String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); - zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab); + zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, daemonId); // Start the protocol server after properly authenticating with daemon keytab. UserGroupInformation daemonUgi = null; @@ -275,7 +285,8 @@ public class LlapProtocolServerImpl extends AbstractService realUser = new Text(ugi.getRealUser().getUserName()); } Text renewer = new Text(ugi.getShortUserName()); - LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser); + LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser, + daemonId.getClusterString(), request.hasAppId() ? request.getAppId() : null); // TODO: note that the token is not renewable right now and will last for 2 weeks by default. 
Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java new file mode 100644 index 0000000..03ee055 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayList; + +import java.util.List; + +import java.io.IOException; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class LlapTokenChecker { + private static final Logger LOG = LoggerFactory.getLogger(LlapTokenChecker.class); + + private static final ImmutablePair<String, String> NO_SECURITY = new ImmutablePair<>(null, null); + public static Pair<String, String> getTokenInfo(String clusterId) throws IOException { + if (!UserGroupInformation.isSecurityEnabled()) return NO_SECURITY; + UserGroupInformation current = UserGroupInformation.getCurrentUser(); + String kerberosName = current.hasKerberosCredentials() ? 
current.getShortUserName() : null; + List<LlapTokenIdentifier> tokens = getLlapTokens(current, clusterId); + if ((tokens == null || tokens.isEmpty()) && kerberosName == null) { + throw new SecurityException("No tokens or kerberos for " + current); + } + return getTokenInfoInternal(kerberosName, tokens); + } + + private static List<LlapTokenIdentifier> getLlapTokens( + UserGroupInformation ugi, String clusterId) { + List<LlapTokenIdentifier> tokens = null; + for (TokenIdentifier id : ugi.getTokenIdentifiers()) { + if (!LlapTokenIdentifier.KIND_NAME.equals(id.getKind())) continue; + if (LOG.isDebugEnabled()) { + LOG.debug("Token {}", id); + } + LlapTokenIdentifier llapId = (LlapTokenIdentifier)id; + if (!clusterId.equals(llapId.getClusterId())) continue; + if (tokens == null) { + tokens = new ArrayList<>(); + } + tokens.add((LlapTokenIdentifier)id); + } + return tokens; + } + + @VisibleForTesting + static Pair<String, String> getTokenInfoInternal( + String kerberosName, List<LlapTokenIdentifier> tokens) { + assert (tokens != null && !tokens.isEmpty()) || kerberosName != null; + if (tokens == null) { + return new ImmutablePair<String, String>(kerberosName, null); + } + String userName = kerberosName, appId = null; + for (LlapTokenIdentifier llapId : tokens) { + String newUserName = llapId.getRealUser().toString(); + if (userName != null && !userName.equals(newUserName)) { + throw new SecurityException("Ambiguous user name from credentials - " + userName + + " and " + newUserName + " from " + llapId + + ((kerberosName == null) ? 
("; has kerberos credentials for " + kerberosName) : "")); + } + userName = newUserName; + String newAppId = llapId.getAppId(); + if (!StringUtils.isEmpty(newAppId)) { + if (!StringUtils.isEmpty(appId) && !appId.equals(newAppId)) { + throw new SecurityException("Ambiguous app ID from credentials - " + appId + + " and " + newAppId + " from " + llapId); + } + appId = newAppId; + } + } + assert userName != null; + return new ImmutablePair<String, String>(userName, appId); + } + + public static void checkPermissions( + String clusterId, String userName, String appId, Object hint) throws IOException { + if (!UserGroupInformation.isSecurityEnabled()) return; + UserGroupInformation current = UserGroupInformation.getCurrentUser(); + String kerberosName = current.hasKerberosCredentials() ? current.getShortUserName() : null; + List<LlapTokenIdentifier> tokens = getLlapTokens(current, clusterId); + checkPermissionsInternal(kerberosName, tokens, userName, appId, hint); + } + + @VisibleForTesting + static void checkPermissionsInternal(String kerberosName, List<LlapTokenIdentifier> tokens, + String userName, String appId, Object hint) { + if (kerberosName != null && StringUtils.isEmpty(appId) && kerberosName.equals(userName)) { + return; + } + if (tokens != null) { + for (LlapTokenIdentifier llapId : tokens) { + String tokenUser = llapId.getRealUser().toString(), tokenAppId = llapId.getAppId(); + if (checkTokenPermissions(userName, appId, tokenUser, tokenAppId)) return; + } + } + throw new SecurityException("Unauthorized to access " + + userName + ", " + appId.hashCode() + " (" + hint + ")"); + } + + public static void checkPermissions( + Pair<String, String> prm, String userName, String appId, Object hint) { + if (userName == null) { + assert StringUtils.isEmpty(appId); + return; + } + if (!checkTokenPermissions(userName, appId, prm.getLeft(), prm.getRight())) { + throw new SecurityException("Unauthorized to access " + + userName + ", " + appId.hashCode() + " (" + hint + ")"); 
+ } + } + + private static boolean checkTokenPermissions( + String userName, String appId, String tokenUser, String tokenAppId) { + return userName.equals(tokenUser) + && (StringUtils.isEmpty(appId) || appId.equals(tokenAppId)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 64c2b58..8daef9e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -55,10 +55,11 @@ public class QueryInfo { private final ConcurrentMap<String, SourceStateProto> sourceStateMap; private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker(); + private final String tokenUserName, appId; - public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier, - String user, ConcurrentMap<String, SourceStateProto> sourceStateMap, - String[] localDirsBase, FileSystem localFs) { + public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName, + int dagIdentifier, String user, ConcurrentMap<String, SourceStateProto> sourceStateMap, + String[] localDirsBase, FileSystem localFs, String tokenUserName, String tokenAppId) { this.queryIdentifier = queryIdentifier; this.appIdString = appIdString; this.dagName = dagName; @@ -67,6 +68,8 @@ public class QueryInfo { this.user = user; this.localDirsBase = localDirsBase; this.localFs = localFs; + this.tokenUserName = tokenUserName; + this.appId = tokenAppId; } public QueryIdentifier getQueryIdentifier() { @@ -270,4 +273,12 @@ public class QueryInfo { this.lastFinishableState = lastFinishableState; 
} } + + public String getTokenUserName() { + return tokenUserName; + } + + public String getTokenAppId() { + return appId; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 14657e6..cb3be2b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -18,6 +18,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.token.Token; import org.apache.tez.common.CallableWithNdc; @@ -60,6 +62,7 @@ public class QueryTracker extends AbstractService { private final String[] localDirsBase; private final FileSystem localFs; + private final String clusterId; private final long defaultDeleteDelaySeconds; // TODO At the moment there's no way of knowing whether a query is running or not. 
@@ -89,9 +92,10 @@ public class QueryTracker extends AbstractService { private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId = new ConcurrentHashMap<>(); - public QueryTracker(Configuration conf, String[] localDirsBase) { + public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId) { super("QueryTracker"); this.localDirsBase = localDirsBase; + this.clusterId = clusterId; try { localFs = FileSystem.getLocal(conf); } catch (IOException e) { @@ -119,35 +123,50 @@ public class QueryTracker extends AbstractService { * @param user * @throws IOException */ - QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName, - int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, - FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken) throws IOException { + QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, + String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, + String user, FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken) + throws IOException { ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.readLock().lock(); try { - if (!completedDagMap.contains(queryIdentifier)) { - QueryInfo queryInfo = queryInfoMap.get(queryIdentifier); - if (queryInfo == null) { - queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user, - getSourceCompletionMap(queryIdentifier), localDirsBase, localFs); - queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier); - } - ShuffleHandler.get() - .registerDag(appIdString, dagIdentifier, appToken, - user, queryInfo.getLocalDirs()); - - return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec); - } else { + if 
(completedDagMap.contains(queryIdentifier)) { // Cleanup the dag lock here, since it may have been created after the query completed dagSpecificLocks.remove(queryIdentifier); throw new RuntimeException( "Dag " + dagName + " already complete. Rejecting fragment [" + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]"); } + // TODO: for now, we get the secure username out of UGI... after signing, we can take it + // out of the request provided that it's signed. + Pair<String, String> tokenInfo = LlapTokenChecker.getTokenInfo(clusterId); + boolean isExistingQueryInfo = true; + QueryInfo queryInfo = queryInfoMap.get(queryIdentifier); + if (queryInfo == null) { + queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user, + getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, + tokenInfo.getLeft(), tokenInfo.getRight()); + QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); + if (old != null) { + queryInfo = old; + } else { + isExistingQueryInfo = false; + } + } + if (isExistingQueryInfo) { + // We already retrieved the incoming info, check without UGI. 
+ LlapTokenChecker.checkPermissions(tokenInfo, queryInfo.getTokenUserName(), + queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier); + } + ShuffleHandler.get() + .registerDag(appIdString, dagIdentifier, appToken, + user, queryInfo.getLocalDirs()); + + return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec); } finally { dagLock.readLock().unlock(); } @@ -174,17 +193,20 @@ public class QueryTracker extends AbstractService { * @param queryIdentifier * @param deleteDelay */ - List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay) { + List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, + boolean isInternal) throws IOException { if (deleteDelay == -1) { deleteDelay = defaultDeleteDelaySeconds; } ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.writeLock().lock(); try { + QueryInfo queryInfo = isInternal + ? 
queryInfoMap.get(queryIdentifier) : checkPermissionsAndGetQuery(queryIdentifier); rememberCompletedDag(queryIdentifier); LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier, deleteDelay); - QueryInfo queryInfo = queryInfoMap.remove(queryIdentifier); + queryInfoMap.remove(queryIdentifier); if (queryInfo == null) { LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier); return Collections.emptyList(); @@ -229,9 +251,10 @@ public class QueryTracker extends AbstractService { * @param sourceName * @param sourceState */ - void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, SourceStateProto sourceState) { + void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, + SourceStateProto sourceState) throws IOException { getSourceCompletionMap(queryIdentifier).put(sourceName, sourceState); - QueryInfo queryInfo = queryInfoMap.get(queryIdentifier); + QueryInfo queryInfo = checkPermissionsAndGetQuery(queryIdentifier); if (queryInfo != null) { queryInfo.sourceStateUpdated(sourceName); } else { @@ -322,4 +345,16 @@ public class QueryTracker extends AbstractService { return null; } }
+
+  /**
+   * Looks up the query and verifies that the current caller may access it.
+   *
+   * @param queryId identifier of the query to look up.
+   * @return the registered QueryInfo, or null if the query is not known to this tracker
+   *         (in which case no permission check is performed).
+   * @throws SecurityException (from LlapTokenChecker) if the caller's credentials do not
+   *         match the token user name / app ID recorded for the query.
+   */
+  private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IOException {
+    QueryInfo queryInfo = queryInfoMap.get(queryId);
+    if (queryInfo == null) return null;
+    // Argument order is (clusterId, userName, appId, hint): user name first, then app ID.
+    LlapTokenChecker.checkPermissions(clusterId, queryInfo.getTokenUserName(),
+        queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier());
+    return queryInfo;
+  }
+
+  /**
+   * Verifies access to the query, if it is known.
+   *
+   * @return true if the query is registered and the caller passed the permission check;
+   *         false if the query is not known to this tracker.
+   */
+  public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
+    return checkPermissionsAndGetQuery(queryId) != null;
+  }
 } http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java ---------------------------------------------------------------------- diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java index 26c8e55..fd6234a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java @@ -44,4 +44,6 @@ public interface Scheduler<T> { void killFragment(String fragmentId); Set<String> getExecutorsStatus(); + + QueryIdentifier findQueryByFragment(String fragmentId); } http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index f621af2..1933eb1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -420,6 +420,15 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta } @Override + public QueryIdentifier findQueryByFragment(String fragmentId) { + synchronized (lock) { + TaskWrapper taskWrapper = knownTasks.get(fragmentId); + return taskWrapper == null ? 
null : taskWrapper.getTaskRunnerCallable() + .getFragmentInfo().getQueryInfo().getQueryIdentifier(); + } + } + + @Override public void killFragment(String fragmentId) { synchronized (lock) { TaskWrapper taskWrapper = knownTasks.remove(fragmentId); http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java index 76ba225..f958bc4 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import javax.net.SocketFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; @@ -81,7 +82,7 @@ public class LlapSecurityHelper implements LlapTokenProvider { } @Override - public Token<LlapTokenIdentifier> getDelegationToken() throws IOException { + public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException { if (!UserGroupInformation.isSecurityEnabled()) return null; if (llapUgi == null) { llapUgi = UserGroupInformation.getCurrentUser(); @@ -98,7 +99,7 @@ public class LlapSecurityHelper implements LlapTokenProvider { boolean hasRefreshed = false; while (true) { try { - tokenBytes = getTokenBytes(); + tokenBytes = getTokenBytes(appId); break; } catch (InterruptedException ie) { throw new RuntimeException(ie); @@ -128,7 +129,8 @@ public class LlapSecurityHelper implements LlapTokenProvider { return token; } - private ByteString getTokenBytes() 
throws InterruptedException, IOException { + private ByteString getTokenBytes( + final String appId) throws InterruptedException, IOException { return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() { @Override public ByteString run() throws Exception { @@ -138,8 +140,11 @@ public class LlapSecurityHelper implements LlapTokenProvider { clientInstance.getManagementPort(), retryPolicy, socketFactory); } // Client only connects on the first call, so this has to be done in doAs. - GetTokenRequestProto req = GetTokenRequestProto.newBuilder().build(); - return client.getDelegationToken(null, req).getToken(); + GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder(); + if (!StringUtils.isBlank(appId)) { + req.setAppId(appId); + } + return client.getDelegationToken(null, req.build()).getToken(); } }); } http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java index 8c7a539..c54e726 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java @@ -23,6 +23,7 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory; public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> { private static final Logger LOG = 
LoggerFactory.getLogger(SecretManager.class); + public SecretManager(Configuration conf) { super(conf); checkForZKDTSMBug(conf); @@ -82,16 +84,8 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent return id; } - private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]"); - private static String deriveZkPath(Configuration conf) throws IOException { - String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); - String clusterName = hosts.startsWith("@") ? hosts.substring(1) : hosts; - String userName = UserGroupInformation.getCurrentUser().getShortUserName(); - return hostsRe.matcher(userName + "_" + clusterName).replaceAll("_") ; - } - public static SecretManager createSecretManager( - final Configuration conf, String llapPrincipal, String llapKeytab) { + final Configuration conf, String llapPrincipal, String llapKeytab, DaemonId daemonId) { // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways. 
UserGroupInformation zkUgi = null; String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal); @@ -110,12 +104,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime); zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal); zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab); - String zkPath; - try { - zkPath = deriveZkPath(conf); - } catch (IOException e) { - throw new RuntimeException(e); - } + String zkPath = daemonId.getClusterString(); LOG.info("Using {} as ZK secret manager path", zkPath); zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath); setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl"); http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index 610f266..dde5be0 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -193,7 +193,7 @@ public class MiniLlapCluster extends AbstractService { LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); for (int i = 0 ;i < numInstances ; i++) { llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort); + ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed); llapDaemons[i].init(new Configuration(conf)); } LOG.info("Initialized {} llap instances for MiniLlapCluster 
with name={}", numInstances, clusterNameTrimmed); http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 24f4442..c6ba14e 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -78,7 +78,7 @@ public class TaskExecutorTestHelpers { QueryInfo queryInfo = new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_name", 1, "fakeUser", new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(), - new String[0], null); + new String[0], null, null, null); return queryInfo; } http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java index a65bf5c..fd37a06 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java @@ -46,7 +46,7 @@ public class TestLlapDaemonProtocolServerImpl { LlapProtocolServerImpl server = new LlapProtocolServerImpl(numHandlers, containerRunnerMock, new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(), - rpcPort, rpcPort + 1); + rpcPort, rpcPort + 1, null); 
when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn( SubmitWorkResponseProto .newBuilder() http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java new file mode 100644 index 0000000..aaaa762 --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import static org.junit.Assert.*; + +import org.apache.hadoop.io.Text; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +public class TestLlapTokenChecker { + + @Test + public void testGetToken() { + check(LlapTokenChecker.getTokenInfoInternal("u", null), "u", null); + check(LlapTokenChecker.getTokenInfoInternal(null, createTokens("u", null)), "u", null); + check(LlapTokenChecker.getTokenInfoInternal(null, createTokens("u", "a")), "u", "a"); + check(LlapTokenChecker.getTokenInfoInternal("u", createTokens("u", "a")), "u", "a"); + check(LlapTokenChecker.getTokenInfoInternal("u", createTokens("u", "a", "u", null)), + "u", "a"); + // Note - some of these scenarios could be handled, but they are not supported right now. + // The reason is that we bind a query to app/user using the signed token information, and + // we don't want to bother figuring out which one to use in case of ambiguity w/o a use case. + checkGetThrows("u", createTokens("u2", "a")); // Ambiguous user. + checkGetThrows("u2", createTokens("u2", "a", "u3", "a")); // Ambiguous user. + checkGetThrows(null, createTokens("u2", "a", "u3", "a")); // Ambiguous user. + checkGetThrows(null, createTokens("u2", "a", "u2", "a1")); // Ambiguous app. + } + + @Test + public void testCheckPermissions() { + LlapTokenChecker.checkPermissionsInternal("u", null, "u", null, null); + LlapTokenChecker.checkPermissionsInternal(null, createTokens("u", null) , "u", null, null); + LlapTokenChecker.checkPermissionsInternal("u", createTokens("u", "a") , "u", "a", null); + // No access. + checkPrmThrows("u2", null, "u", "a"); + checkPrmThrows("u", null, "u", "a"); // Note - Kerberos user w/o appId doesn't have access. 
+ checkPrmThrows(null, createTokens("u2", "a"), "u", "a"); + checkPrmThrows(null, createTokens("u", "a2"), "u", "a"); + checkPrmThrows(null, createTokens("u", null), "u", "a"); + } + + private List<LlapTokenIdentifier> createTokens(String... args) { + List<LlapTokenIdentifier> tokens = new ArrayList<>(); + for (int i = 0; i < args.length; i += 2) { + tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1])); + } + return tokens; + } + + private void checkGetThrows(String kerberosName, List<LlapTokenIdentifier> tokens) { + try { + LlapTokenChecker.getTokenInfoInternal(kerberosName, tokens); + fail("Didn't throw"); + } catch (SecurityException ex) { + // Expected. + } + } + + private void checkPrmThrows( + String kerberosName, List<LlapTokenIdentifier> tokens, String userName, String appId) { + try { + LlapTokenChecker.checkPermissionsInternal(kerberosName, tokens, userName, appId, null); + fail("Didn't throw"); + } catch (SecurityException ex) { + // Expected. + } + } + + private void check(Pair<String, String> p, String user, String appId) { + assertEquals(user, p.getLeft()); + assertEquals(appId, p.getRight()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 3ea5ef9..fd6465a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -275,7 +275,8 @@ public class TezSessionState { if (llapMode) { if (UserGroupInformation.isSecurityEnabled()) { LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf); - Token<LlapTokenIdentifier> token = tp.getDelegationToken(); + // For Tez, we don't use appId to 
distinguish the tokens; security scope is the user. + Token<LlapTokenIdentifier> token = tp.getDelegationToken(null); if (LOG.isInfoEnabled()) { LOG.info("Obtained a LLAP token: " + token); }
