Repository: hive
Updated Branches:
  refs/heads/master f3cb704a5 -> ac24537f2


http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 9f72155..538d745 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.HashSet;
 
 import java.util.concurrent.Semaphore;
 import java.util.ArrayList;
@@ -32,13 +32,16 @@ import java.util.Set;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import 
org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.OpenSessionTracker;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is for managing multiple tez sessions particularly when
  * HiveServer2 is being used to submit queries.
@@ -47,7 +50,7 @@ import org.apache.hadoop.security.UserGroupInformation;
  * on that queue and assigned to the session state.
  */
 public class TezSessionPoolManager
-  implements SessionExpirationTracker.RestartImpl, OpenSessionTracker {
+  implements SessionExpirationTracker.RestartImpl, Manager {
 
   private enum CustomQueueAllowed {
     TRUE,
@@ -64,7 +67,7 @@ public class TezSessionPoolManager
   private int numConcurrentLlapQueries = -1;
   private CustomQueueAllowed customQueueAllowed = CustomQueueAllowed.TRUE;
 
-  private TezSessionPool defaultSessionPool;
+  private TezSessionPool<TezSessionPoolSession> defaultSessionPool;
   private SessionExpirationTracker expirationTracker;
   private RestrictedConfigChecker restrictedConfig;
 
@@ -114,9 +117,8 @@ public class TezSessionPoolManager
     int numSessions = 
conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
     int numSessionsTotal = numSessions * (defaultQueueList.length - 
emptyNames);
     if (numSessionsTotal > 0) {
-      // TODO: this can be enabled to test. Will only be used in WM case for 
now.
       boolean enableAmRegistry = false;
-      defaultSessionPool = new TezSessionPool(initConf, numSessionsTotal, 
enableAmRegistry);
+      defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, 
enableAmRegistry);
     }
 
     numConcurrentLlapQueries = 
conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
@@ -249,12 +251,16 @@ public class TezSessionPoolManager
     return retTezSessionState;
   }
 
-  public void returnSession(TezSessionState tezSessionState, boolean llap)
-      throws Exception {
+  /**
+   * {@link Manager} callback invoked by a pool session when its user is done with it.
+   * Forwards to {@code returnSession}, which performs the actual return.
+   *
+   * @param session the pool session being handed back
+   * @throws Exception if returning the session fails
+   */
+  @Override
+  public void returnAfterUse(TezSessionPoolSession session) throws Exception {
+    this.returnSession(session);
+  }
+
+  void returnSession(TezSessionState tezSessionState) throws Exception {
     // Ignore the interrupt status while returning the session, but set it back
     // on the thread in case anything else needs to deal with it.
     boolean isInterrupted = Thread.interrupted();
-
+    boolean llap = tezSessionState.getLegacyLlapMode();
     try {
       if (isInterrupted) {
         LOG.info("returnSession invoked with interrupt status set");
@@ -262,6 +268,7 @@ public class TezSessionPoolManager
       if (llap && (this.numConcurrentLlapQueries > 0)) {
         llapQueue.release();
       }
+      tezSessionState.setLegacyLlapMode(false);
       if (tezSessionState.isDefault() &&
           tezSessionState instanceof TezSessionPoolSession) {
         LOG.info("The session " + tezSessionState.getSessionId()
@@ -377,6 +384,7 @@ public class TezSessionPoolManager
     }
 
     if (canWorkWithSameSession(session, conf)) {
+      session.setLegacyLlapMode(llap);
       return session;
     }
 
@@ -384,23 +392,33 @@ public class TezSessionPoolManager
       closeIfNotDefault(session, false);
     }
 
-    return getSession(conf, doOpen);
+    session = getSession(conf, doOpen);
+    session.setLegacyLlapMode(llap);
+    return session;
   }
 
   /** Reopens the session that was found to not be running. */
-  public void reopenSession(TezSessionState sessionState, Configuration conf) 
throws Exception {
+  public TezSessionState reopenSession(TezSessionState sessionState,
+      Configuration conf, String[] additionalFiles) throws Exception {
     HiveConf sessionConf = sessionState.getConf();
     if (sessionState.getQueueName() != null
         && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
       sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, 
sessionState.getQueueName());
     }
     Set<String> oldAdditionalFiles = 
sessionState.getAdditionalFilesNotFromConf();
+    if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty())
+        && (additionalFiles != null)) {
+      oldAdditionalFiles = new HashSet<>();
+      for (String file : additionalFiles) {
+        oldAdditionalFiles.add(file);
+      }
+    }
     // TODO: close basically resets the object to a bunch of nulls.
     //       We should ideally not reuse the object because it's pointless and 
error-prone.
-    // Close the old one, but keep the tmp files around.
     sessionState.close(true);
     // TODO: should we reuse scratchDir too?
     sessionState.open(oldAdditionalFiles, null);
+    return sessionState;
   }
 
   public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
@@ -422,8 +440,8 @@ public class TezSessionPoolManager
       LOG.warn("Pool session has a null queue: " + oldSession);
     }
     TezSessionPoolSession newSession = createAndInitSession(
-        queueName, oldSession.isDefault(), oldSession.getConf());
-    defaultSessionPool.replaceSession(oldSession, newSession);
+      queueName, oldSession.isDefault(), oldSession.getConf());
+    defaultSessionPool.replaceSession(oldSession, newSession, false, null, 
null);
   }
 
   /** Called by TezSessionPoolSession when opened. */
@@ -449,4 +467,15 @@ public class TezSessionPoolManager
   public SessionExpirationTracker getExpirationTracker() {
     return expirationTracker;
   }
