HIVE-17386 : support LLAP workload management in HS2 (low level only) (Sergey 
Shelukhin, reviewed by Zhiyuan Yang, Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ac24537f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ac24537f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ac24537f

Branch: refs/heads/hive-14535
Commit: ac24537f294e63682e250c25a0b96ad2d64066dd
Parents: f3cb704
Author: sergey <[email protected]>
Authored: Tue Sep 26 15:44:58 2017 -0700
Committer: sergey <[email protected]>
Committed: Tue Sep 26 15:44:58 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +-
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |   2 +-
 .../hive/llap/tez/LlapProtocolClientProxy.java  |  37 +-
 .../hive/registry/impl/TezAmInstance.java       |   9 +-
 .../hadoop/hive/llap/TestAsyncPbRpcProxy.java   |   8 +-
 .../plugin/rpc/LlapPluginProtocolProtos.java    | 260 ++-----------
 .../hadoop/hive/llap/AsyncPbRpcProxy.java       |  83 +++--
 .../llap/impl/LlapPluginProtocolClientImpl.java |   5 +-
 .../hadoop/hive/llap/impl/ProtobufProxy.java    |  31 +-
 .../src/protobuf/LlapPluginProtocol.proto       |   5 +-
 .../tezplugins/LlapTaskSchedulerService.java    |   4 +-
 .../endpoint/LlapPluginServerImpl.java          |  23 +-
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |   7 +-
 .../hadoop/hive/ql/exec/tez/AmPluginNode.java   |  43 +++
 .../ql/exec/tez/GuaranteedTasksAllocator.java   | 171 +++++++++
 .../ql/exec/tez/LlapPluginEndpointClient.java   |  30 ++
 .../exec/tez/LlapPluginEndpointClientImpl.java  | 123 ++++++
 .../ql/exec/tez/QueryAllocationManager.java     |  32 ++
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java |  85 +++--
 .../hive/ql/exec/tez/TezSessionPoolManager.java |  59 ++-
 .../hive/ql/exec/tez/TezSessionPoolSession.java |  55 ++-
 .../hive/ql/exec/tez/TezSessionState.java       |  29 +-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |  23 +-
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   | 121 ++++++
 .../hive/ql/exec/tez/WorkloadManager.java       | 372 +++++++++++++++++++
 .../ql/exec/tez/monitoring/TezJobMonitor.java   |   2 +
 .../physical/LlapClusterStateForCompile.java    | 105 +++---
 .../hive/ql/exec/tez/SampleTezSessionState.java |  27 +-
 .../exec/tez/TestGuaranteedTaskAllocator.java   | 151 ++++++++
 .../hive/ql/exec/tez/TestTezSessionPool.java    |  38 +-
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    |  16 +-
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 173 +++++++++
 .../apache/hive/service/server/HiveServer2.java |  22 +-
 33 files changed, 1692 insertions(+), 470 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e2bd38b..147a5d1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2397,6 +2397,13 @@ public class HiveConf extends Configuration {
        "The maximum number of past queries to show in HiveServer2 WebUI."),
 
     // Tez session settings
+    HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", 
"",
"",
+        "A single YARN queue to use for Hive Interactive sessions. When this 
is specified,\n" +
+        "workload management is enabled and used for these sessions."),
+    
HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout",
 "30s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "The timeout for AM registry registration, after which (on attempting 
to use the\n" +
+        "session), we kill it and try to get another one."),
     HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "",
         "A list of comma separated values corresponding to YARN queues of the 
same name.\n" +
         "When HiveServer2 is launched in Tez mode, this configuration needs to 
be set\n" +
@@ -3175,8 +3182,6 @@ public class HiveConf extends Configuration {
 
     LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
       "Number of RPC handlers for LLAP daemon.", 
"llap.daemon.rpc.num.handlers"),
-    LLAP_PLUGIN_RPC_PORT("hive.llap.plugin.rpc.port", 15005,
-        "RPC port for AM LLAP daemon plugin endpoint."),
 
     LLAP_PLUGIN_RPC_NUM_HANDLERS("hive.llap.plugin.rpc.num.handlers", 1,
       "Number of RPC handlers for AM LLAP plugin endpoint."),
@@ -3274,6 +3279,8 @@ public class HiveConf extends Configuration {
     
LLAP_DAEMON_COMMUNICATOR_NUM_THREADS("hive.llap.daemon.communicator.num.threads",
 10,
       "Number of threads to use in LLAP task communicator in Tez AM.",
       "llap.daemon.communicator.num.threads"),
+    LLAP_PLUGIN_CLIENT_NUM_THREADS("hive.llap.plugin.client.num.threads", 10,
+        "Number of threads to use in LLAP task plugin client."),
     
LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS("hive.llap.daemon.download.permanent.fns", 
false,
         "Whether LLAP daemon should localize the resources for permanent 
UDFs."),
     
LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", 
"llap",

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java 
b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 05f8a5f..4477954 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -655,7 +655,7 @@ public class QTestUtil {
     }
 
     if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) {
-      SessionState.get().getTezSession().close(false);
+      SessionState.get().getTezSession().destroy();
     }
     setup.tearDown();
     if (sparkSession != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
index b650184..211696a 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
@@ -46,11 +46,13 @@ public class LlapProtocolClientProxy
 
   public LlapProtocolClientProxy(
       int numThreads, Configuration conf, Token<LlapTokenIdentifier> 
llapToken) {
+    // We could pass in the number of nodes that we expect instead of -1.
+    // Also, a single concurrent request per node is currently hardcoded.
     super(LlapProtocolClientProxy.class.getSimpleName(), numThreads, conf, 
llapToken,
         HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS,
             TimeUnit.MILLISECONDS),
         HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
-            TimeUnit.MILLISECONDS), -1);
+            TimeUnit.MILLISECONDS), -1, 1);
   }
 
   public void sendSubmitWork(SubmitWorkRequestProto request, String host, int 
port,
@@ -71,13 +73,6 @@ public class LlapProtocolClientProxy
     queueRequest(new SendQueryCompleteCallable(nodeId, request, callback));
   }
 
-
-  public void sendUpdateFragment(final UpdateFragmentRequestProto request, 
final String host,
-      final int port, final 
ExecuteRequestCallback<UpdateFragmentResponseProto> callback) {
-    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
-    queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback));
-  }
-
   public void sendTerminateFragment(final TerminateFragmentRequestProto 
request, final String host,
                                     final int port,
                                     final 
ExecuteRequestCallback<TerminateFragmentResponseProto> callback) {
@@ -85,7 +80,13 @@ public class LlapProtocolClientProxy
     queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback));
   }
 
