HIVE-17386 : support LLAP workload management in HS2 (low level only) (Sergey Shelukhin, reviewed by Zhiyuan Yang, Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ac24537f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ac24537f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ac24537f Branch: refs/heads/hive-14535 Commit: ac24537f294e63682e250c25a0b96ad2d64066dd Parents: f3cb704 Author: sergey <[email protected]> Authored: Tue Sep 26 15:44:58 2017 -0700 Committer: sergey <[email protected]> Committed: Tue Sep 26 15:44:58 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 11 +- .../org/apache/hadoop/hive/ql/QTestUtil.java | 2 +- .../hive/llap/tez/LlapProtocolClientProxy.java | 37 +- .../hive/registry/impl/TezAmInstance.java | 9 +- .../hadoop/hive/llap/TestAsyncPbRpcProxy.java | 8 +- .../plugin/rpc/LlapPluginProtocolProtos.java | 260 ++----------- .../hadoop/hive/llap/AsyncPbRpcProxy.java | 83 +++-- .../llap/impl/LlapPluginProtocolClientImpl.java | 5 +- .../hadoop/hive/llap/impl/ProtobufProxy.java | 31 +- .../src/protobuf/LlapPluginProtocol.proto | 5 +- .../tezplugins/LlapTaskSchedulerService.java | 4 +- .../endpoint/LlapPluginServerImpl.java | 23 +- .../hadoop/hive/ql/exec/mr/ExecDriver.java | 7 +- .../hadoop/hive/ql/exec/tez/AmPluginNode.java | 43 +++ .../ql/exec/tez/GuaranteedTasksAllocator.java | 171 +++++++++ .../ql/exec/tez/LlapPluginEndpointClient.java | 30 ++ .../exec/tez/LlapPluginEndpointClientImpl.java | 123 ++++++ .../ql/exec/tez/QueryAllocationManager.java | 32 ++ .../hadoop/hive/ql/exec/tez/TezSessionPool.java | 85 +++-- .../hive/ql/exec/tez/TezSessionPoolManager.java | 59 ++- .../hive/ql/exec/tez/TezSessionPoolSession.java | 55 ++- .../hive/ql/exec/tez/TezSessionState.java | 29 +- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 23 +- .../hadoop/hive/ql/exec/tez/WmTezSession.java | 121 ++++++ .../hive/ql/exec/tez/WorkloadManager.java | 372 +++++++++++++++++++ .../ql/exec/tez/monitoring/TezJobMonitor.java 
| 2 + .../physical/LlapClusterStateForCompile.java | 105 +++--- .../hive/ql/exec/tez/SampleTezSessionState.java | 27 +- .../exec/tez/TestGuaranteedTaskAllocator.java | 151 ++++++++ .../hive/ql/exec/tez/TestTezSessionPool.java | 38 +- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 16 +- .../hive/ql/exec/tez/TestWorkloadManager.java | 173 +++++++++ .../apache/hive/service/server/HiveServer2.java | 22 +- 33 files changed, 1692 insertions(+), 470 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e2bd38b..147a5d1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2397,6 +2397,13 @@ public class HiveConf extends Configuration { "The maximum number of past queries to show in HiverSever2 WebUI."), // Tez session settings + HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "", + "A single YARN queues to use for Hive Interactive sessions. 
When this is specified,\n" + + "workload management is enabled and used for these sessions."), + HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s", + new TimeValidator(TimeUnit.SECONDS), + "The timeout for AM registry registration, after which (on attempting to use the\n" + + "session), we kill it and try to get another one."), HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "", "A list of comma separated values corresponding to YARN queues of the same name.\n" + "When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" + @@ -3175,8 +3182,6 @@ public class HiveConf extends Configuration { LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5, "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"), - LLAP_PLUGIN_RPC_PORT("hive.llap.plugin.rpc.port", 15005, - "RPC port for AM LLAP daemon plugin endpoint."), LLAP_PLUGIN_RPC_NUM_HANDLERS("hive.llap.plugin.rpc.num.handlers", 1, "Number of RPC handlers for AM LLAP plugin endpoint."), @@ -3274,6 +3279,8 @@ public class HiveConf extends Configuration { LLAP_DAEMON_COMMUNICATOR_NUM_THREADS("hive.llap.daemon.communicator.num.threads", 10, "Number of threads to use in LLAP task communicator in Tez AM.", "llap.daemon.communicator.num.threads"), + LLAP_PLUGIN_CLIENT_NUM_THREADS("hive.llap.plugin.client.num.threads", 10, + "Number of threads to use in LLAP task plugin client."), LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS("hive.llap.daemon.download.permanent.fns", false, "Whether LLAP daemon should localize the resources for permanent UDFs."), LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap", http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java 
b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 05f8a5f..4477954 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -655,7 +655,7 @@ public class QTestUtil { } if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { - SessionState.get().getTezSession().close(false); + SessionState.get().getTezSession().destroy(); } setup.tearDown(); if (sparkSession != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java index b650184..211696a 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java @@ -46,11 +46,13 @@ public class LlapProtocolClientProxy public LlapProtocolClientProxy( int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) { + // We could pass in the number of nodes that we expect instead of -1. + // Also, a single concurrent request per node is currently hardcoded. 
super(LlapProtocolClientProxy.class.getSimpleName(), numThreads, conf, llapToken, HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS), HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, - TimeUnit.MILLISECONDS), -1); + TimeUnit.MILLISECONDS), -1, 1); } public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port, @@ -71,13 +73,6 @@ public class LlapProtocolClientProxy queueRequest(new SendQueryCompleteCallable(nodeId, request, callback)); } - - public void sendUpdateFragment(final UpdateFragmentRequestProto request, final String host, - final int port, final ExecuteRequestCallback<UpdateFragmentResponseProto> callback) { - LlapNodeId nodeId = LlapNodeId.getInstance(host, port); - queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback)); - } - public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host, final int port, final ExecuteRequestCallback<TerminateFragmentResponseProto> callback) { @@ -85,7 +80,13 @@ public class LlapProtocolClientProxy queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback)); } - private class SubmitWorkCallable extends CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> { + public void sendUpdateFragment(final UpdateFragmentRequestProto request, final String host, + final int port, final ExecuteRequestCallback<UpdateFragmentResponseProto> callback) { + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback)); + } + + private class SubmitWorkCallable extends NodeCallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> { protected SubmitWorkCallable(LlapNodeId nodeId, SubmitWorkRequestProto submitWorkRequestProto, @@ -95,12 +96,12 @@ public class LlapProtocolClientProxy @Override public SubmitWorkResponseProto call() throws Exception { - return 
getProxy(nodeId).submitWork(null, request); + return getProxy(nodeId, null).submitWork(null, request); } } private class SendSourceStateUpdateCallable - extends CallableRequest<SourceStateUpdatedRequestProto, SourceStateUpdatedResponseProto> { + extends NodeCallableRequest<SourceStateUpdatedRequestProto, SourceStateUpdatedResponseProto> { public SendSourceStateUpdateCallable(LlapNodeId nodeId, SourceStateUpdatedRequestProto request, @@ -110,12 +111,12 @@ public class LlapProtocolClientProxy @Override public SourceStateUpdatedResponseProto call() throws Exception { - return getProxy(nodeId).sourceStateUpdated(null, request); + return getProxy(nodeId, null).sourceStateUpdated(null, request); } } private class SendQueryCompleteCallable - extends CallableRequest<QueryCompleteRequestProto, QueryCompleteResponseProto> { + extends NodeCallableRequest<QueryCompleteRequestProto, QueryCompleteResponseProto> { protected SendQueryCompleteCallable(LlapNodeId nodeId, QueryCompleteRequestProto queryCompleteRequestProto, @@ -125,12 +126,12 @@ public class LlapProtocolClientProxy @Override public QueryCompleteResponseProto call() throws Exception { - return getProxy(nodeId).queryComplete(null, request); + return getProxy(nodeId, null).queryComplete(null, request); } } private class SendTerminateFragmentCallable - extends CallableRequest<TerminateFragmentRequestProto, TerminateFragmentResponseProto> { + extends NodeCallableRequest<TerminateFragmentRequestProto, TerminateFragmentResponseProto> { protected SendTerminateFragmentCallable(LlapNodeId nodeId, TerminateFragmentRequestProto terminateFragmentRequestProto, @@ -140,12 +141,12 @@ public class LlapProtocolClientProxy @Override public TerminateFragmentResponseProto call() throws Exception { - return getProxy(nodeId).terminateFragment(null, request); + return getProxy(nodeId, null).terminateFragment(null, request); } } private class SendUpdateFragmentCallable - extends CallableRequest<UpdateFragmentRequestProto, 
UpdateFragmentResponseProto> { + extends NodeCallableRequest<UpdateFragmentRequestProto, UpdateFragmentResponseProto> { protected SendUpdateFragmentCallable(LlapNodeId nodeId, UpdateFragmentRequestProto terminateFragmentRequestProto, @@ -155,7 +156,7 @@ public class LlapProtocolClientProxy @Override public UpdateFragmentResponseProto call() throws Exception { - return getProxy(nodeId).updateFragment(null, request); + return getProxy(nodeId, null).updateFragment(null, request); } } http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java index a71904c..0724cf5 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java @@ -54,7 +54,7 @@ public class TezAmInstance extends ServiceInstanceBase { return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID); } - public String getJobIdForPluginToken() { + public String getPluginTokenJobId() { return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID); } @@ -73,4 +73,11 @@ public class TezAmInstance extends ServiceInstanceBase { this.token = token; return token; } + + @Override + public String toString() { + return "TezAmInstance [" + getSessionId() + ", host=" + host + ", rpcPort=" + rpcPort + + ", pluginPort=" + pluginPort + ", token=" + token + "]"; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java ---------------------------------------------------------------------- diff --git a/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java 
b/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java index 1c4f0e7..b152f1c 100644 --- a/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java +++ b/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java @@ -33,7 +33,7 @@ import org.junit.Test; public class TestAsyncPbRpcProxy { @Test (timeout = 5000) - public void testMultipleNodes() { + public void testMultipleNodes() throws Exception { RequestManagerForTest requestManager = new RequestManagerForTest(1); LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025); @@ -62,7 +62,7 @@ public class TestAsyncPbRpcProxy { } @Test(timeout = 5000) - public void testSingleInvocationPerNode() { + public void testSingleInvocationPerNode() throws Exception { RequestManagerForTest requestManager = new RequestManagerForTest(1); LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025); @@ -109,7 +109,7 @@ public class TestAsyncPbRpcProxy { private Map<LlapNodeId, MutableInt> numInvocationsPerNode = new HashMap<>(); public RequestManagerForTest(int numThreads) { - super(numThreads); + super(numThreads, 1); } protected void submitToExecutor(LlapProtocolClientProxy.CallableRequest request, LlapNodeId nodeId) { @@ -129,7 +129,7 @@ public class TestAsyncPbRpcProxy { } - static class CallableRequestForTest extends LlapProtocolClientProxy.CallableRequest<Message, Message> { + static class CallableRequestForTest extends LlapProtocolClientProxy.NodeCallableRequest<Message, Message> { protected CallableRequestForTest(LlapNodeId nodeId, Message message, LlapProtocolClientProxy.ExecuteRequestCallback<Message> callback) { http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java ---------------------------------------------------------------------- diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java 
b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java index 61eb21a..dbcd895 100644 --- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java +++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java @@ -11,27 +11,13 @@ public final class LlapPluginProtocolProtos { public interface UpdateQueryRequestProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional .QueryIdentifierProto query_id = 1; + // optional int32 guaranteed_task_count = 1; /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - boolean hasQueryId(); - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryId(); - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdOrBuilder(); - - // optional int32 guaranteed_task_count = 2; - /** - * <code>optional int32 guaranteed_task_count = 2;</code> + * <code>optional int32 guaranteed_task_count = 1;</code> */ boolean hasGuaranteedTaskCount(); /** - * <code>optional int32 guaranteed_task_count = 2;</code> + * <code>optional int32 guaranteed_task_count = 1;</code> */ int getGuaranteedTaskCount(); } @@ -86,21 +72,8 @@ public final class LlapPluginProtocolProtos { } break; } - case 10: { - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder subBuilder = null; - if (((bitField0_ & 0x00000001) == 0x00000001)) { - subBuilder = queryId_.toBuilder(); - } - queryId_ = input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.PARSER, extensionRegistry); - if (subBuilder != null) { - subBuilder.mergeFrom(queryId_); - queryId_ = subBuilder.buildPartial(); - } + 
case 8: { bitField0_ |= 0x00000001; - break; - } - case 16: { - bitField0_ |= 0x00000002; guaranteedTaskCount_ = input.readInt32(); break; } @@ -144,46 +117,23 @@ public final class LlapPluginProtocolProtos { } private int bitField0_; - // optional .QueryIdentifierProto query_id = 1; - public static final int QUERY_ID_FIELD_NUMBER = 1; - private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto queryId_; - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public boolean hasQueryId() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryId() { - return queryId_; - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdOrBuilder() { - return queryId_; - } - - // optional int32 guaranteed_task_count = 2; - public static final int GUARANTEED_TASK_COUNT_FIELD_NUMBER = 2; + // optional int32 guaranteed_task_count = 1; + public static final int GUARANTEED_TASK_COUNT_FIELD_NUMBER = 1; private int guaranteedTaskCount_; /** - * <code>optional int32 guaranteed_task_count = 2;</code> + * <code>optional int32 guaranteed_task_count = 1;</code> */ public boolean hasGuaranteedTaskCount() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * <code>optional int32 guaranteed_task_count = 2;</code> + * <code>optional int32 guaranteed_task_count = 1;</code> */ public int getGuaranteedTaskCount() { return guaranteedTaskCount_; } private void initFields() { - queryId_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); guaranteedTaskCount_ = 0; } private byte memoizedIsInitialized = -1; @@ -199,10 +149,7 @@ 
public final class LlapPluginProtocolProtos { throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeMessage(1, queryId_); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeInt32(2, guaranteedTaskCount_); + output.writeInt32(1, guaranteedTaskCount_); } getUnknownFields().writeTo(output); } @@ -215,11 +162,7 @@ public final class LlapPluginProtocolProtos { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(1, queryId_); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(2, guaranteedTaskCount_); + .computeInt32Size(1, guaranteedTaskCount_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -244,11 +187,6 @@ public final class LlapPluginProtocolProtos { org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto other = (org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto) obj; boolean result = true; - result = result && (hasQueryId() == other.hasQueryId()); - if (hasQueryId()) { - result = result && getQueryId() - .equals(other.getQueryId()); - } result = result && (hasGuaranteedTaskCount() == other.hasGuaranteedTaskCount()); if (hasGuaranteedTaskCount()) { result = result && (getGuaranteedTaskCount() @@ -267,10 +205,6 @@ public final class LlapPluginProtocolProtos { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); - if (hasQueryId()) { - hash = (37 * hash) + QUERY_ID_FIELD_NUMBER; - hash = (53 * hash) + getQueryId().hashCode(); - } if (hasGuaranteedTaskCount()) { hash = (37 * hash) + GUARANTEED_TASK_COUNT_FIELD_NUMBER; hash = (53 * hash) + getGuaranteedTaskCount(); @@ -376,7 +310,6 @@ public final class LlapPluginProtocolProtos { } private void maybeForceBuilderInitialization() { if 
(com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { - getQueryIdFieldBuilder(); } } private static Builder create() { @@ -385,14 +318,8 @@ public final class LlapPluginProtocolProtos { public Builder clear() { super.clear(); - if (queryIdBuilder_ == null) { - queryId_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); - } else { - queryIdBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000001); guaranteedTaskCount_ = 0; - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000001); return this; } @@ -424,14 +351,6 @@ public final class LlapPluginProtocolProtos { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - if (queryIdBuilder_ == null) { - result.queryId_ = queryId_; - } else { - result.queryId_ = queryIdBuilder_.build(); - } - if (((from_bitField0_ & 0x00000002) == 0x00000002)) { - to_bitField0_ |= 0x00000002; - } result.guaranteedTaskCount_ = guaranteedTaskCount_; result.bitField0_ = to_bitField0_; onBuilt(); @@ -449,9 +368,6 @@ public final class LlapPluginProtocolProtos { public Builder mergeFrom(org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto other) { if (other == org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto.getDefaultInstance()) return this; - if (other.hasQueryId()) { - mergeQueryId(other.getQueryId()); - } if (other.hasGuaranteedTaskCount()) { setGuaranteedTaskCount(other.getGuaranteedTaskCount()); } @@ -482,151 +398,34 @@ public final class LlapPluginProtocolProtos { } private int bitField0_; - // optional .QueryIdentifierProto query_id = 1; - private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto queryId_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); - private com.google.protobuf.SingleFieldBuilder< - 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder> queryIdBuilder_; - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public boolean hasQueryId() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto getQueryId() { - if (queryIdBuilder_ == null) { - return queryId_; - } else { - return queryIdBuilder_.getMessage(); - } - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public Builder setQueryId(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto value) { - if (queryIdBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - queryId_ = value; - onChanged(); - } else { - queryIdBuilder_.setMessage(value); - } - bitField0_ |= 0x00000001; - return this; - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public Builder setQueryId( - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder builderForValue) { - if (queryIdBuilder_ == null) { - queryId_ = builderForValue.build(); - onChanged(); - } else { - queryIdBuilder_.setMessage(builderForValue.build()); - } - bitField0_ |= 0x00000001; - return this; - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public Builder mergeQueryId(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto value) { - if (queryIdBuilder_ == null) { - if (((bitField0_ & 0x00000001) == 0x00000001) && - queryId_ != org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance()) { - 
queryId_ = - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder(queryId_).mergeFrom(value).buildPartial(); - } else { - queryId_ = value; - } - onChanged(); - } else { - queryIdBuilder_.mergeFrom(value); - } - bitField0_ |= 0x00000001; - return this; - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public Builder clearQueryId() { - if (queryIdBuilder_ == null) { - queryId_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance(); - onChanged(); - } else { - queryIdBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000001); - return this; - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder getQueryIdBuilder() { - bitField0_ |= 0x00000001; - onChanged(); - return getQueryIdFieldBuilder().getBuilder(); - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder getQueryIdOrBuilder() { - if (queryIdBuilder_ != null) { - return queryIdBuilder_.getMessageOrBuilder(); - } else { - return queryId_; - } - } - /** - * <code>optional .QueryIdentifierProto query_id = 1;</code> - */ - private com.google.protobuf.SingleFieldBuilder< - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder> - getQueryIdFieldBuilder() { - if (queryIdBuilder_ == null) { - queryIdBuilder_ = new com.google.protobuf.SingleFieldBuilder< - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder, 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder>( - queryId_, - getParentForChildren(), - isClean()); - queryId_ = null; - } - return queryIdBuilder_; - } - - // optional int32 guaranteed_task_count = 2; + // optional int32 guaranteed_task_count = 1; private int guaranteedTaskCount_ ; /** - * <code>optional int32 guaranteed_task_count = 2;</code> + * <code>optional int32 guaranteed_task_count = 1;</code> */ public boolean hasGuaranteedTaskCount() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * <code>optional int32 guaranteed_task_count = 2;</code> + * <code>optional int32 guaranteed_task_count = 1;</code> */ public int getGuaranteedTaskCount() { return guaranteedTaskCount_; } /** - * <code>optional int32 guaranteed_task_count = 2;</code> + * <code>optional int32 guaranteed_task_count = 1;</code> */ public Builder setGuaranteedTaskCount(int value) { - bitField0_ |= 0x00000002; + bitField0_ |= 0x00000001; guaranteedTaskCount_ = value; onChanged(); return this; } /** - * <code>optional int32 guaranteed_task_count = 2;</code> + * <code>optional int32 guaranteed_task_count = 1;</code> */ public Builder clearGuaranteedTaskCount() { - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000001); guaranteedTaskCount_ = 0; onChanged(); return this; @@ -1232,15 +1031,13 @@ public final class LlapPluginProtocolProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\030LlapPluginProtocol.proto\032\030LlapDaemonPr" + - "otocol.proto\"a\n\027UpdateQueryRequestProto\022" + - "\'\n\010query_id\030\001 \001(\0132\025.QueryIdentifierProto" + - "\022\035\n\025guaranteed_task_count\030\002 \001(\005\"\032\n\030Updat" + - "eQueryResponseProto2X\n\022LlapPluginProtoco" + - "l\022B\n\013updateQuery\022\030.UpdateQueryRequestPro" + - "to\032\031.UpdateQueryResponseProtoBH\n&org.apa" + - "che.hadoop.hive.llap.plugin.rpcB\030LlapPlu" + - 
"ginProtocolProtos\210\001\001\240\001\001" + "\n\030LlapPluginProtocol.proto\"8\n\027UpdateQuer" + + "yRequestProto\022\035\n\025guaranteed_task_count\030\001" + + " \001(\005\"\032\n\030UpdateQueryResponseProto2X\n\022Llap" + + "PluginProtocol\022B\n\013updateQuery\022\030.UpdateQu" + + "eryRequestProto\032\031.UpdateQueryResponsePro" + + "toBH\n&org.apache.hadoop.hive.llap.plugin" + + ".rpcB\030LlapPluginProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1252,7 +1049,7 @@ public final class LlapPluginProtocolProtos { internal_static_UpdateQueryRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_UpdateQueryRequestProto_descriptor, - new java.lang.String[] { "QueryId", "GuaranteedTaskCount", }); + new java.lang.String[] { "GuaranteedTaskCount", }); internal_static_UpdateQueryResponseProto_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_UpdateQueryResponseProto_fieldAccessorTable = new @@ -1265,7 +1062,6 @@ public final class LlapPluginProtocolProtos { com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[] { - org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor(), }, assigner); } http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java index 7726794..0415169 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java @@ 
-81,7 +81,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent private final Condition queueCondition = lock.newCondition(); private final AtomicBoolean shouldRun = new AtomicBoolean(false); - private final int maxConcurrentRequestsPerNode = 1; + private final int maxConcurrentRequestsPerNode; private final ListeningExecutorService executor; @@ -97,9 +97,10 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent // Tracks completed requests pre node private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>(); - public RequestManager(int numThreads) { + public RequestManager(int numThreads, int maxPerNode) { ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build()); + maxConcurrentRequestsPerNode = maxPerNode; executor = MoreExecutors.listeningDecorator(localExecutor); } @@ -123,12 +124,8 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent break; } } catch (InterruptedException e) { - if (isShutdown.get()) { - break; - } else { - LOG.warn("RunLoop interrupted without being shutdown first"); - throw new RuntimeException(e); - } + handleInterrupt(e); + break; } finally { lock.unlock(); } @@ -137,6 +134,12 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent return null; } + private void handleInterrupt(InterruptedException e) { + if (isShutdown.get()) return; + LOG.warn("RunLoop interrupted without being shutdown first"); + throw new RuntimeException(e); + } + /* Add a new request to be executed */ public void queueRequest(CallableRequest<?, ?> request) { synchronized (newRequestList) { @@ -171,7 +174,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent } @VisibleForTesting - boolean process() { + boolean process() throws InterruptedException { if (isShutdown.get()) { return true; } @@ -182,6 +185,7 @@ public 
abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent // otherwise an add/completion after draining the lists but before setting it to false, // will not trigger a run. May cause one unnecessary run if an add comes in before drain. // drain list. add request (setTrue). setFalse needs to be avoided. + // TODO: why CAS if the result is not checked? shouldRun.compareAndSet(true, false); // Drain any calls which may have come in during the last execution of the loop. drainNewRequestList(); // Locks newRequestList @@ -192,7 +196,16 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent while (iterator.hasNext()) { CallableRequest<?, ?> request = iterator.next(); iterator.remove(); - LlapNodeId nodeId = request.getNodeId(); + LlapNodeId nodeId; + try { + nodeId = request.getNodeId(); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + request.getCallback().indicateError(e); + continue; + } + if (canRunForNode(nodeId, currentLoopDisabledNodes)) { submitToExecutor(request, nodeId); } else { @@ -321,19 +334,15 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent @VisibleForTesting protected static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message> implements Callable<RESPONSE> { - protected final LlapNodeId nodeId; protected final ExecuteRequestCallback<RESPONSE> callback; protected final REQUEST request; - protected CallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> callback) { - this.nodeId = nodeId; + protected CallableRequest(REQUEST request, ExecuteRequestCallback<RESPONSE> callback) { this.request = request; this.callback = callback; } - public LlapNodeId getNodeId() { - return nodeId; - } + public abstract LlapNodeId getNodeId() throws Exception; public ExecuteRequestCallback<RESPONSE> getCallback() { return callback; @@ -342,13 +351,31 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType 
extends TokenIdent public abstract RESPONSE call() throws Exception; } + + @VisibleForTesting + protected static abstract class NodeCallableRequest< + REQUEST extends Message, RESPONSE extends Message> extends CallableRequest<REQUEST, RESPONSE> { + protected final LlapNodeId nodeId; + + protected NodeCallableRequest(LlapNodeId nodeId, REQUEST request, + ExecuteRequestCallback<RESPONSE> callback) { + super(request, callback); + this.nodeId = nodeId; + } + + @Override + public LlapNodeId getNodeId() { + return nodeId; + } + } + public interface ExecuteRequestCallback<T extends Message> { void setResponse(T response); void indicateError(Throwable t); } - public AsyncPbRpcProxy(String name, int numThreads, Configuration conf, - Token<TokenType> token, long connectionTimeoutMs, long retrySleepMs, int expectedNodes) { + public AsyncPbRpcProxy(String name, int numThreads, Configuration conf, Token<TokenType> token, + long connectionTimeoutMs, long retrySleepMs, int expectedNodes, int maxPerNode) { super(name); // Note: we may make size/etc. configurable later. CacheBuilder<String, ProtocolType> cb = CacheBuilder.newBuilder().expireAfterAccess( @@ -371,7 +398,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS); - this.requestManager = new RequestManager(numThreads); + this.requestManager = new RequestManager(numThreads, maxPerNode); ExecutorService localExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build()); this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor); @@ -382,13 +409,19 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent "retrySleep(millis)=" + retrySleepMs); } - protected final ProtocolType getProxy(final LlapNodeId nodeId) { + /** + * @param nodeId Hostname + port. 
+ * @param nodeToken A custom node token. If not specified, the default token is used. + * @return the protocol client implementation for the node. + */ + protected final ProtocolType getProxy( + final LlapNodeId nodeId, final Token<TokenType> nodeToken) { String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort()); try { return hostProxies.get(hostId, new Callable<ProtocolType>() { @Override public ProtocolType call() throws Exception { - return createProxy(nodeId); + return createProxy(nodeId, nodeToken); } }); } catch (ExecutionException e) { @@ -396,8 +429,8 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent } } - private ProtocolType createProxy(final LlapNodeId nodeId) { - if (token == null) { + private ProtocolType createProxy(final LlapNodeId nodeId, Token<TokenType> nodeToken) { + if (nodeToken == null && token == null) { if (LOG.isDebugEnabled()) { LOG.debug("Creating a client without a token for " + nodeId); } @@ -406,7 +439,9 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent } final UserGroupInformation ugi = UserGroupInformation.createRemoteUser(tokenUser); // Clone the token as we'd need to set the service to the one we are talking to. 
- Token<TokenType> nodeToken = new Token<TokenType>(token); + if (nodeToken == null) { + nodeToken = new Token<TokenType>(token); + } SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost( nodeId.getHostname(), nodeId.getPort())); ugi.addToken(nodeToken); http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java index 19e81e6..3883052 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java @@ -21,6 +21,7 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto; import org.apache.hadoop.hive.llap.protocol.LlapPluginProtocolPB; @@ -29,9 +30,9 @@ public class LlapPluginProtocolClientImpl implements LlapPluginProtocolPB { private ProtobufProxy<LlapPluginProtocolPB> protobuf; public LlapPluginProtocolClientImpl(Configuration conf, String hostname, int port, - RetryPolicy retryPolicy, SocketFactory socketFactory) { + RetryPolicy retryPolicy, SocketFactory socketFactory, UserGroupInformation ugi) { protobuf = new ProtobufProxy<>( - conf, hostname, port, retryPolicy, socketFactory, LlapPluginProtocolPB.class); + conf, ugi, hostname, port, retryPolicy, socketFactory, LlapPluginProtocolPB.class); } @Override 
http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java index fa99536..c2711db 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java @@ -18,19 +18,14 @@ import javax.annotation.Nullable; import javax.net.SocketFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolProxy; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; -import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto; -import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB; -import org.apache.hadoop.hive.llap.protocol.LlapPluginProtocolPB; import org.apache.hadoop.security.UserGroupInformation; // TODO: move other protocols to use this too. 
@@ -42,12 +37,12 @@ public class ProtobufProxy<BlockingInterface> { private final SocketFactory socketFactory; private BlockingInterface proxy; private final Class<?> blockingInterfaceClass; + private UserGroupInformation ugi; - public ProtobufProxy(Configuration conf, String hostname, int port, - @Nullable RetryPolicy retryPolicy, - @Nullable SocketFactory socketFactory, - Class<?> blockingInterfaceClass) { + public ProtobufProxy(Configuration conf, UserGroupInformation ugi, + String hostname, int port, @Nullable RetryPolicy retryPolicy, + @Nullable SocketFactory socketFactory, Class<?> blockingInterfaceClass) { this.conf = conf; this.serverAddr = NetUtils.createSocketAddr(hostname, port); this.retryPolicy = retryPolicy; @@ -57,6 +52,7 @@ public class ProtobufProxy<BlockingInterface> { this.socketFactory = socketFactory; } this.blockingInterfaceClass = blockingInterfaceClass; + this.ugi = ugi; } public BlockingInterface getProxy() throws IOException { @@ -68,6 +64,21 @@ public class ProtobufProxy<BlockingInterface> { private BlockingInterface createProxy() throws IOException { RPC.setProtocolEngine(conf, blockingInterfaceClass, ProtobufRpcEngine.class); + if (ugi == null) return createProxyInternal(); + try { + return ugi.doAs(new PrivilegedExceptionAction<BlockingInterface>() { + @Override + public BlockingInterface run() throws IOException { + return createProxyInternal(); + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private BlockingInterface createProxyInternal() throws IOException { + @SuppressWarnings("unchecked") ProtocolProxy<BlockingInterface> proxy = (ProtocolProxy<BlockingInterface>) RPC.getProtocolProxy(blockingInterfaceClass, 0, serverAddr, UserGroupInformation.getCurrentUser(), conf, socketFactory, 0, retryPolicy); http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/protobuf/LlapPluginProtocol.proto ---------------------------------------------------------------------- diff --git 
a/llap-common/src/protobuf/LlapPluginProtocol.proto b/llap-common/src/protobuf/LlapPluginProtocol.proto index 39349b1..2a3f441 100644 --- a/llap-common/src/protobuf/LlapPluginProtocol.proto +++ b/llap-common/src/protobuf/LlapPluginProtocol.proto @@ -21,11 +21,8 @@ option java_outer_classname = "LlapPluginProtocolProtos"; option java_generic_services = true; option java_generate_equals_and_hash = true; -import "LlapDaemonProtocol.proto"; - message UpdateQueryRequestProto { - optional QueryIdentifierProto query_id = 1; - optional int32 guaranteed_task_count = 2; + optional int32 guaranteed_task_count = 1; } message UpdateQueryResponseProto { http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 26747fc..0bfab8b 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -253,6 +253,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { // The fields that HS2 uses to give AM information about plugin endpoint. + // Some of these will be removed when AM registry is implemented, as AM will generate and publish them. /** Whether to enable the endpoint. 
*/ public static final String LLAP_PLUGIN_ENDPOINT_ENABLED = "llap.plugin.endpoint.enabled"; @@ -287,8 +288,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { serializedToken = jobIdForToken = null; } pluginEndpoint = new LlapPluginServerImpl(sm, - HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), - HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_PORT), this); + HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), this); } else { serializedToken = jobIdForToken = null; pluginEndpoint = null; http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java index 4d5333f..e9a011a 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java @@ -14,31 +14,29 @@ package org.apache.hadoop.hive.llap.tezplugins.endpoint; -import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicReference; - import com.google.protobuf.BlockingService; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapUtil; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos; import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto; import org.apache.hadoop.hive.llap.protocol.LlapPluginProtocolPB; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.service.AbstractService; import org.apache.tez.common.security.JobTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LlapPluginServerImpl extends AbstractService implements LlapPluginProtocolPB { private static final Logger LOG = LoggerFactory.getLogger(LlapPluginServerImpl.class); - private final int port; private RPC.Server server; private final SecretManager<JobTokenIdentifier> secretManager; private final int numHandlers; @@ -46,13 +44,11 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP private final AtomicReference<InetSocketAddress> bindAddress = new AtomicReference<>(); public LlapPluginServerImpl(SecretManager<JobTokenIdentifier> secretManager, - int numHandlers, int port, LlapTaskSchedulerService parent) { + int numHandlers, LlapTaskSchedulerService parent) { super("LlapPluginServerImpl"); this.secretManager = secretManager; - this.port = port; this.numHandlers = numHandlers; this.parent = parent; - LOG.info("Creating the plugin endpoint on port " + port); } @Override @@ -67,9 +63,10 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP final Configuration conf = getConfig(); final BlockingService daemonImpl = LlapPluginProtocolProtos.LlapPluginProtocol.newReflectiveBlockingService(this); - server = LlapUtil.startProtocolServer(port, numHandlers, bindAddress , conf, daemonImpl, + server = LlapUtil.startProtocolServer(0, numHandlers, bindAddress , conf, daemonImpl, LlapPluginProtocolPB.class, secretManager, new 
LlapPluginPolicyProvider(), ConfVars.LLAP_PLUGIN_ACL, ConfVars.LLAP_PLUGIN_ACL_DENY); + LOG.info("Starting the plugin endpoint on port " + bindAddress.get().getPort()); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 93a36c6..97df36e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -399,10 +399,9 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop Utilities.createTmpDirs(job, rWork); SessionState ss = SessionState.get(); - if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") - && ss != null) { - TezSessionState session = ss.getTezSession(); - TezSessionPoolManager.getInstance().closeIfNotDefault(session, true); + // TODO: why is there a TezSession in MR ExecDriver? + if (ss != null && HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + TezSessionPoolManager.closeIfNotDefault(ss.getTezSession(), true); } HiveConfUtil.updateJobCredentialProviders(job); http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java new file mode 100644 index 0000000..35d380c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; + +public interface AmPluginNode { + public static class AmPluginInfo { + public final int amPluginPort; + public final Token<JobTokenIdentifier> amPluginToken; + public final String amPluginTokenJobId; + public final String amHost; + + AmPluginInfo(String amHost, int amPluginPort, + Token<JobTokenIdentifier> amPluginToken, String amPluginTokenJobId) { + this.amHost = amHost; + this.amPluginPort = amPluginPort; + this.amPluginToken = amPluginToken; + this.amPluginTokenJobId = amPluginTokenJobId; + } + } + + AmPluginInfo waitForAmPluginInfo(int timeoutMs) throws InterruptedException, TimeoutException; +} http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java new file mode 100644 index 0000000..d978a25 --- /dev/null +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback; +import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; +import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto; +import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Implements query resource allocation using guaranteed tasks. 
*/ +public class GuaranteedTasksAllocator implements QueryAllocationManager { + private static final Logger LOG = LoggerFactory.getLogger( + GuaranteedTasksAllocator.class); + private final static long CLUSTER_INFO_UPDATE_INTERVAL_MS = 120 * 1000L; + + private final Configuration conf; + private final LlapClusterStateForCompile clusterState; + private final Thread clusterStateUpdateThread; + private final LlapPluginEndpointClient amCommunicator; + + public GuaranteedTasksAllocator( + Configuration conf, LlapPluginEndpointClient amCommunicator) { + this.conf = conf; + this.clusterState = new LlapClusterStateForCompile(conf, CLUSTER_INFO_UPDATE_INTERVAL_MS); + this.amCommunicator = amCommunicator; + this.clusterStateUpdateThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + getExecutorCount(true); // Trigger an update if needed. + try { + Thread.sleep(CLUSTER_INFO_UPDATE_INTERVAL_MS / 2); + } catch (InterruptedException e) { + LOG.info("Cluster state update thread was interrupted"); + return; + } + } + } + }, "Cluster State Updater"); + clusterStateUpdateThread.setDaemon(true); + } + + @Override + public void start() { + clusterStateUpdateThread.start(); + } + + @Override + public void stop() { + clusterStateUpdateThread.interrupt(); // Don't wait for the thread. 
+ } + + @VisibleForTesting + protected int getExecutorCount(boolean allowUpdate) { + if (!clusterState.initClusterInfo(allowUpdate)) { + LOG.warn("Failed to get LLAP cluster information for " + + HiveConf.getTrimmedVar(this.conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS) + + "; we may rely on outdated cluster status"); + } + if (!clusterState.hasClusterInfo()) { + LOG.error("No cluster information available to allocate; no guaranteed tasks will be used"); + return 0; + } + int unknownNodes = clusterState.getNodeCountWithUnknownExecutors(); + if (unknownNodes > 0) { + LOG.error("There are " + unknownNodes + " nodes with unknown executor count; only " + + clusterState.getKnownExecutorCount() + " guaranteed tasks will be allocated"); + } + return clusterState.getKnownExecutorCount(); + } + + @Override + public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) { + // Do not make a remote call unless we have no information at all. + int totalCount = getExecutorCount(false); + int totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc); + double lastDelta = 0; + for (int i = 0; i < sessionsToUpdate.size(); ++i) { + WmTezSession session = sessionsToUpdate.get(i); + int intAlloc = -1; + if (i + 1 == sessionsToUpdate.size()) { + intAlloc = totalToDistribute; + // We rely on the caller to supply a reasonable total; we could log a warning + // if this doesn't match the allocation of the last session beyond some threshold. + } else { + // This ensures we don't create skew, e.g. with 8 ducks and 5 queries with simple rounding + // we'd produce 2-2-2-2-0 as we round 1.6; whereas adding the last delta to the next query + // we'd round 1.6-1.2-1.8-1.4-2.0 and thus give out 2-1-2-1-2, as intended. + // Note that fractions don't have to all be the same like in this example. 
+ double fraction = session.getClusterFraction(); + double allocation = fraction * totalCount + lastDelta; + double roundedAlloc = Math.round(allocation); + lastDelta = allocation - roundedAlloc; + if (roundedAlloc < 0) { + roundedAlloc = 0; // Can this happen? Delta cannot exceed 0.5. + } + intAlloc = (int)roundedAlloc; + } + // Make sure we don't give out more than allowed due to double/rounding artifacts. + if (intAlloc > totalToDistribute) { + intAlloc = totalToDistribute; + } + totalToDistribute -= intAlloc; + // This will only send update if it's necessary. + updateSessionAsync(session, intAlloc); + } + } + + private void updateSessionAsync(final WmTezSession session, final int intAlloc) { + boolean needsUpdate = session.setSendingGuaranteed(intAlloc); + if (!needsUpdate) return; + // Note: this assumes that the pattern where the same session object is reset with a different + // Tez client is not used. It was used a lot in the past but appears to be gone from most + // HS2 session pool paths, and this patch removes the last one (reopen). 
+ UpdateQueryRequestProto request = UpdateQueryRequestProto + .newBuilder().setGuaranteedTaskCount(intAlloc).build(); + amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session)); + } + + private final class UpdateCallback implements ExecuteRequestCallback<UpdateQueryResponseProto> { + private final WmTezSession session; + + private UpdateCallback(WmTezSession session) { + this.session = session; + } + + @Override + public void setResponse(UpdateQueryResponseProto response) { + int nextUpdate = session.setSentGuaranteed(); + if (nextUpdate >= 0) { + updateSessionAsync(session, nextUpdate); + } + } + + @Override + public void indicateError(Throwable t) { + LOG.error("Failed to update guaranteed tasks count for the session " + session, t); + boolean isOkToFail = session.setFailedToSendGuaranteed(); + if (isOkToFail) return; + // RPC already handles retries, so we will just try to kill the session here. + // This will cause the current query to fail. We could instead keep retrying. + try { + session.destroy(); + } catch (Exception e) { + LOG.error("Failed to kill the session " + session); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClient.java new file mode 100644 index 0000000..86c8a2d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClient.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback; +import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; +import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; + +public interface LlapPluginEndpointClient { + void sendUpdateQuery(UpdateQueryRequestProto request, AmPluginNode node, + ExecuteRequestCallback<UpdateQueryResponseProto> callback); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java new file mode 100644 index 0000000..45c3e38 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import javax.net.SocketFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.AsyncPbRpcProxy; +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.llap.impl.LlapPluginProtocolClientImpl; +import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; +import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto; +import org.apache.hadoop.hive.llap.protocol.LlapPluginProtocolPB; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; + +public class LlapPluginEndpointClientImpl extends + AsyncPbRpcProxy<LlapPluginProtocolPB, JobTokenIdentifier> + implements LlapPluginEndpointClient { + + public LlapPluginEndpointClientImpl( + Configuration conf, 
Token<JobTokenIdentifier> token, int expectedNodes) { + // A single concurrent request per node is currently hardcoded. The node includes a port number + // so different AMs on the same host count as different nodes; we only have one request type, + // and it is not useful to send more than one in parallel. + super(LlapPluginEndpointClientImpl.class.getSimpleName(), + HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_CLIENT_NUM_THREADS), conf, token, + HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, + TimeUnit.MILLISECONDS), + HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, + TimeUnit.MILLISECONDS), expectedNodes, 1); + } + + @Override + protected LlapPluginProtocolPB createProtocolImpl(Configuration conf, + String hostname, int port, UserGroupInformation ugi, + RetryPolicy retryPolicy, SocketFactory socketFactory) { + return new LlapPluginProtocolClientImpl(conf, hostname, port, retryPolicy, socketFactory, ugi); + } + + @Override + protected void shutdownProtocolImpl(LlapPluginProtocolPB proxy) { + // Nothing to do. 
+ } + + @Override + protected String getTokenUser(Token<JobTokenIdentifier> token) { + try { + return token.decodeIdentifier().getJobId().toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClient#sendUpdateQuery(org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto, java.lang.String, int, org.apache.hadoop.security.token.Token, org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback) + */ + @Override + public void sendUpdateQuery(UpdateQueryRequestProto request, + AmPluginNode node, ExecuteRequestCallback<UpdateQueryResponseProto> callback) { + queueRequest(new SendUpdateQueryCallable(node, request, callback)); + } + + private class SendUpdateQueryCallable + extends CallableRequest<UpdateQueryRequestProto, UpdateQueryResponseProto> { + private AmPluginNode node; + private AmPluginInfo info; + + protected SendUpdateQueryCallable(AmPluginNode node, UpdateQueryRequestProto request, + ExecuteRequestCallback<UpdateQueryResponseProto> callback) { + super(request, callback); + this.node = node; + } + + @Override + public LlapNodeId getNodeId() throws InterruptedException, TimeoutException { + ensureInfo(); + return LlapNodeId.getInstance(info.amHost, info.amPluginPort); + } + + @Override + public UpdateQueryResponseProto call() throws Exception { + ensureInfo(); + LlapNodeId nodeId = LlapNodeId.getInstance(info.amHost, info.amPluginPort); + return getProxy(nodeId, info.amPluginToken).updateQuery(null, request); + } + + private void ensureInfo() throws InterruptedException, TimeoutException { + if (info != null) return; + info = node.waitForAmPluginInfo(0); // Don't wait - should already be initialized. 
+ if (info == null) { + throw new AssertionError("A request was created without AM plugin info for " + node); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java new file mode 100644 index 0000000..a326db3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.List; + +interface QueryAllocationManager { + void start(); + void stop(); + /** + * Updates the session allocations asynchronously. + * @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to + * avoid various artifacts, esp. with small numbers and double weirdness. + * @param sessions Sessions to update based on their allocation fraction. 
+ */ + void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions); +} http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 6e8122d..4f2997b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -17,56 +17,55 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; - -import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; -import org.apache.hadoop.hive.registry.impl.TezAmInstance; -import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; +import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.HashSet; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicReference; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.registry.impl.TezAmInstance; +import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - /** * Distinct from TezSessionPool manager in that it implements a 
session pool, and nothing else. */ -class TezSessionPool { +class TezSessionPool<SessionType extends TezSessionPoolSession> { private static final Logger LOG = LoggerFactory.getLogger(TezSessionPool.class); /** A queue for initial sessions that have not been started yet. */ - private final Queue<TezSessionPoolSession> initialSessions = - new ConcurrentLinkedQueue<TezSessionPoolSession>(); + private final Queue<SessionType> initialSessions = + new ConcurrentLinkedQueue<SessionType>(); private final HiveConf initConf; - private final BlockingDeque<TezSessionPoolSession> defaultQueuePool; + + // TODO: eventually, this will need to support resize. That would probably require replacement + // with a RW lock, a semaphore and linked list. + private BlockingDeque<SessionType> defaultQueuePool; private final String amRegistryName; private final TezAmRegistryImpl amRegistry; - private final ConcurrentHashMap<String, TezSessionPoolSession> bySessionId = + private final ConcurrentHashMap<String, SessionType> bySessionId = new ConcurrentHashMap<>(); TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent) { this.initConf = initConf; assert numSessionsTotal > 0; - defaultQueuePool = new LinkedBlockingDeque<TezSessionPoolSession>(numSessionsTotal); + defaultQueuePool = new LinkedBlockingDeque<>(numSessionsTotal); this.amRegistry = useAmRegistryIfPresent ? TezAmRegistryImpl.create(initConf, true) : null; this.amRegistryName = amRegistry == null ? 
null : amRegistry.getRegistryName(); } @@ -86,7 +85,7 @@ class TezSessionPool { Preconditions.checkArgument(threadCount > 0); if (threadCount == 1) { while (true) { - TezSessionPoolSession session = initialSessions.poll(); + SessionType session = initialSessions.poll(); if (session == null) break; startInitialSession(session); } @@ -100,7 +99,7 @@ class TezSessionPool { SessionState.setCurrentSessionState(parentSessionState); } while (true) { - TezSessionPoolSession session = initialSessions.poll(); + SessionType session = initialSessions.poll(); if (session == null) break; if (firstError.get() != null) break; // Best-effort. try { @@ -130,38 +129,46 @@ class TezSessionPool { } } - void addInitialSession(TezSessionPoolSession session) { + void addInitialSession(SessionType session) { initialSessions.add(session); } - TezSessionState getSession() throws Exception { + SessionType getSession() throws Exception { while (true) { - TezSessionPoolSession result = defaultQueuePool.take(); + SessionType result = defaultQueuePool.take(); if (result.tryUse()) return result; LOG.info("Couldn't use a session [" + result + "]; attempting another one"); } } - void returnSession(TezSessionPoolSession session) throws Exception { - // TODO: should this be in pool, or pool manager? Probably common to all the use cases. + void returnSession(SessionType session) throws Exception { + // Make sure that if the session is returned to the pool, it doesn't live in the global. SessionState sessionState = SessionState.get(); if (sessionState != null) { sessionState.setTezSession(null); } - if (session.returnAfterUse()) { + if (session.stopUsing()) { defaultQueuePool.putFirst(session); } } - void replaceSession( - TezSessionPoolSession oldSession, TezSessionPoolSession newSession) throws Exception { + void replaceSession(SessionType oldSession, SessionType newSession, + boolean keepTmpDir, String[] additionalFilesArray, HiveConf conf) throws Exception { // Retain the stuff from the old session. 
// Re-setting the queue config is an old hack that we may remove in future. Path scratchDir = oldSession.getTezScratchDir(); - Set<String> additionalFiles = oldSession.getAdditionalFilesNotFromConf(); String queueName = oldSession.getQueueName(); + Set<String> additionalFiles = null; + if (additionalFilesArray != null) { + additionalFiles = new HashSet<>(); + for (String file : additionalFilesArray) { + additionalFiles.add(file); + } + } else { + additionalFiles = oldSession.getAdditionalFilesNotFromConf(); + } try { - oldSession.close(false); + oldSession.close(keepTmpDir); boolean wasRemoved = defaultQueuePool.remove(oldSession); if (!wasRemoved) { LOG.error("Old session was closed but it was not in the pool", oldSession); @@ -179,18 +186,18 @@ class TezSessionPool { } } - private void startInitialSession(TezSessionPoolSession session) throws Exception { + private void startInitialSession(SessionType session) throws Exception { boolean isUsable = session.tryUse(); if (!isUsable) throw new IOException(session + " is not usable at pool startup"); session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, session.getQueueName()); configureAmRegistry(session); session.open(); - if (session.returnAfterUse()) { + if (session.stopUsing()) { defaultQueuePool.put(session); } } - private void configureAmRegistry(TezSessionPoolSession session) { + private void configureAmRegistry(SessionType session) { if (amRegistryName != null) { bySessionId.put(session.getSessionId(), session); HiveConf conf = session.getConf(); @@ -206,12 +213,16 @@ class TezSessionPool { implements ServiceInstanceStateChangeListener<TezAmInstance> { @Override - public void onCreate(TezAmInstance serviceInstance) { - String sessionId = serviceInstance.getSessionId(); - TezSessionPoolSession session = bySessionId.get(sessionId); - LOG.warn("AM for " + sessionId + " has registered; updating [" + session - + "] with an endpoint at " + serviceInstance.getPluginPort()); - // TODO: actually update the session 
once WM is committed + public void onCreate(TezAmInstance si) { + String sessionId = si.getSessionId(); + SessionType session = bySessionId.get(sessionId); + if (session != null) { + LOG.info("AM for " + sessionId + " has registered; updating [" + session + + "] with an endpoint at " + si.getPluginPort()); + session.updateFromRegistry(si); + } else { + LOG.warn("AM for an unknown " + sessionId + " has registered; ignoring"); + } } @Override