+
+  /**
+   * {@link Manager} callback used by a pool session to get itself reopened.
+   * Delegates to {@code reopenSession}; it must not call {@code reopen} itself, which
+   * would recurse until the stack overflows.
+   *
+   * @param session the pool session to reopen
+   * @param conf configuration handed through to {@code reopenSession}
+   * @param inputOutputJars additional files handed through to {@code reopenSession}
+   * @return the session that was passed in, reopened in place
+   * @throws Exception if closing or reopening the session fails
+   */
+  @Override
+  public TezSessionPoolSession reopen(
+      TezSessionPoolSession session, Configuration conf, String[] inputOutputJars)
+      throws Exception {
+    // reopenSession() reopens the object it is given and returns that same object, so the
+    // pool session passed in is the result; no downcast of the return value is needed.
+    reopenSession(session, conf, inputOutputJars);
+    return session;
+  }
+
+  /**
+   * {@link Manager} callback used by a pool session to get itself destroyed.
+   * Forwards to {@code destroySession}.
+   *
+   * @param session the pool session to destroy
+   * @throws Exception if destroying the session fails
+   */
+  @Override
+  public void destroy(TezSessionPoolSession session) throws Exception {
+    this.destroySession(session);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 8ecdbbf..4488c12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,6 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+
+import org.apache.hadoop.conf.Configuration;
+
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Collection;
@@ -45,21 +52,26 @@ import com.google.common.annotations.VisibleForTesting;
 class TezSessionPoolSession extends TezSessionState {
   private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2;
 
-  interface OpenSessionTracker {
+  interface Manager {
     void registerOpenSession(TezSessionPoolSession session);
     void unregisterOpenSession(TezSessionPoolSession session);
+    void returnAfterUse(TezSessionPoolSession session) throws Exception;
+    TezSessionState reopen(TezSessionPoolSession session, Configuration conf,
+        String[] inputOutputJars) throws Exception;
+    void destroy(TezSessionPoolSession session) throws Exception;
   }
 
   private final AtomicInteger sessionState = new AtomicInteger(STATE_NONE);
   private Long expirationNs;
-  private final OpenSessionTracker parent;
+  private final Manager parent;
   private final SessionExpirationTracker expirationTracker;
 
-  public TezSessionPoolSession(String sessionId, OpenSessionTracker parent,
-      SessionExpirationTracker expirationTracker, HiveConf conf) {
+
+  public TezSessionPoolSession(String sessionId, Manager parent,
+      SessionExpirationTracker tracker, HiveConf conf) {
     super(sessionId, conf);
     this.parent = parent;
-    this.expirationTracker = expirationTracker;
+    this.expirationTracker = tracker;
   }
 
   void setExpirationNs(long expirationNs) {
@@ -71,7 +83,7 @@ class TezSessionPoolSession extends TezSessionState {
   }
 
   @Override
-  public void close(boolean keepTmpDir) throws Exception {
+  void close(boolean keepTmpDir) throws Exception {
     try {
       super.close(keepTmpDir);
     } finally {
@@ -119,12 +131,7 @@ class TezSessionPoolSession extends TezSessionState {
     }
   }
 
-  /**
-   * Notifies the session that it's no longer in use. If the session has 
expired while in use,
-   * this method will take care of the expiration.
-   * @return True if the session was returned, false if it was restarted.
-   */
-  public boolean returnAfterUse() throws Exception {
+  boolean stopUsing() throws Exception {
     int finalState = shouldExpire() ? STATE_EXPIRED : STATE_NONE;
     if (!sessionState.compareAndSet(STATE_IN_USE, finalState)) {
       throw new AssertionError("Unexpected state change; currently " + 
sessionState.get());
@@ -155,4 +162,28 @@ class TezSessionPoolSession extends TezSessionState {
   private boolean shouldExpire() {
     return expirationNs != null && (System.nanoTime() - expirationNs) >= 0;
   }
+
+  /**
+   * Hands this session back to the {@link Manager} it was created with.
+   *
+   * @throws Exception if the manager fails to take the session back
+   */
+  @Override
+  public void returnToSessionManager() throws Exception {
+    final Manager owner = parent;
+    owner.returnAfterUse(this);
+  }
+
+  /**
+   * Asks the {@link Manager} this session was created with to reopen it.
+   *
+   * @param conf configuration handed through to the manager
+   * @param inputOutputJars additional files handed through to the manager
+   * @return whatever session the manager returns from its {@code reopen} call
+   * @throws Exception if the manager fails to reopen the session
+   */
+  @Override
+  public TezSessionState reopen(
+      Configuration conf, String[] inputOutputJars) throws Exception {
+    final TezSessionState reopened = parent.reopen(this, conf, inputOutputJars);
+    return reopened;
+  }
+
+  /**
+   * Asks the {@link Manager} this session was created with to destroy it.
+   *
+   * @throws Exception if the manager fails to destroy the session
+   */
+  @Override
+  public void destroy() throws Exception {
+    final Manager owner = parent;
+    owner.destroy(this);
+  }
+
+  boolean isOwnedBy(Manager parent) {
+    return this.parent == parent;
+  }
+
+  /**
+   * Hook for passing an AM registry entry to this session.
+   * The base pool session ignores it; subclasses may override.
+   *
+   * @param si the registry instance describing the AM
+   */
+  void updateFromRegistry(TezAmInstance si) {
+    // Intentionally empty: nothing to do.
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 170de21..e5850f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -111,6 +111,7 @@ public class TezSessionState {
   private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
   private final Set<LocalResource> localizedResources = new 
HashSet<LocalResource>();
   private boolean doAsEnabled;
+  private boolean isLegacyLlapMode;
 
   /**
    * Constructor. We do not automatically connect, because we only want to
@@ -496,13 +497,14 @@ public class TezSessionState {
 
   /**
    * Close a tez session. Will cleanup any tez/am related resources. After 
closing a session no
-   * further DAGs can be executed against it.
+   * further DAGs can be executed against it. Only called by session 
management classes; some
+   * sessions should not simply be closed by users - e.g. pool sessions need 
to be restarted.
    *
    * @param keepTmpDir
    *          whether or not to remove the scratch dir at the same time.
    * @throws Exception
    */
-  public void close(boolean keepTmpDir) throws Exception {
+  void close(boolean keepTmpDir) throws Exception {
     if (session != null) {
       LOG.info("Closing Tez Session");
       closeClient(session);
@@ -726,4 +728,27 @@ public class TezSessionState {
     } while (!ownerThread.compareAndSet(null, newName));
   }
 
+  /**
+   * Records the legacy LLAP mode flag on this session.
+   *
+   * @param value the new flag value
+   */
+  void setLegacyLlapMode(boolean value) {
+    isLegacyLlapMode = value;
+  }
+
+  /**
+   * @return the legacy LLAP mode flag last stored via {@code setLegacyLlapMode}
+   *         (false if it was never set)
+   */
+  boolean getLegacyLlapMode() {
+    return isLegacyLlapMode;
+  }
+
+  /**
+   * Hands this session back to its manager.
+   * By default, TezSessionPoolManager handles this for both pool and non-pool session.
+   *
+   * @throws Exception if the manager fails to take the session back
+   */
+  public void returnToSessionManager() throws Exception {
+    final TezSessionPoolManager manager = TezSessionPoolManager.getInstance();
+    manager.returnSession(this);
+  }
+
+  /**
+   * Reopens this session through its manager.
+   * By default, TezSessionPoolManager handles this for both pool and non-pool session.
+   *
+   * @param conf configuration handed through to the manager
+   * @param inputOutputJars additional files handed through to the manager
+   * @return the session returned by the manager's {@code reopenSession}
+   * @throws Exception if reopening fails
+   */
+  public TezSessionState reopen(
+      Configuration conf, String[] inputOutputJars) throws Exception {
+    final TezSessionPoolManager manager = TezSessionPoolManager.getInstance();
+    return manager.reopenSession(this, conf, inputOutputJars);
+  }
+
+  /**
+   * Destroys this session through its manager.
+   * By default, TezSessionPoolManager handles this for both pool and non-pool session.
+   *
+   * @throws Exception if destroying the session fails
+   */
+  public void destroy() throws Exception {
+    final TezSessionPoolManager manager = TezSessionPoolManager.getInstance();
+    manager.destroySession(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index e6e236d..29d6fe6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -143,12 +143,20 @@ public class TezTask extends Task<TezWork> {
 
       // Need to remove this static hack. But this is the way currently to get 
a session.
       SessionState ss = SessionState.get();
+      // Note: given that we return pool sessions to the pool in the finally 
block below, and that
+      //       we need to set the global to null to do that, this "reuse" may 
be pointless.
       session = ss.getTezSession();
       if (session != null && !session.isOpen()) {
         LOG.warn("The session: " + session + " has not been opened");
       }
-      session = TezSessionPoolManager.getInstance().getSession(
-          session, conf, false, getWork().getLlapMode());
+      if (WorkloadManager.isInUse(ss.getConf())) {
+        // TODO: in future, we may also pass getUserIpAddress.
+        // Note: for now this will just block to wait for a session based on 
parallelism.
+        session = WorkloadManager.getInstance().getSession(session, 
ss.getUserName(), conf);
+      } else {
+        session = TezSessionPoolManager.getInstance().getSession(
+            session, conf, false, getWork().getLlapMode());
+      }
       ss.setTezSession(session);
       try {
         // jobConf will hold all the configuration for hadoop, tez, and hive
@@ -230,8 +238,7 @@ public class TezTask extends Task<TezWork> {
       } finally {
         // We return this to the pool even if it's unusable; reopen is 
supposed to handle this.
         try {
-          TezSessionPoolManager.getInstance()
-              .returnSession(session, getWork().getLlapMode());
+          session.returnToSessionManager();
         } catch (Exception e) {
           LOG.error("Failed to return session: {} to pool", session, e);
           throw e;
@@ -547,7 +554,9 @@ public class TezTask extends Task<TezWork> {
       } catch (SessionNotRunning nr) {
         console.printInfo("Tez session was closed. Reopening...");
 
-        TezSessionPoolManager.getInstance().reopenSession(sessionState, conf);
+        // close the old one, but keep the tmp files around
+        // conf is passed in only for the case when session conf is null 
(tests and legacy paths?)
+        sessionState = sessionState.reopen(conf, inputOutputJars);
         console.printInfo("Session re-established.");
 
         dagClient = sessionState.getSession().submitDAG(dag);
@@ -557,11 +566,11 @@ public class TezTask extends Task<TezWork> {
       try {
         console.printInfo("Dag submit failed due to " + e.getMessage() + " 
stack trace: "
             + Arrays.toString(e.getStackTrace()) + " retrying...");
-        TezSessionPoolManager.getInstance().reopenSession(sessionState, conf);
+        sessionState = sessionState.reopen(conf, inputOutputJars);
         dagClient = sessionState.getSession().submitDAG(dag);
       } catch (Exception retryException) {
         // we failed to submit after retrying. Destroy session and bail.
-        TezSessionPoolManager.getInstance().destroySession(sessionState);
+        sessionState.destroy();
         throw retryException;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
new file mode 100644
index 0000000..00501ee
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+
+public class WmTezSession extends TezSessionPoolSession implements 
AmPluginNode {
+  private String poolName;
+  private double clusterFraction;
+
+  private final Object amPluginInfoLock = new Object();
+  private AmPluginInfo amPluginInfo = null;
+
+
+  /** The actual state of the guaranteed task, and the update state, for the 
session. */
+  // Note: hypothetically, a generic WM-aware-session should not know about 
guaranteed tasks.
+  //       We should have another subclass for a 
WM-aware-session-implemented-using-ducks.
+  //       However, since this is the only type of WM for now, this can live 
here.
+  private final static class ActualWmState {
+    // All accesses synchronized on the object itself. Could be replaced with 
CAS.
+    int sending = -1, sent = -1, target = 0;
+  }
+  private final ActualWmState actualState = new ActualWmState();
+
+  public WmTezSession(String sessionId, Manager parent,
+      SessionExpirationTracker expiration, HiveConf conf) {
+    super(sessionId, parent, expiration, conf);
+  }
+
+  @Override
+  public AmPluginInfo waitForAmPluginInfo(int timeoutMs)
+      throws InterruptedException, TimeoutException {
+    synchronized (amPluginInfoLock) {
+      if (amPluginInfo == null) {
+        amPluginInfoLock.wait(timeoutMs);
+        if (amPluginInfo == null) {
+          throw new TimeoutException("No plugin information for " + 
getSessionId());
+        }
+      }
+      return amPluginInfo;
+    }
+  }
+
+  @Override
+  void updateFromRegistry(TezAmInstance si) {
+    synchronized (amPluginInfoLock) {
+      this.amPluginInfo = new AmPluginInfo(si.getHost(), si.getPluginPort(),
+          si.getPluginToken(), si.getPluginTokenJobId());
+      amPluginInfoLock.notifyAll();
+    }
+  }
+
+  public AmPluginInfo getAmPluginInfo() {
+    return amPluginInfo; // Only has final fields, no artifacts from the 
absence of sync.
+  }
+
+  void setPoolName(String poolName) {
+    this.poolName = poolName;
+  }
+
+  String getPoolName() {
+    return poolName;
+  }
+
+  void setClusterFraction(double fraction) {
+    this.clusterFraction = fraction;
+  }
+
+  double getClusterFraction() {
+    return this.clusterFraction;
+  }
+
+  boolean setSendingGuaranteed(int intAlloc) {
+    assert intAlloc >= 0;
+    synchronized (actualState) {
+      actualState.target = intAlloc;
+      if (actualState.sending != -1) return false; // The sender will take 
care of this.
+      if (actualState.sent == intAlloc) return false; // The value didn't 
change.
+      actualState.sending = intAlloc;
+      return true;
+    }
+  }
+
+  int setSentGuaranteed() {
+    // Only one send can be active at the same time.
+    synchronized (actualState) {
+      assert actualState.sending != -1;
+      actualState.sent = actualState.sending;
+      actualState.sending = -1;
+      return (actualState.sent == actualState.target) ? -1 : 
actualState.target;
+    }
+  }
+
+  boolean setFailedToSendGuaranteed() {
+    synchronized (actualState) {
+      assert actualState.sending != -1;
+      actualState.sending = -1;
+      // It's ok to skip a failed message if the target has changed back to 
the old value.
+      return (actualState.sent == actualState.target);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
new file mode 100644
index 0000000..288d705
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.concurrent.TimeoutException;
+
+import java.util.concurrent.TimeUnit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
+/** Workload management entry point for HS2. */
+public class WorkloadManager
+    implements TezSessionPoolSession.Manager, 
SessionExpirationTracker.RestartImpl {
+  private static final Logger LOG = 
LoggerFactory.getLogger(WorkloadManager.class);
+  // TODO: this is a temporary setting that will go away, so it's not in 
HiveConf.
+  public static final String TEST_WM_CONFIG = "hive.test.workload.management";
+
+  private final HiveConf conf;
+  private final TezSessionPool<WmTezSession> sessions;
+  private final SessionExpirationTracker expirationTracker;
+  private final RestrictedConfigChecker restrictedConfig;
+  private final QueryAllocationManager allocationManager;
+  private final String yarnQueue;
+  // TODO: it's not clear that we need to track this - unlike PoolManager we 
don't have non-pool
+  //       sessions, so the pool itself could internally track the sessions it 
gave out, since
+  //       calling close on an unopened session is probably harmless.
+  private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions =
+      new IdentityHashMap<>();
+  /** Sessions given out (i.e. between get... and return... calls), separated 
by Hive pool. */
+  private final ReentrantReadWriteLock poolsLock = new 
ReentrantReadWriteLock();
+  private final HashMap<String, PoolState> pools = new HashMap<>();
+  private final int amRegistryTimeoutMs;
+
+  /** Per-Hive-pool bookkeeping: a monitor object plus the WM sessions tracked for the pool. */
+  private static class PoolState {
+    // Add stuff here as WM is implemented.
+    /** Monitor guarding {@link #sessions}. */
+    private final Object lock = new Object();
+    /** Sessions currently tracked for this pool. */
+    private final List<WmTezSession> sessions = new ArrayList<WmTezSession>();
+  }
+
+  // TODO: this is temporary before HiveServerEnvironment is merged.
+  private static volatile WorkloadManager INSTANCE;
+  /**
+   * @return the instance stored by {@code create}; its presence is only checked by an
+   *         {@code assert}
+   */
+  public static WorkloadManager getInstance() {
+    final WorkloadManager current = INSTANCE; // Read the volatile field once.
+    assert current != null;
+    return current;
+  }
+
+  /**
+   * @param conf configuration to consult for the {@link #TEST_WM_CONFIG} flag
+   * @return true only if an instance exists and the flag is set to true in {@code conf}
+   */
+  public static boolean isInUse(Configuration conf) {
+    if (INSTANCE == null) {
+      return false;
+    }
+    return conf.getBoolean(TEST_WM_CONFIG, false);
+  }
+
+  /**
+   * Called once, when HS2 initializes. Wires up the AM token, the plugin endpoint client and
+   * the allocator, then builds the manager and publishes it as the global instance.
+   *
+   * @param yarnQueue queue name handed to the new manager
+   * @param conf configuration handed to the new manager and its collaborators
+   * @return the newly created manager
+   */
+  public static WorkloadManager create(String yarnQueue, HiveConf conf) {
+    assert INSTANCE == null;
+    final Token<JobTokenIdentifier> amsToken = createAmsToken();
+    // We could derive the expected number of AMs to pass in.
+    final LlapPluginEndpointClient amComm =
+        new LlapPluginEndpointClientImpl(conf, amsToken, -1);
+    final QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm);
+    // TODO: Hardcode one session for now; initial policies should be passed in.
+    final WorkloadManager created = new WorkloadManager(yarnQueue, conf, 1, qam, amsToken);
+    INSTANCE = created;
+    return created;
+  }
+
+  private static Token<JobTokenIdentifier> createAmsToken() {
+    if (!UserGroupInformation.isSecurityEnabled()) return null;
+    // This application ID is completely bogus.
+    ApplicationId id = ApplicationId.newInstance(
+        System.nanoTime(), (int)(System.nanoTime() % 100000));
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new 
Text(id.toString()));
+    JobTokenSecretManager jobTokenManager = new JobTokenSecretManager();
+    Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, 
jobTokenManager);
+    sessionToken.setService(identifier.getJobId());
+    return sessionToken;
+  }
+
+  @VisibleForTesting
+  WorkloadManager(String yarnQueue, HiveConf conf, int numSessions,
+      QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken) {
+    this.yarnQueue = yarnQueue;
+    this.conf = conf;
+    initializeHivePools();
+
+    this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(
+        conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, 
TimeUnit.MILLISECONDS);
+    sessions = new TezSessionPool<>(conf, numSessions, true);
+    restrictedConfig = new RestrictedConfigChecker(conf);
+    allocationManager = qam;
+    // Only creates the expiration tracker if expiration is configured.
+    expirationTracker = SessionExpirationTracker.create(conf, this);
+    for (int i = 0; i < numSessions; i++) {
+      sessions.addInitialSession(createSession());
+    }
+  }
+
+  private void initializeHivePools() {
+    // TODO: real implementation
+    poolsLock.writeLock().lock();
+    try {
+      pools.put("llap", new PoolState());
+    } finally {
+      poolsLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Obtains a WM session for a query, reusing the caller's session when it belongs to this
+   * manager and taking one from the pool otherwise. Sessions whose AM does not show up in
+   * the registry in time are discarded and another one is tried.
+   *
+   * @param session the session the caller currently holds, or null
+   * @param userName user name used to pick the Hive pool
+   * @param conf query configuration; validated before a session is chosen
+   * @return a session with queue and pool name set and its AM registered
+   * @throws Exception if validation, session retrieval or AM registration handling fails
+   */
+  public TezSessionState getSession(
+      TezSessionState session, String userName, HiveConf conf) throws Exception {
+    validateConfig(conf);
+    String poolName = mapSessionToPoolName(userName);
+    // TODO: do query parallelism enforcement here based on the policies and pools.
+    while (true) {
+      WmTezSession result = checkSessionForReuse(session);
+      // The caller's session has now either been accepted for reuse or disposed of. Forget
+      // it, so that a retry neither picks a session that was just destroyed (which could
+      // loop forever) nor hands a foreign session back to its manager a second time.
+      session = null;
+      // TODO: when proper AM management is implemented, we should call tryGet... here,
+      //       because the parallelism will be enforced here, and pool would always have a
+      //       session for us.
+      result = (result == null ? sessions.getSession() : result);
+      result.setQueueName(yarnQueue);
+      result.setPoolName(poolName);
+      if (!ensureAmIsRegistered(result)) continue; // Try another.
+      redistributePoolAllocations(poolName, result, null);
+      return result;
+    }
+  }
+
+  /**
+   * Waits, up to the configured AM registry timeout, for the session's AM plugin information.
+   * On timeout the session is destroyed.
+   *
+   * @param session the session whose AM must be registered
+   * @return true if the information arrived; false if the wait timed out
+   * @throws Exception if waiting is interrupted or destroying the session fails
+   */
+  @VisibleForTesting
+  protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
+    boolean registered = true;
+    // Make sure AM is ready to use and registered with AM registry.
+    try {
+      session.waitForAmPluginInfo(amRegistryTimeoutMs);
+    } catch (TimeoutException ex) {
+      LOG.error("Timed out waiting for AM registry information for " + session.getSessionId());
+      session.destroy();
+      registered = false;
+    }
+    return registered;
+  }
+
+  private void redistributePoolAllocations(
+      String poolName, WmTezSession sessionToAdd, WmTezSession 
sessionToRemove) {
+    List<WmTezSession> sessionsToUpdate = null;
+    double totalAlloc = 0;
+    assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName());
+    assert sessionToRemove == null || 
poolName.equals(sessionToRemove.getPoolName());
+    poolsLock.readLock().lock();
+    try {
+      PoolState pool = pools.get(poolName);
+      synchronized (pool.lock) {
+        // This should be a 2nd order fn but it's too much pain in Java for 
one LOC.
+        if (sessionToAdd != null) {
+          pool.sessions.add(sessionToAdd);
+        }
+        if (sessionToRemove != null) {
+          if (!pool.sessions.remove(sessionToRemove)) {
+            LOG.error("Session " + sessionToRemove + " could not be removed 
from the pool");
+          }
+          sessionToRemove.setClusterFraction(0);
+        }
+        totalAlloc = updatePoolAllocations(pool.sessions);
+        sessionsToUpdate = new ArrayList<>(pool.sessions);
+      }
+    } finally {
+      poolsLock.readLock().unlock();
+    }
+    allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate);
+  }
+
+  private WmTezSession checkSessionForReuse(TezSessionState session) throws 
Exception {
+    if (session == null) return null;
+    WmTezSession result = null;
+    if (session instanceof WmTezSession) {
+      result = (WmTezSession) session;
+      if (result.isOwnedBy(this)) {
+        return result;
+      }
+      // TODO: this should never happen, at least for now. Throw?
+      LOG.warn("Attempting to reuse a session not belonging to us: " + result);
+      result.returnToSessionManager();
+      return null;
+    }
+    LOG.warn("Attempting to reuse a non-WM session for workload management:" + 
session);
+    if (session instanceof TezSessionPoolSession) {
+      session.returnToSessionManager();
+    } else {
+      session.close(false); // This is a non-pool session, get rid of it.
+    }
+    return null;
+  }
+
+  private double updatePoolAllocations(List<WmTezSession> sessions) {
+    // TODO: real implementation involving in-the-pool policy interface, etc.
+    double allocation = 1.0 / sessions.size();
+    for (WmTezSession session : sessions) {
+      session.setClusterFraction(allocation);
+    }
+    return 1.0;
+  }
+
+  private String mapSessionToPoolName(String userName) {
+    // TODO: real implementation, probably calling into another class 
initialized with policies.
+    return "llap";
+  }
+
+  private void validateConfig(HiveConf conf) throws HiveException {
+    String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
+    if ((queueName != null) && !queueName.isEmpty()) {
+      LOG.warn("Ignoring " + TezConfiguration.TEZ_QUEUE_NAME + "=" + 
queueName);
+      conf.set(TezConfiguration.TEZ_QUEUE_NAME, yarnQueue);
+    }
+    if (conf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+      // Should this also just be ignored? Throw for now, doAs unlike queue is 
often set by admin.
+      throw new HiveException(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + " is 
not supported");
+    }
+    if (restrictedConfig != null) {
+      restrictedConfig.validate(conf);
+    }
+  }
+
+  public void start() throws Exception {
+    sessions.startInitialSessions();
+    if (expirationTracker != null) {
+      expirationTracker.start();
+    }
+    allocationManager.start();
+  }
+
+  public void stop() throws Exception {
+    List<TezSessionPoolSession> sessionsToClose = null;
+    synchronized (openSessions) {
+      sessionsToClose = new 
ArrayList<TezSessionPoolSession>(openSessions.keySet());
+    }
+    for (TezSessionState sessionState : sessionsToClose) {
+      sessionState.close(false);
+    }
+    if (expirationTracker != null) {
+      expirationTracker.stop();
+    }
+    allocationManager.stop();
+  }
+
+  private WmTezSession createSession() {
+    WmTezSession session = 
createSessionObject(TezSessionState.makeSessionId());
+    session.setQueueName(yarnQueue);
+    session.setDefault();
+    LOG.info("Created new interactive session " + session.getSessionId());
+    return session;
+  }
+
+  @VisibleForTesting
+  protected WmTezSession createSessionObject(String sessionId) {
+    return new WmTezSession(sessionId, this, expirationTracker, new 
HiveConf(conf));
+  }
+
+  @Override
+  public void returnAfterUse(TezSessionPoolSession session) throws Exception {
+    boolean isInterrupted = Thread.interrupted();
+    try {
+      WmTezSession wmSession = ensureOwnedSession(session);
+      redistributePoolAllocations(wmSession.getPoolName(), null, wmSession);
+      sessions.returnSession((WmTezSession) session);
+    } finally {
+      // Reset the interrupt status.
+      if (isInterrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+
+  /** Closes a running (expired) pool session and reopens it. */
+  @Override
+  public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) 
throws Exception {
+    sessions.replaceSession(ensureOwnedSession(oldSession), createSession(), 
false, null, null);
+  }
+
+  private WmTezSession ensureOwnedSession(TezSessionPoolSession oldSession) {
+    if (!(oldSession instanceof WmTezSession) || !oldSession.isOwnedBy(this)) {
+      throw new AssertionError("Not a WM session " + oldSession);
+    }
+    WmTezSession session = (WmTezSession) oldSession;
+    return session;
+  }
+
+  /** Called by TezSessionPoolSession when opened. */
+  @Override
+  public void registerOpenSession(TezSessionPoolSession session) {
+    synchronized (openSessions) {
+      openSessions.put(session, true);
+    }
+  }
+
+  /** Called by TezSessionPoolSession when closed. */
+  @Override
+  public void unregisterOpenSession(TezSessionPoolSession session) {
+    synchronized (openSessions) {
+      openSessions.remove(session);
+    }
+  }
+
+  @VisibleForTesting
+  public SessionExpirationTracker getExpirationTracker() {
+    return expirationTracker;
+  }
+
+  @Override
+  public TezSessionState reopen(TezSessionPoolSession session, Configuration 
conf,
+      String[] additionalFiles) throws Exception {
+    WmTezSession oldSession = ensureOwnedSession(session), newSession = 
createSession();
+    newSession.setPoolName(oldSession.getPoolName());
+    HiveConf sessionConf = session.getConf();
+    if (sessionConf == null) {
+      LOG.warn("Session configuration is null for " + session);
+      // default queue name when the initial session was created
+      sessionConf = new HiveConf(conf, WorkloadManager.class);
+    }
+    sessions.replaceSession(oldSession, newSession, true, additionalFiles, 
sessionConf);
+    // We are going to immediately give this session out, so ensure AM 
registry.
+    if (!ensureAmIsRegistered(newSession)) {
+      throw new Exception("Session is not usable after reopen");
+    }
+    redistributePoolAllocations(oldSession.getPoolName(), newSession, 
oldSession);
+    return newSession;
+  }
+
+  @Override
+  public void destroy(TezSessionPoolSession session) throws Exception {
+    LOG.warn("Closing a pool session because of retry failure.");
+    // We never want to lose pool sessions. Replace it instead; also trigger 
duck redistribution.
+    WmTezSession wmSession = ensureOwnedSession(session);
+    closeAndReopenPoolSession(wmSession);
+    redistributePoolAllocations(wmSession.getPoolName(), null, wmSession);
+  }
+
+  protected final HiveConf getConf() {
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 9e2846c..0de9de5 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -81,6 +81,8 @@ public class TezJobMonitor {
       public void run() {
         TezJobMonitor.killRunningJobs();
         try {
+          // TODO: why does this only kill non-default sessions?
+          // Nothing for workload management since that only deals with 
default ones.
           TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
         } catch (Exception e) {
           // ignore

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index 7a02a56..209cf57 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -25,6 +25,7 @@ import com.google.common.cache.CacheBuilder;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -38,12 +39,15 @@ import org.slf4j.LoggerFactory;
 public class LlapClusterStateForCompile {
   protected static final Logger LOG = 
LoggerFactory.getLogger(LlapClusterStateForCompile.class);
 
-  private static final long CLUSTER_UPDATE_INTERVAL_NS = 120 * 1000000000L; // 
2 minutes.
-  private Long lastClusterUpdateNs;
-  private Integer noConfigNodeCount, executorCount;
-  private int numExecutorsPerNode = -1;
+  private static final long CLUSTER_UPDATE_INTERVAL_MS = 120 * 1000L; // 2 
minutes.
+  private volatile Long lastClusterUpdateNs;
+  private volatile Integer noConfigNodeCount, executorCount;
+  private volatile int numExecutorsPerNode = -1;
   private LlapRegistryService svc;
   private final Configuration conf;
+  private final long updateIntervalNs;
+  /** Synchronizes the actual update from the cluster; only one thread at a 
time. */
+  private final Object updateInfoLock = new Object();
 
   // It's difficult to impossible to pass global things to compilation, so we 
have a static cache.
   private static final Cache<String, LlapClusterStateForCompile> CACHE =
@@ -57,7 +61,7 @@ public class LlapClusterStateForCompile {
       @Override
       public LlapClusterStateForCompile call() throws Exception {
         LOG.info("Creating cluster info for " + userName + ":" + nodes);
-        return new LlapClusterStateForCompile(conf);
+        return new LlapClusterStateForCompile(conf, 
CLUSTER_UPDATE_INTERVAL_MS);
       }
     };
     try {
@@ -67,8 +71,9 @@ public class LlapClusterStateForCompile {
     }
   }
 
-  private LlapClusterStateForCompile(Configuration conf) {
+  public LlapClusterStateForCompile(Configuration conf, long updateIntervalMs) 
{
     this.conf = conf;
+    this.updateIntervalNs = updateIntervalMs * 1000000L;
   }
 
   public boolean hasClusterInfo() {
@@ -87,46 +92,60 @@ public class LlapClusterStateForCompile {
     return numExecutorsPerNode;
   }
 
-  public synchronized void initClusterInfo() {
-    if (lastClusterUpdateNs != null) {
-      long elapsed = System.nanoTime() - lastClusterUpdateNs;
-      if (elapsed < CLUSTER_UPDATE_INTERVAL_NS) return;
-    }
-    if (svc == null) {
-      try {
-        svc = LlapRegistryService.getClient(conf);
-      } catch (Throwable t) {
-        LOG.info("Cannot create the client; ignoring", t);
-        return; // Don't fail; this is best-effort.
-      }
-    }
-    LlapServiceInstanceSet instances;
-    try {
-      instances = svc.getInstances(10);
-    } catch (IOException e) {
-      LOG.info("Cannot update cluster information; ignoring", e);
-      return; // Don't wait for the cluster if not started; this is 
best-effort.
-    }
-    int executorsLocal = 0, noConfigNodesLocal = 0;
-    for (LlapServiceInstance si : instances.getAll()) {
-      if (si instanceof InactiveServiceInstance) continue; // Shouldn't happen 
in getAll.
-      Map<String, String> props = si.getProperties();
-      if (props == null) {
-        ++noConfigNodesLocal;
-        continue;
+  public boolean initClusterInfo() {
+    return initClusterInfo(true);
+  }
+
+  private boolean isUpdateNeeded(boolean allowUpdate) {
+    Long lastUpdateLocal = lastClusterUpdateNs;
+    if (lastUpdateLocal == null) return true;
+    if (!allowUpdate) return false;
+    long elapsed = System.nanoTime() - lastUpdateLocal;
+    return (elapsed >= updateIntervalNs);
+  }
+
+  public boolean initClusterInfo(boolean allowUpdate) {
+    if (!isUpdateNeeded(allowUpdate)) return true;
+    synchronized (updateInfoLock) {
+      // We now hold updateInfoLock, so no other thread can be updating; 
re-check in case another thread finished an update while we waited.
+      if (!isUpdateNeeded(allowUpdate)) return true;
+      if (svc == null) {
+        try {
+          svc = LlapRegistryService.getClient(conf);
+        } catch (Throwable t) {
+          LOG.info("Cannot create the client; ignoring", t);
+          return false; // Don't fail; this is best-effort.
+        }
       }
+      LlapServiceInstanceSet instances;
       try {
-        int numExecutors = 
Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
-        executorsLocal += numExecutors;
-        if (numExecutorsPerNode == -1) {
-          numExecutorsPerNode = numExecutors;
+        instances = svc.getInstances(10);
+      } catch (IOException e) {
+        LOG.info("Cannot update cluster information; ignoring", e);
+        return false; // Don't wait for the cluster if not started; this is 
best-effort.
+      }
+      int executorsLocal = 0, noConfigNodesLocal = 0;
+      for (LlapServiceInstance si : instances.getAll()) {
+        if (si instanceof InactiveServiceInstance) continue; // Shouldn't 
happen in getAll.
+        Map<String, String> props = si.getProperties();
+        if (props == null) {
+          ++noConfigNodesLocal;
+          continue;
+        }
+        try {
+          int numExecutors = 
Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+          executorsLocal += numExecutors;
+          if (numExecutorsPerNode == -1) {
+            numExecutorsPerNode = numExecutors;
+          }
+        } catch (NumberFormatException e) {
+          ++noConfigNodesLocal;
         }
-      } catch (NumberFormatException e) {
-        ++noConfigNodesLocal;
       }
+      noConfigNodeCount = noConfigNodesLocal;
+      executorCount = executorsLocal;
+      lastClusterUpdateNs = System.nanoTime();
+      return true;
     }
-    lastClusterUpdateNs = System.nanoTime();
-    noConfigNodeCount = noConfigNodesLocal;
-    executorCount = executorsLocal;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index 4e5d991..59efd43 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 
+import java.util.Collection;
+import org.apache.hadoop.fs.Path;
+
 import java.io.IOException;
 import java.net.URISyntaxException;
 
@@ -35,7 +38,7 @@ import org.apache.tez.dag.api.TezException;
  * use case from hive server 2, we need a session simulation.
  *
  */
-public class SampleTezSessionState extends TezSessionPoolSession {
+public class SampleTezSessionState extends WmTezSession {
 
   private boolean open;
   private final String sessionId;
@@ -43,8 +46,10 @@ public class SampleTezSessionState extends 
TezSessionPoolSession {
   private String user;
   private boolean doAsEnabled;
 
-  public SampleTezSessionState(String sessionId, TezSessionPoolManager parent, 
HiveConf conf) {
-    super(sessionId, parent, parent.getExpirationTracker(), conf);
+  public SampleTezSessionState(
+      String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) {
+    super(sessionId, parent, (parent instanceof TezSessionPoolManager)
+        ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf);
     this.sessionId = sessionId;
     this.hiveConf = conf;
   }
@@ -59,8 +64,7 @@ public class SampleTezSessionState extends 
TezSessionPoolSession {
   }
 
   @Override
-  public void open() throws IOException, LoginException, URISyntaxException,
-      TezException {
+  public void open() throws LoginException, IOException {
     UserGroupInformation ugi = Utils.getUGI();
     user = ugi.getShortUserName();
     this.doAsEnabled = 
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
@@ -68,7 +72,18 @@ public class SampleTezSessionState extends 
TezSessionPoolSession {
   }
 
   @Override
-  public void close(boolean keepTmpDir) throws TezException, IOException {
+  public void open(Collection<String> additionalFiles, Path scratchDir)
+      throws LoginException, IOException {
+    open();
+  }
+
+  @Override
+  public void open(String[] additionalFiles) throws IOException, 
LoginException {
+    open();
+  }
+
+  @Override
+  void close(boolean keepTmpDir) throws TezException, IOException {
     open = keepTmpDir;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java
new file mode 100644
index 0000000..5d1a3b6
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback;
+import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto;
+import 
org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.Test;
+
+
+public class TestGuaranteedTaskAllocator {
+
+  static class MockCommunicator implements LlapPluginEndpointClient {
+    HashMap<Integer, Integer> messages = new HashMap<>();
+
+    @Override
+    public void sendUpdateQuery(UpdateQueryRequestProto request,
+        AmPluginNode node,
+        ExecuteRequestCallback<UpdateQueryResponseProto> callback) {
+      WmTezSession session = (WmTezSession)node;
+      messages.put(Integer.parseInt(session.getSessionId()), 
request.getGuaranteedTaskCount());
+      callback.setResponse(UpdateQueryResponseProto.getDefaultInstance());
+    }
+  }
+
+  static class GuaranteedTasksAllocatorForTest extends 
GuaranteedTasksAllocator {
+    int executorCount = 0;
+
+    public GuaranteedTasksAllocatorForTest(LlapPluginEndpointClient 
amCommunicator) {
+      super(new Configuration(), amCommunicator);
+    }
+
+    // Override external stuff. These could also be injected as extra classes.
+
+    @Override
+    protected int getExecutorCount(boolean allowUpdate) {
+      return executorCount;
+    }
+  }
+
+  @Test
+  public void testEqualAllocations() {
+    testEqualAllocation(8, 5, 1.0f);
+    testEqualAllocation(0, 3, 1.0f);
+    testEqualAllocation(3, 1, 1.0f);
+    testEqualAllocation(5, 5, 1.0f);
+    testEqualAllocation(7, 10, 1.0f);
+    testEqualAllocation(98, 10, 1.0f);
+    testEqualAllocation(40, 5, 0.5f);
+    testEqualAllocation(40, 5, 0.25f);
+    testEqualAllocation(40, 5, 0.1f);
+    testEqualAllocation(40, 5, 0.01f);
+  }
+
+  @Test
+  public void testAllocations() {
+    testAllocation(8, 1.0f,
+        new double[] { 0.5f, 0.25f, 0.25f }, new int[] { 4, 2, 2 });
+    testAllocation(10, 1.0f,
+        new double[] { 0.33f, 0.4f, 0.27f }, new int[] { 3, 4, 3 });
+    // Test incorrect totals. We don't normalize; just make sure we don't 
under- or overshoot.
+    testAllocation(10, 1.0f,
+        new double[] { 0.5f, 0.5f, 0.5f }, new int[] { 5, 5, 0 });
+    testAllocation(100, 0.5f,
+        new double[] { 0.15f, 0.15f, 0.15f }, new int[] { 15, 15, 20 });
+  }
+
+  private void testAllocation(int ducks, double total, double[] in, int[] out) 
{
+    MockCommunicator comm = new MockCommunicator();
+    GuaranteedTasksAllocatorForTest qam = new 
GuaranteedTasksAllocatorForTest(comm);
+    List<WmTezSession> sessionsToUpdate = new ArrayList<>();
+    comm.messages.clear();
+    for (int i = 0; i < in.length; ++i) {
+      addSession(in[i], sessionsToUpdate, i);
+    }
+    qam.executorCount = ducks;
+    qam.updateSessionsAsync(total, sessionsToUpdate);
+    Integer[] results = getAllocationResults(comm, in.length);
+    for (int i = 0; i < results.length; ++i) {
+      assertNotNull(results[i]);
+      assertEquals(out[i], results[i].intValue());
+    }
+  }
+
+  private void testEqualAllocation(int ducks, int sessions, double total) {
+    MockCommunicator comm = new MockCommunicator();
+    GuaranteedTasksAllocatorForTest qam = new 
GuaranteedTasksAllocatorForTest(comm);
+    List<WmTezSession> sessionsToUpdate = new ArrayList<>();
+    comm.messages.clear();
+    double fraction = total / sessions;
+    for (int i = 0; i < sessions; ++i) {
+      addSession(fraction, sessionsToUpdate, i);
+    }
+    qam.executorCount = ducks;
+    qam.updateSessionsAsync(total, sessionsToUpdate);
+    Integer[] results = getAllocationResults(comm, sessions);
+    int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE, totalAssigned = 0;
+    for (int i = 0; i < results.length; ++i) {
+      assertNotNull(results[i]);
+      int val = results[i];
+      min = Math.min(val, min);
+      max = Math.max(val, max);
+      totalAssigned += val;
+    }
+    assertTrue((max - min) <= 1);
+    assertTrue(Math.abs(total * ducks - totalAssigned) <= 0.5f);
+  }
+
+  private Integer[] getAllocationResults(MockCommunicator comm, int sessions) {
+    assertEquals(sessions, comm.messages.size());
+    Integer[] results = new Integer[sessions];
+    for (Entry<Integer, Integer> e : comm.messages.entrySet()) {
+      assertNull(results[e.getKey()]);
+      results[e.getKey()] = e.getValue();
+    }
+    return results;
+  }
+
+  private void addSession(double alloc, List<WmTezSession> sessionsToUpdate, 
int i) {
+    SampleTezSessionState session = new SampleTezSessionState("" + i, null, 
null);
+    session.setClusterFraction(alloc);
+    sessionsToUpdate.add(session);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index 5e1e68c..b9f9f5e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec.tez;
 import static org.junit.Assert.*;
 
 import java.util.HashSet;
-
 import java.util.Set;
 
 import java.util.ArrayList;
@@ -101,11 +100,11 @@ public class TestTezSessionPool {
       // draw 1 and replace
       TezSessionState sessionState = poolManager.getSession(null, conf, true, 
false);
       assertEquals("a", sessionState.getQueueName());
-      poolManager.returnSession(sessionState, false);
+      poolManager.returnSession(sessionState);
 
       sessionState = poolManager.getSession(null, conf, true, false);
       assertEquals("a", sessionState.getQueueName());
-      poolManager.returnSession(sessionState, false);
+      poolManager.returnSession(sessionState);
 
       // [a,b,c,a,b,c]
 
@@ -114,11 +113,11 @@ public class TestTezSessionPool {
       TezSessionState second = poolManager.getSession(null, conf, true, false);
       assertEquals("a", first.getQueueName());
       assertEquals("b", second.getQueueName());
-      poolManager.returnSession(first, false);
-      poolManager.returnSession(second, false);
+      poolManager.returnSession(first);
+      poolManager.returnSession(second);
       TezSessionState third = poolManager.getSession(null, conf, true, false);
       assertEquals("b", third.getQueueName());
-      poolManager.returnSession(third, false);
+      poolManager.returnSession(third);
 
       // [b,a,c,a,b,c]
 
@@ -130,15 +129,15 @@ public class TestTezSessionPool {
       assertEquals("a", second.getQueueName());
       assertEquals("c", third.getQueueName());
 
-      poolManager.returnSession(first, false);
-      poolManager.returnSession(second, false);
-      poolManager.returnSession(third, false);
+      poolManager.returnSession(first);
+      poolManager.returnSession(second);
+      poolManager.returnSession(third);
 
       // [c,a,b,a,b,c]
 
       first = poolManager.getSession(null, conf, true, false);
       assertEquals("c", third.getQueueName());
-      poolManager.returnSession(first, false);
+      poolManager.returnSession(first);
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -169,7 +168,7 @@ public class TestTezSessionPool {
         assertEquals(4, queueCounts[i]);
       }
       for (int i = 0; i < sessions.length; ++i) {
-        poolManager.returnSession(sessions[i], false);
+        poolManager.returnSession(sessions[i]);
       }
 
     } catch (Exception e) {
@@ -191,7 +190,7 @@ public class TestTezSessionPool {
       Mockito.when(session.isDefault()).thenReturn(false);
       Mockito.when(session.getConf()).thenReturn(conf);
 
-      poolManager.reopenSession(session, conf);
+      poolManager.reopenSession(session, conf, null);
 
       Mockito.verify(session).close(true);
       Mockito.verify(session).open(new HashSet<String>(), null);
@@ -201,12 +200,12 @@ public class TestTezSessionPool {
 
       // user explicitly specified queue name
       conf.set("tez.queue.name", "tezq1");
-      poolManager.reopenSession(session, conf);
+      poolManager.reopenSession(session, conf, null);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, 
false).getQueueName());
 
       // user unsets queue name, will fallback to default session queue
       conf.unset("tez.queue.name");
-      poolManager.reopenSession(session, conf);
+      poolManager.reopenSession(session, conf, null);
       assertEquals("default", poolManager.getSession(null, conf, false, 
false).getQueueName());
 
       // session.open will unset the queue name from conf but Mockito 
intercepts the open call
@@ -214,17 +213,17 @@ public class TestTezSessionPool {
       conf.unset("tez.queue.name");
       // change session's default queue to tezq1 and rerun test sequence
       Mockito.when(session.getQueueName()).thenReturn("tezq1");
-      poolManager.reopenSession(session, conf);
+      poolManager.reopenSession(session, conf, null);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, 
false).getQueueName());
 
       // user sets default queue now
       conf.set("tez.queue.name", "default");
-      poolManager.reopenSession(session, conf);
+      poolManager.reopenSession(session, conf, null);
       assertEquals("default", poolManager.getSession(null, conf, false, 
false).getQueueName());
 
       // user does not specify queue so use session default
       conf.unset("tez.queue.name");
-      poolManager.reopenSession(session, conf);
+      poolManager.reopenSession(session, conf, null);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, 
false).getQueueName());
     } catch (Exception e) {
       e.printStackTrace();
@@ -282,7 +281,8 @@ public class TestTezSessionPool {
 
         TezSessionState session = poolManager.getSession(null, tmpConf, true, 
llap);
         Thread.sleep((random.nextInt(9) % 10) * 1000);
-        poolManager.returnSession(session, llap);
+        session.setLegacyLlapMode(llap);
+        poolManager.returnSession(session);
       } catch (Exception e) {
         e.printStackTrace();
       }
@@ -328,7 +328,7 @@ public class TestTezSessionPool {
     Mockito.when(session.isDefault()).thenReturn(false);
     Mockito.when(session.getConf()).thenReturn(conf);
 
-    poolManager.reopenSession(session, conf);
+    poolManager.reopenSession(session, conf, null);
 
     Mockito.verify(session).close(true);
     Mockito.verify(session).open(new HashSet<String>(), null);

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 9b9eead..2dc334d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -20,15 +20,8 @@ package org.apache.hadoop.hive.ql.exec.tez;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.never;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -180,6 +173,8 @@ public class TestTezTask {
     session = mock(TezClient.class);
     sessionState = mock(TezSessionState.class);
     when(sessionState.getSession()).thenReturn(session);
+    when(sessionState.reopen(any(Configuration.class), any(String[].class)))
+      .thenReturn(sessionState);
     when(session.submitDAG(any(DAG.class)))
       .thenThrow(new SessionNotRunning(""))
       .thenReturn(mock(DAGClient.class));
@@ -227,8 +222,7 @@ public class TestTezTask {
     task.submit(conf, dag, path, appLr, sessionState, 
Collections.<LocalResource> emptyList(),
         new String[0], Collections.<String,LocalResource> emptyMap());
     // validate close/reopen
-    verify(sessionState, times(1)).open(any(Collection.class), 
any(Path.class));
-    verify(sessionState, times(1)).close(eq(true)); // now uses pool after 
HIVE-7043
+    verify(sessionState, times(1)).reopen(any(Configuration.class), 
any(String[].class));
     verify(session, times(2)).submitDAG(any(DAG.class));
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
new file mode 100644
index 0000000..7adf895
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import java.util.List;
+
+import org.junit.Test;
+
+public class TestWorkloadManager {
+  private static class MockQam implements QueryAllocationManager {
+    boolean isCalled = false;
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions) {
+      isCalled = true;
+    }
+
+    void assertWasCalled() {
+      assertTrue(isCalled);
+      isCalled = false;
+    }
+  }
+
+  private static class WorkloadManagerForTest extends WorkloadManager {
+
+    WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
+        QueryAllocationManager qam) {
+      super(yarnQueue, conf, numSessions, qam, null);
+    }
+
+    @Override
+    protected WmTezSession createSessionObject(String sessionId) {
+      return new SampleTezSessionState(sessionId, this, new HiveConf(getConf()));
+    }
+
+    @Override
+    protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
+      return true;
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testReuse() throws Exception {
+    HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
+    wm.start();
+    TezSessionState nonPool = mock(TezSessionState.class);
+    when(nonPool.getConf()).thenReturn(conf);
+    doNothing().when(nonPool).close(anyBoolean());
+    TezSessionState session = wm.getSession(nonPool, null, conf);
+    verify(nonPool).close(anyBoolean());
+    assertNotSame(nonPool, session);
+    session.returnToSessionManager();
+    TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class);
+    when(diffPool.getConf()).thenReturn(conf);
+    doNothing().when(diffPool).returnToSessionManager();
+    session = wm.getSession(diffPool, null, conf);
+    verify(diffPool).returnToSessionManager();
+    assertNotSame(diffPool, session);
+    TezSessionState session2 = wm.getSession(session, null, conf);
+    assertSame(session, session2);
+  }
+
+  @Test(timeout = 10000)
+  public void testQueueName() throws Exception {
+    HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
+    wm.start();
+    // The queue should be ignored.
+    conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
+    TezSessionState session = wm.getSession(null, null, conf);
+    assertEquals("test", session.getQueueName());
+    assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
+    session.setQueueName("test2");
+    session = wm.getSession(session, null, conf);
+    assertEquals("test", session.getQueueName());
+  }
+
+  // Note (unrelated to epsilon): all the fraction checks are valid with the current logic in the
+  //                              absence of policies. This will change when there are policies.
+  private final static double EPSILON = 0.001;
+
+  @Test(timeout = 10000)
+  public void testReopen() throws Exception {
+    // We should always get a different object, and cluster fraction should be propagated.
+    HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
+    wm.start();
+    WmTezSession session = (WmTezSession) wm.getSession(null, null, conf);
+    assertEquals(1.0, session.getClusterFraction(), EPSILON);
+    qam.assertWasCalled();
+    WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
+    assertNotSame(session, session2);
+    assertEquals(1.0, session2.getClusterFraction(), EPSILON);
+    assertEquals(0.0, session.getClusterFraction(), EPSILON);
+    qam.assertWasCalled();
+  }
+
+  @Test(timeout = 10000)
+  public void testDestroyAndReturn() throws Exception {
+    // Session should not be lost; however the fraction should be discarded.
+    HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
+    wm.start();
+    WmTezSession session = (WmTezSession) wm.getSession(null, null, conf);
+    assertEquals(1.0, session.getClusterFraction(), EPSILON);
+    qam.assertWasCalled();
+    WmTezSession session2 = (WmTezSession) wm.getSession(null, null, conf);
+    assertEquals(0.5, session.getClusterFraction(), EPSILON);
+    assertEquals(0.5, session2.getClusterFraction(), EPSILON);
+    qam.assertWasCalled();
+    assertNotSame(session, session2);
+    session.destroy(); // Destroy before returning to the pool.
+    assertEquals(1.0, session2.getClusterFraction(), EPSILON);
+    assertEquals(0.0, session.getClusterFraction(), EPSILON);
+    qam.assertWasCalled();
+
+    // We never lose pool session, so we should still be able to get.
+    session = (WmTezSession) wm.getSession(null, null, conf);
+    session.returnToSessionManager();
+    assertEquals(1.0, session2.getClusterFraction(), EPSILON);
+    assertEquals(0.0, session.getClusterFraction(), EPSILON);
+    qam.assertWasCalled();
+
+    // Now destroy the returned session (which is technically not valid) and confirm correctness.
+    session.destroy();
+    assertEquals(1.0, session2.getClusterFraction(), EPSILON);
+    //qam.assertWasNotCalled();
+  }
+
+  private HiveConf createConf() {
+    HiveConf conf = new HiveConf();
+    conf.set(ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME.varname, "-1");
+    conf.set(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+    conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, "");
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index a55cf59..5cb973c 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
@@ -91,7 +92,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 
 /**
  * HiveServer2.
@@ -107,6 +107,7 @@ public class HiveServer2 extends CompositeService {
   private CuratorFramework zooKeeperClient;
   private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
   private HttpServer webServer; // Web UI
+  private WorkloadManager wm;
 
   public HiveServer2() {
     super(HiveServer2.class.getSimpleName());
@@ -161,6 +162,14 @@ public class HiveServer2 extends CompositeService {
       LlapRegistryService.getClient(hiveConf);
     }
 
+    // Initialize workload management.
+    String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
+    if (wmQueue != null && !wmQueue.isEmpty()) {
+      wm = WorkloadManager.create(wmQueue, hiveConf);
+    } else {
+      wm = null;
+    }
+
     // Create views registry
     try {
       Hive sessionHive = Hive.get(hiveConf);
@@ -553,6 +562,14 @@ public class HiveServer2 extends CompositeService {
             + "Shutting down HiveServer2 anyway.", e);
       }
     }
+    if (wm != null) {
+      try {
+        wm.stop();
+      } catch (Exception e) {
+        LOG.error("Workload manager stop had an error during stop of HiveServer2. "
+            + "Shutting down HiveServer2 anyway.", e);
+      }
+    }
 
     if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       try {
@@ -618,6 +635,9 @@ public class HiveServer2 extends CompositeService {
         if (sessionPool != null) {
           sessionPool.startPool();
         }
+        if (server.wm != null) {
+          server.wm.start();
+        }
 
         if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
           SparkSessionManagerImpl.getInstance().setup(hiveConf);

Reply via email to