-  private class SubmitWorkCallable extends 
CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {
+  public void sendUpdateFragment(final UpdateFragmentRequestProto request, 
final String host,
+      final int port, final 
ExecuteRequestCallback<UpdateFragmentResponseProto> callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    queueRequest(new SendUpdateFragmentCallable(nodeId, request, callback));
+  }
+
+  private class SubmitWorkCallable extends 
NodeCallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {
 
     protected SubmitWorkCallable(LlapNodeId nodeId,
                           SubmitWorkRequestProto submitWorkRequestProto,
@@ -95,12 +96,12 @@ public class LlapProtocolClientProxy
 
     @Override
     public SubmitWorkResponseProto call() throws Exception {
-      return getProxy(nodeId).submitWork(null, request);
+      return getProxy(nodeId, null).submitWork(null, request);
     }
   }
 
   private class SendSourceStateUpdateCallable
-      extends CallableRequest<SourceStateUpdatedRequestProto, 
SourceStateUpdatedResponseProto> {
+      extends NodeCallableRequest<SourceStateUpdatedRequestProto, 
SourceStateUpdatedResponseProto> {
 
     public SendSourceStateUpdateCallable(LlapNodeId nodeId,
                                          SourceStateUpdatedRequestProto 
request,
@@ -110,12 +111,12 @@ public class LlapProtocolClientProxy
 
     @Override
     public SourceStateUpdatedResponseProto call() throws Exception {
-      return getProxy(nodeId).sourceStateUpdated(null, request);
+      return getProxy(nodeId, null).sourceStateUpdated(null, request);
     }
   }
 
   private class SendQueryCompleteCallable
-      extends CallableRequest<QueryCompleteRequestProto, 
QueryCompleteResponseProto> {
+      extends NodeCallableRequest<QueryCompleteRequestProto, 
QueryCompleteResponseProto> {
 
     protected SendQueryCompleteCallable(LlapNodeId nodeId,
                                         QueryCompleteRequestProto 
queryCompleteRequestProto,
@@ -125,12 +126,12 @@ public class LlapProtocolClientProxy
 
     @Override
     public QueryCompleteResponseProto call() throws Exception {
-      return getProxy(nodeId).queryComplete(null, request);
+      return getProxy(nodeId, null).queryComplete(null, request);
     }
   }
 
   private class SendTerminateFragmentCallable
-      extends CallableRequest<TerminateFragmentRequestProto, 
TerminateFragmentResponseProto> {
+      extends NodeCallableRequest<TerminateFragmentRequestProto, 
TerminateFragmentResponseProto> {
 
     protected SendTerminateFragmentCallable(LlapNodeId nodeId,
                                             TerminateFragmentRequestProto 
terminateFragmentRequestProto,
@@ -140,12 +141,12 @@ public class LlapProtocolClientProxy
 
     @Override
     public TerminateFragmentResponseProto call() throws Exception {
-      return getProxy(nodeId).terminateFragment(null, request);
+      return getProxy(nodeId, null).terminateFragment(null, request);
     }
   }
 
   private class SendUpdateFragmentCallable
-      extends CallableRequest<UpdateFragmentRequestProto, 
UpdateFragmentResponseProto> {
+      extends NodeCallableRequest<UpdateFragmentRequestProto, 
UpdateFragmentResponseProto> {
 
     protected SendUpdateFragmentCallable(LlapNodeId nodeId,
         UpdateFragmentRequestProto terminateFragmentRequestProto,
@@ -155,7 +156,7 @@ public class LlapProtocolClientProxy
 
     @Override
     public UpdateFragmentResponseProto call() throws Exception {
-      return getProxy(nodeId).updateFragment(null, request);
+      return getProxy(nodeId, null).updateFragment(null, request);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java 
b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index a71904c..0724cf5 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -54,7 +54,7 @@ public class TezAmInstance extends ServiceInstanceBase {
     return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID);
   }
 
-  public String getJobIdForPluginToken() {
+  public String getPluginTokenJobId() {
     return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID);
   }
 
@@ -73,4 +73,11 @@ public class TezAmInstance extends ServiceInstanceBase {
     this.token = token;
     return token;
   }
+
+  @Override
+  public String toString() {
+    return "TezAmInstance [" + getSessionId() + ", host=" + host + ", 
rpcPort=" + rpcPort +
+        ", pluginPort=" + pluginPort + ", token=" + token + "]";
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java 
b/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java
index 1c4f0e7..b152f1c 100644
--- a/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java
+++ b/llap-client/src/test/org/apache/hadoop/hive/llap/TestAsyncPbRpcProxy.java
@@ -33,7 +33,7 @@ import org.junit.Test;
 public class TestAsyncPbRpcProxy {
 
   @Test (timeout = 5000)
-  public void testMultipleNodes() {
+  public void testMultipleNodes() throws Exception {
     RequestManagerForTest requestManager = new RequestManagerForTest(1);
 
     LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
@@ -62,7 +62,7 @@ public class TestAsyncPbRpcProxy {
   }
 
   @Test(timeout = 5000)
-  public void testSingleInvocationPerNode() {
+  public void testSingleInvocationPerNode() throws Exception {
     RequestManagerForTest requestManager = new RequestManagerForTest(1);
 
     LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
@@ -109,7 +109,7 @@ public class TestAsyncPbRpcProxy {
     private Map<LlapNodeId, MutableInt> numInvocationsPerNode = new 
HashMap<>();
 
     public RequestManagerForTest(int numThreads) {
-      super(numThreads);
+      super(numThreads, 1);
     }
 
     protected void submitToExecutor(LlapProtocolClientProxy.CallableRequest 
request, LlapNodeId nodeId) {
@@ -129,7 +129,7 @@ public class TestAsyncPbRpcProxy {
 
   }
 
-  static class CallableRequestForTest extends 
LlapProtocolClientProxy.CallableRequest<Message, Message> {
+  static class CallableRequestForTest extends 
LlapProtocolClientProxy.NodeCallableRequest<Message, Message> {
 
     protected CallableRequestForTest(LlapNodeId nodeId, Message message,
                                      
LlapProtocolClientProxy.ExecuteRequestCallback<Message> callback) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java
 
b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java
index 61eb21a..dbcd895 100644
--- 
a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java
+++ 
b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/plugin/rpc/LlapPluginProtocolProtos.java
@@ -11,27 +11,13 @@ public final class LlapPluginProtocolProtos {
   public interface UpdateQueryRequestProtoOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // optional .QueryIdentifierProto query_id = 1;
+    // optional int32 guaranteed_task_count = 1;
     /**
-     * <code>optional .QueryIdentifierProto query_id = 1;</code>
-     */
-    boolean hasQueryId();
-    /**
-     * <code>optional .QueryIdentifierProto query_id = 1;</code>
-     */
-    
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto
 getQueryId();
-    /**
-     * <code>optional .QueryIdentifierProto query_id = 1;</code>
-     */
-    
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder
 getQueryIdOrBuilder();
-
-    // optional int32 guaranteed_task_count = 2;
-    /**
-     * <code>optional int32 guaranteed_task_count = 2;</code>
+     * <code>optional int32 guaranteed_task_count = 1;</code>
      */
     boolean hasGuaranteedTaskCount();
     /**
-     * <code>optional int32 guaranteed_task_count = 2;</code>
+     * <code>optional int32 guaranteed_task_count = 1;</code>
      */
     int getGuaranteedTaskCount();
   }
@@ -86,21 +72,8 @@ public final class LlapPluginProtocolProtos {
               }
               break;
             }
-            case 10: {
-              
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder
 subBuilder = null;
-              if (((bitField0_ & 0x00000001) == 0x00000001)) {
-                subBuilder = queryId_.toBuilder();
-              }
-              queryId_ = 
input.readMessage(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.PARSER,
 extensionRegistry);
-              if (subBuilder != null) {
-                subBuilder.mergeFrom(queryId_);
-                queryId_ = subBuilder.buildPartial();
-              }
+            case 8: {
               bitField0_ |= 0x00000001;
-              break;
-            }
-            case 16: {
-              bitField0_ |= 0x00000002;
               guaranteedTaskCount_ = input.readInt32();
               break;
             }
@@ -144,46 +117,23 @@ public final class LlapPluginProtocolProtos {
     }
 
     private int bitField0_;
-    // optional .QueryIdentifierProto query_id = 1;
-    public static final int QUERY_ID_FIELD_NUMBER = 1;
-    private 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto
 queryId_;
-    /**
-     * <code>optional .QueryIdentifierProto query_id = 1;</code>
-     */
-    public boolean hasQueryId() {
-      return ((bitField0_ & 0x00000001) == 0x00000001);
-    }
-    /**
-     * <code>optional .QueryIdentifierProto query_id = 1;</code>
-     */
-    public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto
 getQueryId() {
-      return queryId_;
-    }
-    /**
-     * <code>optional .QueryIdentifierProto query_id = 1;</code>
-     */
-    public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder
 getQueryIdOrBuilder() {
-      return queryId_;
-    }
-
-    // optional int32 guaranteed_task_count = 2;
-    public static final int GUARANTEED_TASK_COUNT_FIELD_NUMBER = 2;
+    // optional int32 guaranteed_task_count = 1;
+    public static final int GUARANTEED_TASK_COUNT_FIELD_NUMBER = 1;
     private int guaranteedTaskCount_;
     /**
-     * <code>optional int32 guaranteed_task_count = 2;</code>
+     * <code>optional int32 guaranteed_task_count = 1;</code>
      */
     public boolean hasGuaranteedTaskCount() {
-      return ((bitField0_ & 0x00000002) == 0x00000002);
+      return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>optional int32 guaranteed_task_count = 2;</code>
+     * <code>optional int32 guaranteed_task_count = 1;</code>
      */
     public int getGuaranteedTaskCount() {
       return guaranteedTaskCount_;
     }
 
     private void initFields() {
-      queryId_ = 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance();
       guaranteedTaskCount_ = 0;
     }
     private byte memoizedIsInitialized = -1;
@@ -199,10 +149,7 @@ public final class LlapPluginProtocolProtos {
                         throws java.io.IOException {
       getSerializedSize();
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        output.writeMessage(1, queryId_);
-      }
-      if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        output.writeInt32(2, guaranteedTaskCount_);
+        output.writeInt32(1, guaranteedTaskCount_);
       }
       getUnknownFields().writeTo(output);
     }
@@ -215,11 +162,7 @@ public final class LlapPluginProtocolProtos {
       size = 0;
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeMessageSize(1, queryId_);
-      }
-      if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeInt32Size(2, guaranteedTaskCount_);
+          .computeInt32Size(1, guaranteedTaskCount_);
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
@@ -244,11 +187,6 @@ public final class LlapPluginProtocolProtos {
       
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto
 other = 
(org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto)
 obj;
 
       boolean result = true;
-      result = result && (hasQueryId() == other.hasQueryId());
-      if (hasQueryId()) {
-        result = result && getQueryId()
-            .equals(other.getQueryId());
-      }
       result = result && (hasGuaranteedTaskCount() == 
other.hasGuaranteedTaskCount());
       if (hasGuaranteedTaskCount()) {
         result = result && (getGuaranteedTaskCount()
@@ -267,10 +205,6 @@ public final class LlapPluginProtocolProtos {
       }
       int hash = 41;
       hash = (19 * hash) + getDescriptorForType().hashCode();
-      if (hasQueryId()) {
-        hash = (37 * hash) + QUERY_ID_FIELD_NUMBER;
-        hash = (53 * hash) + getQueryId().hashCode();
-      }
       if (hasGuaranteedTaskCount()) {
         hash = (37 * hash) + GUARANTEED_TASK_COUNT_FIELD_NUMBER;
         hash = (53 * hash) + getGuaranteedTaskCount();
@@ -376,7 +310,6 @@ public final class LlapPluginProtocolProtos {
       }
       private void maybeForceBuilderInitialization() {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
-          getQueryIdFieldBuilder();
         }
       }
       private static Builder create() {
@@ -385,14 +318,8 @@ public final class LlapPluginProtocolProtos {
 
       public Builder clear() {
         super.clear();
-        if (queryIdBuilder_ == null) {
-          queryId_ = 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance();
-        } else {
-          queryIdBuilder_.clear();
-        }
-        bitField0_ = (bitField0_ & ~0x00000001);
         guaranteedTaskCount_ = 0;
-        bitField0_ = (bitField0_ & ~0x00000002);
+        bitField0_ = (bitField0_ & ~0x00000001);
         return this;
       }
 
@@ -424,14 +351,6 @@ public final class LlapPluginProtocolProtos {
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
           to_bitField0_ |= 0x00000001;
         }
-        if (queryIdBuilder_ == null) {
-          result.queryId_ = queryId_;
-        } else {
-          result.queryId_ = queryIdBuilder_.build();
-        }
-        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
-          to_bitField0_ |= 0x00000002;
-        }
         result.guaranteedTaskCount_ = guaranteedTaskCount_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
@@ -449,9 +368,6 @@ public final class LlapPluginProtocolProtos {
 
       public Builder 
mergeFrom(org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto
 other) {
         if (other == 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto.getDefaultInstance())
 return this;
-        if (other.hasQueryId()) {
-          mergeQueryId(other.getQueryId());
-        }
         if (other.hasGuaranteedTaskCount()) {
           setGuaranteedTaskCount(other.getGuaranteedTaskCount());
         }
@@ -482,151 +398,34 @@ public final class LlapPluginProtocolProtos {
       }
       private int bitField0_;
 
-      // optional .QueryIdentifierProto query_id = 1;
-      private 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto
 queryId_ = 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance();
-      private com.google.protobuf.SingleFieldBuilder<
-          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder>
 queryIdBuilder_;
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      public boolean hasQueryId() {
-        return ((bitField0_ & 0x00000001) == 0x00000001);
-      }
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto
 getQueryId() {
-        if (queryIdBuilder_ == null) {
-          return queryId_;
-        } else {
-          return queryIdBuilder_.getMessage();
-        }
-      }
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      public Builder 
setQueryId(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto
 value) {
-        if (queryIdBuilder_ == null) {
-          if (value == null) {
-            throw new NullPointerException();
-          }
-          queryId_ = value;
-          onChanged();
-        } else {
-          queryIdBuilder_.setMessage(value);
-        }
-        bitField0_ |= 0x00000001;
-        return this;
-      }
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      public Builder setQueryId(
-          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder
 builderForValue) {
-        if (queryIdBuilder_ == null) {
-          queryId_ = builderForValue.build();
-          onChanged();
-        } else {
-          queryIdBuilder_.setMessage(builderForValue.build());
-        }
-        bitField0_ |= 0x00000001;
-        return this;
-      }
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      public Builder 
mergeQueryId(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto
 value) {
-        if (queryIdBuilder_ == null) {
-          if (((bitField0_ & 0x00000001) == 0x00000001) &&
-              queryId_ != 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance())
 {
-            queryId_ =
-              
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder(queryId_).mergeFrom(value).buildPartial();
-          } else {
-            queryId_ = value;
-          }
-          onChanged();
-        } else {
-          queryIdBuilder_.mergeFrom(value);
-        }
-        bitField0_ |= 0x00000001;
-        return this;
-      }
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      public Builder clearQueryId() {
-        if (queryIdBuilder_ == null) {
-          queryId_ = 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.getDefaultInstance();
-          onChanged();
-        } else {
-          queryIdBuilder_.clear();
-        }
-        bitField0_ = (bitField0_ & ~0x00000001);
-        return this;
-      }
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder
 getQueryIdBuilder() {
-        bitField0_ |= 0x00000001;
-        onChanged();
-        return getQueryIdFieldBuilder().getBuilder();
-      }
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder
 getQueryIdOrBuilder() {
-        if (queryIdBuilder_ != null) {
-          return queryIdBuilder_.getMessageOrBuilder();
-        } else {
-          return queryId_;
-        }
-      }
-      /**
-       * <code>optional .QueryIdentifierProto query_id = 1;</code>
-       */
-      private com.google.protobuf.SingleFieldBuilder<
-          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder>
 
-          getQueryIdFieldBuilder() {
-        if (queryIdBuilder_ == null) {
-          queryIdBuilder_ = new com.google.protobuf.SingleFieldBuilder<
-              
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto.Builder,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProtoOrBuilder>(
-                  queryId_,
-                  getParentForChildren(),
-                  isClean());
-          queryId_ = null;
-        }
-        return queryIdBuilder_;
-      }
-
-      // optional int32 guaranteed_task_count = 2;
+      // optional int32 guaranteed_task_count = 1;
       private int guaranteedTaskCount_ ;
       /**
-       * <code>optional int32 guaranteed_task_count = 2;</code>
+       * <code>optional int32 guaranteed_task_count = 1;</code>
        */
       public boolean hasGuaranteedTaskCount() {
-        return ((bitField0_ & 0x00000002) == 0x00000002);
+        return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>optional int32 guaranteed_task_count = 2;</code>
+       * <code>optional int32 guaranteed_task_count = 1;</code>
        */
       public int getGuaranteedTaskCount() {
         return guaranteedTaskCount_;
       }
       /**
-       * <code>optional int32 guaranteed_task_count = 2;</code>
+       * <code>optional int32 guaranteed_task_count = 1;</code>
        */
       public Builder setGuaranteedTaskCount(int value) {
-        bitField0_ |= 0x00000002;
+        bitField0_ |= 0x00000001;
         guaranteedTaskCount_ = value;
         onChanged();
         return this;
       }
       /**
-       * <code>optional int32 guaranteed_task_count = 2;</code>
+       * <code>optional int32 guaranteed_task_count = 1;</code>
        */
       public Builder clearGuaranteedTaskCount() {
-        bitField0_ = (bitField0_ & ~0x00000002);
+        bitField0_ = (bitField0_ & ~0x00000001);
         guaranteedTaskCount_ = 0;
         onChanged();
         return this;
@@ -1232,15 +1031,13 @@ public final class LlapPluginProtocolProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\030LlapPluginProtocol.proto\032\030LlapDaemonPr" +
-      "otocol.proto\"a\n\027UpdateQueryRequestProto\022" +
-      "\'\n\010query_id\030\001 \001(\0132\025.QueryIdentifierProto" +
-      "\022\035\n\025guaranteed_task_count\030\002 \001(\005\"\032\n\030Updat" 
+
-      "eQueryResponseProto2X\n\022LlapPluginProtoco" +
-      "l\022B\n\013updateQuery\022\030.UpdateQueryRequestPro" +
-      "to\032\031.UpdateQueryResponseProtoBH\n&org.apa" +
-      "che.hadoop.hive.llap.plugin.rpcB\030LlapPlu" +
-      "ginProtocolProtos\210\001\001\240\001\001"
+      "\n\030LlapPluginProtocol.proto\"8\n\027UpdateQuer" +
+      "yRequestProto\022\035\n\025guaranteed_task_count\030\001" +
+      " \001(\005\"\032\n\030UpdateQueryResponseProto2X\n\022Llap" +
+      "PluginProtocol\022B\n\013updateQuery\022\030.UpdateQu" +
+      "eryRequestProto\032\031.UpdateQueryResponsePro" +
+      "toBH\n&org.apache.hadoop.hive.llap.plugin" +
+      ".rpcB\030LlapPluginProtocolProtos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1252,7 +1049,7 @@ public final class LlapPluginProtocolProtos {
           internal_static_UpdateQueryRequestProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_UpdateQueryRequestProto_descriptor,
-              new java.lang.String[] { "QueryId", "GuaranteedTaskCount", });
+              new java.lang.String[] { "GuaranteedTaskCount", });
           internal_static_UpdateQueryResponseProto_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_UpdateQueryResponseProto_fieldAccessorTable = new
@@ -1265,7 +1062,6 @@ public final class LlapPluginProtocolProtos {
     com.google.protobuf.Descriptors.FileDescriptor
       .internalBuildGeneratedFileFrom(descriptorData,
         new com.google.protobuf.Descriptors.FileDescriptor[] {
-          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor(),
         }, assigner);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
index 7726794..0415169 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
@@ -81,7 +81,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType 
extends TokenIdent
       private final Condition queueCondition = lock.newCondition();
       private final AtomicBoolean shouldRun = new AtomicBoolean(false);
 
-      private final int maxConcurrentRequestsPerNode = 1;
+      private final int maxConcurrentRequestsPerNode;
       private final ListeningExecutorService executor;
 
 
@@ -97,9 +97,10 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
       // Tracks completed requests pre node
       private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>();
 
-      public RequestManager(int numThreads) {
+      public RequestManager(int numThreads, int maxPerNode) {
         ExecutorService localExecutor = 
Executors.newFixedThreadPool(numThreads,
             new ThreadFactoryBuilder().setNameFormat("TaskCommunicator 
#%2d").build());
+        maxConcurrentRequestsPerNode = maxPerNode;
         executor = MoreExecutors.listeningDecorator(localExecutor);
       }
 
@@ -123,12 +124,8 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
               break;
             }
           } catch (InterruptedException e) {
-            if (isShutdown.get()) {
-              break;
-            } else {
-              LOG.warn("RunLoop interrupted without being shutdown first");
-              throw new RuntimeException(e);
-            }
+            handleInterrupt(e);
+            break;
           } finally {
             lock.unlock();
           }
@@ -137,6 +134,12 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
         return null;
       }
 
+      private void handleInterrupt(InterruptedException e) {
+        if (isShutdown.get()) return;
+        LOG.warn("RunLoop interrupted without being shutdown first");
+        throw new RuntimeException(e);
+      }
+
       /* Add a new request to be executed */
       public void queueRequest(CallableRequest<?, ?> request) {
         synchronized (newRequestList) {
@@ -171,7 +174,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
       }
 
       @VisibleForTesting
-      boolean process() {
+      boolean process() throws InterruptedException {
         if (isShutdown.get()) {
           return true;
         }
@@ -182,6 +185,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
         // otherwise an add/completion after draining the lists but before 
setting it to false,
         // will not trigger a run. May cause one unnecessary run if an add 
comes in before drain.
         // drain list. add request (setTrue). setFalse needs to be avoided.
+        // TODO: why CAS if the result is not checked?
         shouldRun.compareAndSet(true, false);
         // Drain any calls which may have come in during the last execution of 
the loop.
         drainNewRequestList();  // Locks newRequestList
@@ -192,7 +196,16 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
         while (iterator.hasNext()) {
           CallableRequest<?, ?> request = iterator.next();
           iterator.remove();
-          LlapNodeId nodeId = request.getNodeId();
+          LlapNodeId nodeId;
+          try {
+            nodeId = request.getNodeId();
+          } catch (InterruptedException e) {
+            throw e;
+          } catch (Exception e) {
+            request.getCallback().indicateError(e);
+            continue;
+          }
+
           if (canRunForNode(nodeId, currentLoopDisabledNodes)) {
             submitToExecutor(request, nodeId);
           } else {
@@ -321,19 +334,15 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
   @VisibleForTesting
   protected static abstract class CallableRequest<REQUEST extends Message, 
RESPONSE extends Message>
         implements Callable<RESPONSE> {
-    protected final LlapNodeId nodeId;
     protected final ExecuteRequestCallback<RESPONSE> callback;
     protected final REQUEST request;
 
-    protected CallableRequest(LlapNodeId nodeId, REQUEST request, 
ExecuteRequestCallback<RESPONSE> callback) {
-      this.nodeId = nodeId;
+    protected CallableRequest(REQUEST request, 
ExecuteRequestCallback<RESPONSE> callback) {
       this.request = request;
       this.callback = callback;
     }
 
-    public LlapNodeId getNodeId() {
-      return nodeId;
-    }
+    public abstract LlapNodeId getNodeId() throws Exception;
 
     public ExecuteRequestCallback<RESPONSE> getCallback() {
       return callback;
@@ -342,13 +351,31 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
     public abstract RESPONSE call() throws Exception;
   }
 
+
+  @VisibleForTesting
+  protected static abstract class NodeCallableRequest<
+    REQUEST extends Message, RESPONSE extends Message> extends 
CallableRequest<REQUEST, RESPONSE> {
+    protected final LlapNodeId nodeId;
+
+    protected NodeCallableRequest(LlapNodeId nodeId, REQUEST request,
+        ExecuteRequestCallback<RESPONSE> callback) {
+      super(request, callback);
+      this.nodeId = nodeId;
+    }
+
+    @Override
+    public LlapNodeId getNodeId() {
+      return nodeId;
+    }
+  }
+
   public interface ExecuteRequestCallback<T extends Message> {
     void setResponse(T response);
     void indicateError(Throwable t);
   }
 
-  public AsyncPbRpcProxy(String name, int numThreads, Configuration conf,
-      Token<TokenType> token, long connectionTimeoutMs, long retrySleepMs, int 
expectedNodes) {
+  public AsyncPbRpcProxy(String name, int numThreads, Configuration conf, 
Token<TokenType> token,
+      long connectionTimeoutMs, long retrySleepMs, int expectedNodes, int 
maxPerNode) {
     super(name);
     // Note: we may make size/etc. configurable later.
     CacheBuilder<String, ProtocolType> cb = 
CacheBuilder.newBuilder().expireAfterAccess(
@@ -371,7 +398,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
     this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
         connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS);
 
-    this.requestManager = new RequestManager(numThreads);
+    this.requestManager = new RequestManager(numThreads, maxPerNode);
     ExecutorService localExecutor = Executors.newFixedThreadPool(1,
         new 
ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
     this.requestManagerExecutor = 
MoreExecutors.listeningDecorator(localExecutor);
@@ -382,13 +409,19 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
         "retrySleep(millis)=" + retrySleepMs);
   }
 
-  protected final ProtocolType getProxy(final LlapNodeId nodeId) {
+  /**
+   * @param nodeId Hostname + port.
+   * @param nodeToken A custom node token. If not specified, the default token 
is used.
+   * @return the protocol client implementation for the node.
+   */
+  protected final ProtocolType getProxy(
+      final LlapNodeId nodeId, final Token<TokenType> nodeToken) {
     String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
     try {
       return hostProxies.get(hostId, new Callable<ProtocolType>() {
         @Override
         public ProtocolType call() throws Exception {
-          return createProxy(nodeId);
+          return createProxy(nodeId, nodeToken);
         }
       });
     } catch (ExecutionException e) {
@@ -396,8 +429,8 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
     }
   }
 
-  private ProtocolType createProxy(final LlapNodeId nodeId) {
-    if (token == null) {
+  private ProtocolType createProxy(final LlapNodeId nodeId, Token<TokenType> 
nodeToken) {
+    if (nodeToken == null && token == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Creating a client without a token for " + nodeId);
       }
@@ -406,7 +439,9 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
     }
     final UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(tokenUser);
     // Clone the token as we'd need to set the service to the one we are 
talking to.
-    Token<TokenType> nodeToken = new Token<TokenType>(token);
+    if (nodeToken == null) {
+      nodeToken = new Token<TokenType>(token);
+    }
     SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
         nodeId.getHostname(), nodeId.getPort()));
     ugi.addToken(nodeToken);

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java
 
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java
index 19e81e6..3883052 100644
--- 
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java
+++ 
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapPluginProtocolClientImpl.java
@@ -21,6 +21,7 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
 import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
 import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
 import org.apache.hadoop.hive.llap.protocol.LlapPluginProtocolPB;
@@ -29,9 +30,9 @@ public class LlapPluginProtocolClientImpl implements 
LlapPluginProtocolPB {
   private ProtobufProxy<LlapPluginProtocolPB> protobuf;
 
   public LlapPluginProtocolClientImpl(Configuration conf, String hostname, int 
port,
-      RetryPolicy retryPolicy, SocketFactory socketFactory) {
+      RetryPolicy retryPolicy, SocketFactory socketFactory, 
UserGroupInformation ugi) {
     protobuf = new ProtobufProxy<>(
-        conf, hostname, port, retryPolicy, socketFactory, 
LlapPluginProtocolPB.class);
+        conf, ugi, hostname, port, retryPolicy, socketFactory, 
LlapPluginProtocolPB.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java
index fa99536..c2711db 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/ProtobufProxy.java
@@ -18,19 +18,14 @@ import javax.annotation.Nullable;
 import javax.net.SocketFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolProxy;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
-import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
-import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
-import org.apache.hadoop.hive.llap.protocol.LlapPluginProtocolPB;
 import org.apache.hadoop.security.UserGroupInformation;
 
 // TODO: move other protocols to use this too.
@@ -42,12 +37,12 @@ public class ProtobufProxy<BlockingInterface> {
   private final SocketFactory socketFactory;
   private BlockingInterface proxy;
   private final Class<?> blockingInterfaceClass;
+  private UserGroupInformation ugi;
 
 
-  public ProtobufProxy(Configuration conf, String hostname, int port,
-                                      @Nullable RetryPolicy retryPolicy,
-                                      @Nullable SocketFactory socketFactory,
-                                      Class<?> blockingInterfaceClass) {
+  public ProtobufProxy(Configuration conf, UserGroupInformation ugi,
+      String hostname, int port, @Nullable RetryPolicy retryPolicy,
+      @Nullable SocketFactory socketFactory, Class<?> blockingInterfaceClass) {
     this.conf = conf;
     this.serverAddr = NetUtils.createSocketAddr(hostname, port);
     this.retryPolicy = retryPolicy;
@@ -57,6 +52,7 @@ public class ProtobufProxy<BlockingInterface> {
       this.socketFactory = socketFactory;
     }
     this.blockingInterfaceClass = blockingInterfaceClass;
+    this.ugi = ugi;
   }
 
   public BlockingInterface getProxy() throws IOException {
@@ -68,6 +64,21 @@ public class ProtobufProxy<BlockingInterface> {
 
   private BlockingInterface createProxy() throws IOException {
     RPC.setProtocolEngine(conf, blockingInterfaceClass, 
ProtobufRpcEngine.class);
+    if (ugi == null) return createProxyInternal();
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<BlockingInterface>() {
+        @Override
+        public BlockingInterface run() throws IOException {
+          return createProxyInternal();
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private BlockingInterface createProxyInternal() throws IOException {
+    @SuppressWarnings("unchecked")
     ProtocolProxy<BlockingInterface> proxy = (ProtocolProxy<BlockingInterface>)
         RPC.getProtocolProxy(blockingInterfaceClass, 0, serverAddr,
             UserGroupInformation.getCurrentUser(), conf, socketFactory, 0, 
retryPolicy);

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-common/src/protobuf/LlapPluginProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapPluginProtocol.proto 
b/llap-common/src/protobuf/LlapPluginProtocol.proto
index 39349b1..2a3f441 100644
--- a/llap-common/src/protobuf/LlapPluginProtocol.proto
+++ b/llap-common/src/protobuf/LlapPluginProtocol.proto
@@ -21,11 +21,8 @@ option java_outer_classname = "LlapPluginProtocolProtos";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
-import "LlapDaemonProtocol.proto";
-
 message UpdateQueryRequestProto {
-  optional QueryIdentifierProto query_id = 1;
-  optional int32 guaranteed_task_count = 2;
+  optional int32 guaranteed_task_count = 1;
 }
 
 message UpdateQueryResponseProto {

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 26747fc..0bfab8b 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -253,6 +253,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
 
 
   // The fields that HS2 uses to give AM information about plugin endpoint.
+  // Some of these will be removed when AM registry is implemented, as AM will 
generate and publish them.
   /** Whether to enable the endpoint. */
   public static final String LLAP_PLUGIN_ENDPOINT_ENABLED = 
"llap.plugin.endpoint.enabled";
 
@@ -287,8 +288,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
         serializedToken = jobIdForToken = null;
       }
       pluginEndpoint = new LlapPluginServerImpl(sm,
-          HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS),
-          HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_PORT), this);
+          HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), 
this);
     } else {
       serializedToken = jobIdForToken = null;
       pluginEndpoint = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
index 4d5333f..e9a011a 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
@@ -14,31 +14,29 @@
 
 package org.apache.hadoop.hive.llap.tezplugins.endpoint;
 
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicReference;
-
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos;
 import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
 import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
 import org.apache.hadoop.hive.llap.protocol.LlapPluginProtocolPB;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.common.security.JobTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LlapPluginServerImpl extends AbstractService implements 
LlapPluginProtocolPB {
   private static final Logger LOG = 
LoggerFactory.getLogger(LlapPluginServerImpl.class);
 
-  private final int port;
   private RPC.Server server;
   private final SecretManager<JobTokenIdentifier> secretManager;
   private final int numHandlers;
@@ -46,13 +44,11 @@ public class LlapPluginServerImpl extends AbstractService 
implements LlapPluginP
   private final AtomicReference<InetSocketAddress> bindAddress = new 
AtomicReference<>();
 
   public LlapPluginServerImpl(SecretManager<JobTokenIdentifier> secretManager,
-      int numHandlers, int port, LlapTaskSchedulerService parent) {
+      int numHandlers, LlapTaskSchedulerService parent) {
     super("LlapPluginServerImpl");
     this.secretManager = secretManager;
-    this.port = port;
     this.numHandlers = numHandlers;
     this.parent = parent;
-    LOG.info("Creating the plugin endpoint on port " + port);
   }
 
   @Override
@@ -67,9 +63,10 @@ public class LlapPluginServerImpl extends AbstractService 
implements LlapPluginP
     final Configuration conf = getConfig();
     final BlockingService daemonImpl =
         
LlapPluginProtocolProtos.LlapPluginProtocol.newReflectiveBlockingService(this);
-    server = LlapUtil.startProtocolServer(port, numHandlers, bindAddress , 
conf, daemonImpl,
+    server = LlapUtil.startProtocolServer(0, numHandlers, bindAddress , conf, 
daemonImpl,
         LlapPluginProtocolPB.class, secretManager, new 
LlapPluginPolicyProvider(),
         ConfVars.LLAP_PLUGIN_ACL, ConfVars.LLAP_PLUGIN_ACL_DENY);
+    LOG.info("Starting the plugin endpoint on port " + 
bindAddress.get().getPort());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 93a36c6..97df36e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -399,10 +399,9 @@ public class ExecDriver extends Task<MapredWork> 
implements Serializable, Hadoop
       Utilities.createTmpDirs(job, rWork);
 
       SessionState ss = SessionState.get();
-      if (HiveConf.getVar(job, 
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
-          && ss != null) {
-        TezSessionState session = ss.getTezSession();
-        TezSessionPoolManager.getInstance().closeIfNotDefault(session, true);
+      // TODO: why is there a TezSession in MR ExecDriver?
+      if (ss != null && HiveConf.getVar(job, 
ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        TezSessionPoolManager.closeIfNotDefault(ss.getTezSession(), true);
       }
 
       HiveConfUtil.updateJobCredentialProviders(job);

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
new file mode 100644
index 0000000..35d380c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+
+public interface AmPluginNode {
+  public static class AmPluginInfo {
+    public final int amPluginPort;
+    public final Token<JobTokenIdentifier> amPluginToken;
+    public final String amPluginTokenJobId;
+    public final String amHost;
+
+    AmPluginInfo(String amHost, int amPluginPort,
+        Token<JobTokenIdentifier> amPluginToken, String amPluginTokenJobId) {
+      this.amHost = amHost;
+      this.amPluginPort = amPluginPort;
+      this.amPluginToken = amPluginToken;
+      this.amPluginTokenJobId = amPluginTokenJobId;
+    }
+  }
+
+  AmPluginInfo waitForAmPluginInfo(int timeoutMs) throws InterruptedException, 
TimeoutException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
new file mode 100644
index 0000000..d978a25
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback;
+import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
+import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Implements query resource allocation using guaranteed tasks. */
+public class GuaranteedTasksAllocator implements QueryAllocationManager {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      GuaranteedTasksAllocator.class);
+  private final static long CLUSTER_INFO_UPDATE_INTERVAL_MS = 120 * 1000L;
+
+  private final Configuration conf;
+  private final LlapClusterStateForCompile clusterState;
+  private final Thread clusterStateUpdateThread;
+  private final LlapPluginEndpointClient amCommunicator;
+
+  public GuaranteedTasksAllocator(
+      Configuration conf, LlapPluginEndpointClient amCommunicator) {
+    this.conf = conf;
+    this.clusterState = new LlapClusterStateForCompile(conf, 
CLUSTER_INFO_UPDATE_INTERVAL_MS);
+    this.amCommunicator = amCommunicator;
+    this.clusterStateUpdateThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          getExecutorCount(true); // Trigger an update if needed.
+          try {
+            Thread.sleep(CLUSTER_INFO_UPDATE_INTERVAL_MS / 2);
+          } catch (InterruptedException e) {
+            LOG.info("Cluster state update thread was interrupted");
+            return;
+          }
+        }
+      }
+    }, "Cluster State Updater");
+    clusterStateUpdateThread.setDaemon(true);
+  }
+
+  @Override
+  public void start() {
+    clusterStateUpdateThread.start();
+  }
+
+  @Override
+  public void stop() {
+    clusterStateUpdateThread.interrupt(); // Don't wait for the thread.
+  }
+
+  @VisibleForTesting
+  protected int getExecutorCount(boolean allowUpdate) {
+    if (!clusterState.initClusterInfo(allowUpdate)) {
+      LOG.warn("Failed to get LLAP cluster information for "
+          + HiveConf.getTrimmedVar(this.conf, 
ConfVars.LLAP_DAEMON_SERVICE_HOSTS)
+          + "; we may rely on outdated cluster status");
+    }
+    if (!clusterState.hasClusterInfo()) {
+      LOG.error("No cluster information available to allocate; no guaranteed 
tasks will be used");
+      return 0;
+    }
+    int unknownNodes = clusterState.getNodeCountWithUnknownExecutors();
+    if (unknownNodes > 0) {
+      LOG.error("There are " + unknownNodes + " nodes with unknown executor 
count; only " +
+          clusterState.getKnownExecutorCount() + " guaranteed tasks will be 
allocated");
+    }
+    return clusterState.getKnownExecutorCount();
+  }
+
+  @Override
+  public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> 
sessionsToUpdate) {
+    // Do not make a remote call unless we have no information at all.
+    int totalCount = getExecutorCount(false);
+    int totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+    double lastDelta = 0;
+    for (int i = 0; i < sessionsToUpdate.size(); ++i) {
+      WmTezSession session = sessionsToUpdate.get(i);
+      int intAlloc = -1;
+      if (i + 1 == sessionsToUpdate.size()) {
+        intAlloc = totalToDistribute;
+        // We rely on the caller to supply a reasonable total; we could log a 
warning
+        // if this doesn't match the allocation of the last session beyond 
some threshold.
+      } else {
+        // This ensures we don't create skew, e.g. with 8 ducks and 5 queries 
with simple rounding
+        // we'd produce 2-2-2-2-0 as we round 1.6; whereas adding the last 
delta to the next query
+        // we'd round 1.6-1.2-1.8-1.4-2.0 and thus give out 2-1-2-1-2, as 
intended.
+        // Note that fractions don't have to all be the same like in this 
example.
+        double fraction = session.getClusterFraction();
+        double allocation = fraction * totalCount + lastDelta;
+        double roundedAlloc = Math.round(allocation);
+        lastDelta = allocation - roundedAlloc;
+        if (roundedAlloc < 0) {
+          roundedAlloc = 0; // Can this happen? Delta cannot exceed 0.5.
+        }
+        intAlloc = (int)roundedAlloc;
+      }
+      // Make sure we don't give out more than allowed due to double/rounding 
artifacts.
+      if (intAlloc > totalToDistribute) {
+        intAlloc = totalToDistribute;
+      }
+      totalToDistribute -= intAlloc;
+      // This will only send update if it's necessary.
+      updateSessionAsync(session, intAlloc);
+    }
+  }
+
+  private void updateSessionAsync(final WmTezSession session, final int 
intAlloc) {
+    boolean needsUpdate = session.setSendingGuaranteed(intAlloc);
+    if (!needsUpdate) return;
+    // Note: this assumes that the pattern where the same session object is 
reset with a different
+    //       Tez client is not used. It was used a lot in the past but appears 
to be gone from most
+    //       HS2 session pool paths, and this patch removes the last one 
(reopen).
+    UpdateQueryRequestProto request = UpdateQueryRequestProto
+        .newBuilder().setGuaranteedTaskCount(intAlloc).build();
+    amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new 
UpdateCallback(session));
+  }
+
+  private final class UpdateCallback implements 
ExecuteRequestCallback<UpdateQueryResponseProto> {
+    private final WmTezSession session;
+
+    private UpdateCallback(WmTezSession session) {
+      this.session = session;
+    }
+
+    @Override
+    public void setResponse(UpdateQueryResponseProto response) {
+      int nextUpdate = session.setSentGuaranteed();
+      if (nextUpdate >= 0) {
+        updateSessionAsync(session, nextUpdate);
+      }
+    }
+
+    @Override
+    public void indicateError(Throwable t) {
+      LOG.error("Failed to update guaranteed tasks count for the session " + 
session, t);
+      boolean isOkToFail = session.setFailedToSendGuaranteed();
+      if (isOkToFail) return;
+      // RPC already handles retries, so we will just try to kill the session 
here.
+      // This will cause the current query to fail. We could instead keep 
retrying.
+      try {
+        session.destroy();
+      } catch (Exception e) {
+        LOG.error("Failed to kill the session " + session);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClient.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClient.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClient.java
new file mode 100644
index 0000000..86c8a2d
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClient.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback;
+import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
+import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+
/**
 * Client interface for sending requests to the LLAP WM plugin endpoint running inside a Tez AM.
 */
public interface LlapPluginEndpointClient {
  /**
   * Asynchronously sends a guaranteed-task-count update to the plugin endpoint of an AM.
   * @param request the update to send.
   * @param node the AM node to target; the node supplies host/port/token plugin info.
   * @param callback invoked with the response on success, or with an error otherwise.
   */
  void sendUpdateQuery(UpdateQueryRequestProto request, AmPluginNode node,
      ExecuteRequestCallback<UpdateQueryResponseProto> callback);
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
new file mode 100644
index 0000000..45c3e38
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.AsyncPbRpcProxy;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.impl.LlapPluginProtocolClientImpl;
+import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
+import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
+import org.apache.hadoop.hive.llap.protocol.LlapPluginProtocolPB;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+
+public class LlapPluginEndpointClientImpl extends
+    AsyncPbRpcProxy<LlapPluginProtocolPB, JobTokenIdentifier>
+    implements LlapPluginEndpointClient {
+
+  public LlapPluginEndpointClientImpl(
+      Configuration conf, Token<JobTokenIdentifier> token, int expectedNodes) {
+    // A single concurrent request per node is currently hardcoded. The node 
includes a port number
+    // so different AMs on the same host count as different nodes; we only 
have one request type,
+    // and it is not useful to send more than one in parallel.
+    super(LlapPluginEndpointClientImpl.class.getSimpleName(),
+        HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_CLIENT_NUM_THREADS), 
conf, token,
+        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS,
+            TimeUnit.MILLISECONDS),
+        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+            TimeUnit.MILLISECONDS), expectedNodes, 1);
+  }
+
+  @Override
+  protected LlapPluginProtocolPB createProtocolImpl(Configuration conf,
+      String hostname, int port, UserGroupInformation ugi,
+      RetryPolicy retryPolicy, SocketFactory socketFactory) {
+    return new LlapPluginProtocolClientImpl(conf, hostname, port, retryPolicy, 
socketFactory, ugi);
+  }
+
+  @Override
+  protected void shutdownProtocolImpl(LlapPluginProtocolPB proxy) {
+    // Nothing to do.
+  }
+
+  @Override
+  protected String getTokenUser(Token<JobTokenIdentifier> token) {
+    try {
+      return token.decodeIdentifier().getJobId().toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  /* (non-Javadoc)
+   * @see 
org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClient#sendUpdateQuery(org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto,
 java.lang.String, int, org.apache.hadoop.security.token.Token, 
org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback)
+   */
+  @Override
+  public void sendUpdateQuery(UpdateQueryRequestProto request,
+      AmPluginNode node, ExecuteRequestCallback<UpdateQueryResponseProto> 
callback) {
+    queueRequest(new SendUpdateQueryCallable(node, request, callback));
+  }
+
+  private class SendUpdateQueryCallable
+      extends CallableRequest<UpdateQueryRequestProto, 
UpdateQueryResponseProto> {
+    private AmPluginNode node;
+    private AmPluginInfo info;
+
+    protected SendUpdateQueryCallable(AmPluginNode node, 
UpdateQueryRequestProto request,
+        ExecuteRequestCallback<UpdateQueryResponseProto> callback) {
+      super(request, callback);
+      this.node = node;
+    }
+
+    @Override
+    public LlapNodeId getNodeId() throws InterruptedException, 
TimeoutException {
+      ensureInfo();
+      return LlapNodeId.getInstance(info.amHost, info.amPluginPort);
+    }
+
+    @Override
+    public UpdateQueryResponseProto call() throws Exception {
+      ensureInfo();
+      LlapNodeId nodeId = LlapNodeId.getInstance(info.amHost, 
info.amPluginPort);
+      return getProxy(nodeId, info.amPluginToken).updateQuery(null, request);
+    }
+
+    private void ensureInfo() throws InterruptedException, TimeoutException {
+      if (info != null) return;
+      info = node.waitForAmPluginInfo(0); // Don't wait - should already be 
initialized.
+      if (info == null) {
+        throw new AssertionError("A request was created without AM plugin info 
for " + node);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
new file mode 100644
index 0000000..a326db3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.List;
+
/**
 * Manages cluster-fraction allocations for workload-managed Tez sessions.
 * Lifecycle is explicit: {@link #start()} before use, {@link #stop()} when done.
 */
interface QueryAllocationManager {
  /** Starts the manager; call before issuing updates. */
  void start();
  /** Stops the manager and releases its resources. */
  void stop();
  /**
   * Updates the session allocations asynchronously.
   * @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to
   *                      avoid various artifacts, esp. with small numbers and double weirdness.
   * @param sessions Sessions to update based on their allocation fraction.
   */
  void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions);
}

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 6e8122d..4f2997b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -17,56 +17,55 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
-
-import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
-import org.apache.hadoop.hive.registry.impl.TezAmInstance;
-import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Distinct from TezSessionPool manager in that it implements a session pool, 
and nothing else.
  */
-class TezSessionPool {
+class TezSessionPool<SessionType extends TezSessionPoolSession> {
   private static final Logger LOG = 
LoggerFactory.getLogger(TezSessionPool.class);
 
   /** A queue for initial sessions that have not been started yet. */
-  private final Queue<TezSessionPoolSession> initialSessions =
-      new ConcurrentLinkedQueue<TezSessionPoolSession>();
+  private final Queue<SessionType> initialSessions =
+      new ConcurrentLinkedQueue<SessionType>();
 
   private final HiveConf initConf;
-  private final BlockingDeque<TezSessionPoolSession> defaultQueuePool;
+
+  // TODO: eventually, this will need to support resize. That would probably 
require replacement
+  //       with a RW lock, a semaphore and linked list.
+  private BlockingDeque<SessionType> defaultQueuePool;
 
   private final String amRegistryName;
   private final TezAmRegistryImpl amRegistry;
 
-  private final ConcurrentHashMap<String, TezSessionPoolSession> bySessionId =
+  private final ConcurrentHashMap<String, SessionType> bySessionId =
       new ConcurrentHashMap<>();
 
 
   TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean 
useAmRegistryIfPresent) {
     this.initConf = initConf;
     assert numSessionsTotal > 0;
-    defaultQueuePool = new 
LinkedBlockingDeque<TezSessionPoolSession>(numSessionsTotal);
+    defaultQueuePool = new LinkedBlockingDeque<>(numSessionsTotal);
     this.amRegistry = useAmRegistryIfPresent ? 
TezAmRegistryImpl.create(initConf, true) : null;
     this.amRegistryName = amRegistry == null ? null : 
amRegistry.getRegistryName();
   }
@@ -86,7 +85,7 @@ class TezSessionPool {
     Preconditions.checkArgument(threadCount > 0);
     if (threadCount == 1) {
       while (true) {
-        TezSessionPoolSession session = initialSessions.poll();
+        SessionType session = initialSessions.poll();
         if (session == null) break;
         startInitialSession(session);
       }
@@ -100,7 +99,7 @@ class TezSessionPool {
             SessionState.setCurrentSessionState(parentSessionState);
           }
           while (true) {
-            TezSessionPoolSession session = initialSessions.poll();
+            SessionType session = initialSessions.poll();
             if (session == null) break;
             if (firstError.get() != null) break; // Best-effort.
             try {
@@ -130,38 +129,46 @@ class TezSessionPool {
     }
   }
 
-  void addInitialSession(TezSessionPoolSession session) {
+  void addInitialSession(SessionType session) {
     initialSessions.add(session);
   }
 
-  TezSessionState getSession() throws Exception {
+  SessionType getSession() throws Exception {
     while (true) {
-      TezSessionPoolSession result = defaultQueuePool.take();
+      SessionType result = defaultQueuePool.take();
       if (result.tryUse()) return result;
       LOG.info("Couldn't use a session [" + result + "]; attempting another 
one");
     }
   }
 
-  void returnSession(TezSessionPoolSession session) throws Exception {
-    // TODO: should this be in pool, or pool manager? Probably common to all 
the use cases.
+  void returnSession(SessionType session) throws Exception {
+    // Make sure that if the session is returned to the pool, it doesn't live 
in the global.
     SessionState sessionState = SessionState.get();
     if (sessionState != null) {
       sessionState.setTezSession(null);
     }
-    if (session.returnAfterUse()) {
+    if (session.stopUsing()) {
       defaultQueuePool.putFirst(session);
     }
   }
 
-  void replaceSession(
-      TezSessionPoolSession oldSession, TezSessionPoolSession newSession) 
throws Exception {
+  void replaceSession(SessionType oldSession, SessionType newSession,
+      boolean keepTmpDir, String[] additionalFilesArray, HiveConf conf) throws 
Exception {
     // Retain the stuff from the old session.
     // Re-setting the queue config is an old hack that we may remove in future.
     Path scratchDir = oldSession.getTezScratchDir();
-    Set<String> additionalFiles = oldSession.getAdditionalFilesNotFromConf();
     String queueName = oldSession.getQueueName();
+    Set<String> additionalFiles = null;
+    if (additionalFilesArray != null) {
+      additionalFiles = new HashSet<>();
+      for (String file : additionalFilesArray) {
+        additionalFiles.add(file);
+      }
+    } else {
+      additionalFiles = oldSession.getAdditionalFilesNotFromConf();
+    }
     try {
-      oldSession.close(false);
+      oldSession.close(keepTmpDir);
       boolean wasRemoved = defaultQueuePool.remove(oldSession);
       if (!wasRemoved) {
         LOG.error("Old session was closed but it was not in the pool", 
oldSession);
@@ -179,18 +186,18 @@ class TezSessionPool {
     }
   }
 
-  private void startInitialSession(TezSessionPoolSession session) throws 
Exception {
+  private void startInitialSession(SessionType session) throws Exception {
     boolean isUsable = session.tryUse();
     if (!isUsable) throw new IOException(session + " is not usable at pool 
startup");
     session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, 
session.getQueueName());
     configureAmRegistry(session);
     session.open();
-    if (session.returnAfterUse()) {
+    if (session.stopUsing()) {
       defaultQueuePool.put(session);
     }
   }
 
-  private void configureAmRegistry(TezSessionPoolSession session) {
+  private void configureAmRegistry(SessionType session) {
     if (amRegistryName != null) {
       bySessionId.put(session.getSessionId(), session);
       HiveConf conf = session.getConf();
@@ -206,12 +213,16 @@ class TezSessionPool {
     implements ServiceInstanceStateChangeListener<TezAmInstance> {
 
     @Override
-    public void onCreate(TezAmInstance serviceInstance) {
-      String sessionId = serviceInstance.getSessionId();
-      TezSessionPoolSession session = bySessionId.get(sessionId);
-      LOG.warn("AM for " + sessionId + " has registered; updating [" + session
-          + "] with an endpoint at " + serviceInstance.getPluginPort());
-      // TODO: actually update the session once WM is committed
+    public void onCreate(TezAmInstance si) {
+      String sessionId = si.getSessionId();
+      SessionType session = bySessionId.get(sessionId);
+      if (session != null) {
+        LOG.info("AM for " + sessionId + " has registered; updating [" + 
session
+            + "] with an endpoint at " + si.getPluginPort());
+        session.updateFromRegistry(si);
+      } else {
+        LOG.warn("AM for an unknown " + sessionId + " has registered; 
ignoring");
+      }
     }
 
     @Override

Reply via email to