Author: vikram
Date: Wed Mar  5 19:55:24 2014
New Revision: 1574640

URL: http://svn.apache.org/r1574640
Log:
HIVE-6325: Enable using multiple concurrent sessions in tez (Vikram Dixit, 
reviewed by Gunther Hagleitner)

Added:
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
    
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
    
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
Modified:
    
hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/branch-0.13/conf/hive-default.xml.template
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    
hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java

Modified: 
hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1574640&r1=1574639&r2=1574640&view=diff
==============================================================================
--- 
hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
 (original)
+++ 
hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
 Wed Mar  5 19:55:24 2014
@@ -956,6 +956,11 @@ public class HiveConf extends Configurat
 
     HIVECOUNTERGROUP("hive.counters.group.name", "HIVE"),
 
+    HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", ""),
+    
HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE("hive.server2.tez.sessions.per.default.queue",
 1),
+    
HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS("hive.server2.tez.initialize.default.sessions",
+        false),
+
     // none, column
     // none is the default(past) behavior. Implies only alphaNumeric and 
underscore are valid characters in identifiers.
     // column: implies column names can contain any character.

Modified: hive/branches/branch-0.13/conf/hive-default.xml.template
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/conf/hive-default.xml.template?rev=1574640&r1=1574639&r2=1574640&view=diff
==============================================================================
--- hive/branches/branch-0.13/conf/hive-default.xml.template (original)
+++ hive/branches/branch-0.13/conf/hive-default.xml.template Wed Mar  5 
19:55:24 2014
@@ -2406,4 +2406,34 @@
   <description>By default tez will use the java opts from map tasks. This can 
be used to overwrite.</description>
 </property>
 
+<property>
+  <name>hive.server2.tez.default.queues</name>
+  <value></value>
+  <description>
+    A list of comma separated values corresponding to yarn queues of the same 
name.
+    When hive server 2 is launched in tez mode, this configuration needs to be 
set
+    for multiple tez sessions to run in parallel on the cluster.
+  </description>
+</property>
+
+<property>
+  <name>hive.server2.tez.sessions.per.default.queue</name>
+  <value>1</value>
+  <description>
+    A positive integer that determines the number of tez sessions that should 
be
+    launched on each of the queues specified by 
"hive.server2.tez.default.queues".
+    Determines the parallelism on each queue.
+  </description>
+</property>
+
+<property>
+  <name>hive.server2.tez.initialize.default.sessions</name>
+  <value>false</value>
+  <description>
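+    This flag is used in hive server 2. When true, hive server 2 sets up and
+    starts the pool of default tez sessions at startup. When false, no pool is
+    initialized, which lets a user run hive server 2 without turning on tez for
+    it; a user who still wants to run queries over tez can do so without the
+    pool of sessions, using sessions that are created on demand.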
+  </description>
+</property>
+
 </configuration>

Added: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1574640&view=auto
==============================================================================
--- 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
 (added)
+++ 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
 Wed Mar  5 19:55:24 2014
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * This class is for managing multiple tez sessions particularly when
+ * HiveServer2 is being used to submit queries.
+ *
+ * A pool of "default" sessions can be created up front, spread over the queues
+ * listed in hive.server2.tez.default.queues. A request that does not name a
+ * queue, does not run with doAs enabled and arrives after the pool has been
+ * started borrows a session from that pool and hands it back through
+ * {@link #returnSession(TezSessionState)}.
+ *
+ * In case the user specifies a queue explicitly, a new session is created
+ * on that queue and assigned to the session state.
+ */
+public class TezSessionPoolManager {
+
+  private static final Log LOG = LogFactory.getLog(TezSessionPoolManager.class);
+
+  // pre-created default sessions; borrowed with take(), handed back with put()
+  private BlockingQueue<TezSessionState> defaultQueuePool;
+  // number of sessions placed in the pool by setupPool(); -1 until it has run
+  private int blockingQueueLength = -1;
+  // configuration captured by setupPool() and copied for every pooled session
+  private HiveConf initConf = null;
+
+  // set by startPool(); until then every request is served by a new session
+  private boolean inited = false;
+
+  private static TezSessionPoolManager sessionPool = null;
+
+  /**
+   * Returns the shared pool manager, creating it on first use.
+   *
+   * The method is synchronized so that concurrent first callers cannot each
+   * create (and then use) a different instance.
+   *
+   * @return the single TezSessionPoolManager of this process
+   * @throws Exception kept in the signature for existing callers
+   */
+  public static synchronized TezSessionPoolManager getInstance()
+      throws Exception {
+    if (sessionPool == null) {
+      sessionPool = new TezSessionPoolManager();
+    }
+
+    return sessionPool;
+  }
+
+  protected TezSessionPoolManager() {
+  }
+
+  /**
+   * Opens every session that setupPool() placed in the pool and marks the
+   * pool as initialized. Each session is taken from the head of the queue,
+   * opened with a copy of the setup configuration that has "tez.queue.name"
+   * set to the session's queue, and put back at the tail, so the original
+   * ordering is preserved once the loop has gone through all sessions.
+   *
+   * @throws Exception if taking, opening or re-queueing a session fails
+   */
+  public void startPool() throws Exception {
+    this.inited = true;
+    for (int i = 0; i < blockingQueueLength; i++) {
+      HiveConf newConf = new HiveConf(initConf);
+      TezSessionState sessionState = defaultQueuePool.take();
+      newConf.set("tez.queue.name", sessionState.getQueueName());
+      sessionState.open(TezSessionState.makeSessionId(), newConf);
+      defaultQueuePool.put(sessionState);
+    }
+  }
+
+  /**
+   * Creates (but does not open) the default sessions described by the
+   * configuration: hive.server2.tez.sessions.per.default.queue sessions for
+   * each queue named in hive.server2.tez.default.queues.
+   *
+   * Queue names are trimmed, and empty entries are skipped, so a value such
+   * as "a, b" yields the queues "a" and "b".
+   *
+   * @param conf configuration to read the pool settings from; it is also kept
+   *          as the base configuration for startPool()
+   * @throws InterruptedException if interrupted while filling the pool
+   */
+  public void setupPool(HiveConf conf) throws InterruptedException {
+
+    String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
+    int numSessions = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+
+    // the list of queues is a comma separated list.
+    String defaultQueueList[] = defaultQueues.split(",");
+    // ArrayBlockingQueue rejects a capacity below 1, which a session count of
+    // zero (or less) would otherwise produce. An empty pool is handled by
+    // getSession(), so never ask for less than one slot.
+    int capacity = Math.max(1, numSessions * defaultQueueList.length);
+    defaultQueuePool = new ArrayBlockingQueue<TezSessionState>(capacity);
+    this.initConf = conf;
+    /*
+     *  with this the ordering of sessions in the queue will be (with 2 sessions 3 queues)
+     *  s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform distribution of
+     *  the sessions across queues at least to begin with. Then as sessions get freed up, the list
+     *  may change this ordering.
+     */
+    blockingQueueLength = 0;
+    for (int i = 0; i < numSessions; i++) {
+      for (String queue : defaultQueueList) {
+        String queueName = queue.trim();
+        if (queueName.length() == 0) {
+          continue;
+        }
+        TezSessionState sessionState = createSession();
+        sessionState.setQueueName(queueName);
+        sessionState.setDefault();
+        LOG.info("Created new tez session for queue: " + queueName +
+            " with session id: " + sessionState.getSessionId());
+        defaultQueuePool.put(sessionState);
+        blockingQueueLength++;
+      }
+    }
+  }
+
+  /**
+   * Picks the session for a request: a pooled default session when possible,
+   * otherwise a newly opened one.
+   *
+   * @param conf configuration of the request
+   * @return an open session; blocks while the pool is empty if a pooled
+   *         session is to be used
+   * @throws Exception if a new session cannot be opened or the wait is interrupted
+   */
+  private TezSessionState getSession(HiveConf conf)
+      throws Exception {
+
+    String queueName = conf.get("tez.queue.name");
+
+    boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
+
+    /*
+     * if the user has specified a queue name themselves, we create a new session.
+     * also a new session is created if the user tries to submit to a queue using
+     * their own credentials. We expect that with the new security model, things will
+     * run as user hive in most cases.
+     */
+    if (!(this.inited) || ((queueName != null) && (!queueName.isEmpty()))
+       || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) {
+      LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser +
+          " defaultQueuePool: " + defaultQueuePool +
+          " blockingQueueLength: " + blockingQueueLength);
+      return getNewSessionState(conf, queueName);
+    }
+
+    LOG.info("Choosing a session from the defaultQueuePool");
+    return defaultQueuePool.take();
+  }
+
+  /**
+   * Creates a non-default session, records the queue name on it and opens it.
+   *
+   * @param conf HiveConf that is used to initialize the session
+   * @param queueName could be null. Set in the tez session.
+   * @return the newly opened session
+   * @throws Exception if the session cannot be opened
+   */
+  private TezSessionState getNewSessionState(HiveConf conf,
+      String queueName) throws Exception {
+    TezSessionState retTezSessionState = createSession();
+    retTezSessionState.setQueueName(queueName);
+    retTezSessionState.open(TezSessionState.makeSessionId(), conf);
+
+    LOG.info("Started a new session for queue: " + queueName +
+        " session id: " + retTezSessionState.getSessionId());
+    return retTezSessionState;
+  }
+
+  /**
+   * Hands a session back after use. A default (pooled) session is detached
+   * from the current SessionState, if there is one, and put back in the pool.
+   *
+   * @param tezSessionState the session that was obtained from getSession
+   * @throws Exception if interrupted while putting the session back
+   */
+  public void returnSession(TezSessionState tezSessionState)
+      throws Exception {
+    if (tezSessionState.isDefault()) {
+      LOG.info("The session " + tezSessionState.getSessionId()
+          + " belongs to the pool. Put it back in");
+      SessionState sessionState = SessionState.get();
+      if (sessionState != null) {
+        sessionState.setTezSession(null);
+      }
+      defaultQueuePool.put(tezSessionState);
+    }
+    // non default session nothing changes. The user can continue to use the existing
+    // session in the SessionState
+  }
+
+  /**
+   * Closes a session on behalf of its user. Default (pooled) sessions are
+   * left open here; they are only closed by {@link #stop()}.
+   *
+   * @param tezSessionState the session to close
+   * @throws Exception if closing the session fails
+   */
+  public void close(TezSessionState tezSessionState) throws Exception {
+    LOG.info("Closing tez session default? " + tezSessionState.isDefault());
+    if (!tezSessionState.isDefault()) {
+      tezSessionState.close(false);
+    }
+  }
+
+  /**
+   * Closes all open default sessions. Does nothing if the pool was never started.
+   *
+   * @throws Exception if closing a session fails
+   */
+  public void stop() throws Exception {
+    if ((sessionPool == null) || (this.inited == false)) {
+      return;
+    }
+
+    // we can just stop all the sessions. Work on a snapshot of the open
+    // sessions: getOpenSessions() hands out the live shared list, and that
+    // list must not change underneath the loop while sessions are closed.
+    TezSessionState[] openSessions =
+        TezSessionState.getOpenSessions().toArray(new TezSessionState[0]);
+    for (TezSessionState sessionState : openSessions) {
+      if (sessionState.isDefault()) {
+        sessionState.close(false);
+      }
+    }
+  }
+
+  /**
+   * Factory for session objects; tests override it to supply a stub session.
+   */
+  protected TezSessionState createSession() {
+    return new TezSessionState();
+  }
+
+  /**
+   * Returns the session to run with: the given one if it can be re-used for
+   * this configuration, otherwise a pooled or new session. A session that
+   * cannot be re-used is closed first.
+   *
+   * @param session the session currently held by the caller, may be null
+   * @param conf configuration of the request
+   * @return the session to use
+   * @throws Exception if the given session is a default session, or if
+   *           closing or obtaining a session fails
+   */
+  public TezSessionState getSession(TezSessionState session, HiveConf conf) throws Exception {
+    if (canWorkWithSameSession(session, conf)) {
+      return session;
+    }
+
+    if (session != null) {
+      session.close(false);
+    }
+
+    return getSession(conf);
+  }
+
+  /*
+   * This method helps to re-use a session in case there has been no change in
+   * the configuration of a session. This will happen only in the case of non-hive-server2
+   * sessions for e.g. when a CLI session is started. The CLI session could re-use the
+   * same tez session eliminating the latencies of new AM and containers.
+   *
+   * A session can be re-used when the doAs setting and the value of
+   * tez.queue.name (including "not set") are the same in both configurations.
+   */
+  private boolean canWorkWithSameSession(TezSessionState session, HiveConf conf)
+       throws HiveException {
+    if (session == null || conf == null) {
+      return false;
+    }
+
+    HiveConf existingConf = session.getConf();
+    if (existingConf == null) {
+      return false;
+    }
+
+    // either variables will never be null because a default value is returned in case of absence
+    if (existingConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) !=
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+      return false;
+    }
+
+    if (session.isDefault()) {
+      // this session should never be a default session unless something has messed up.
+      throw new HiveException("Default queue should always be returned. " +
+          "Hence we should not be here.");
+    }
+
+    // null-safe comparison: re-usable when both are unset or both name the same queue
+    String existingQueue = existingConf.get("tez.queue.name");
+    String requestedQueue = conf.get("tez.queue.name");
+    if (existingQueue == null) {
+      return requestedQueue == null;
+    }
+    return existingQueue.equals(requestedQueue);
+  }
+}

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1574640&r1=1574639&r2=1574640&view=diff
==============================================================================
--- 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
 (original)
+++ 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
 Wed Mar  5 19:55:24 2014
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import javax.security.auth.login.LoginException;
 
@@ -60,6 +61,8 @@ public class TezSessionState {
   private TezSession session;
   private String sessionId;
   private DagUtils utils;
+  private String queueName;
+  private boolean defaultQueue = false;
 
   private static List<TezSessionState> openSessions
     = Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -95,6 +98,10 @@ public class TezSessionState {
     return openSessions;
   }
 
+  /**
+   * Generates a new session id.
+   *
+   * @return the string form of a freshly generated random UUID
+   */
+  public static String makeSessionId() {
+    return UUID.randomUUID().toString();
+  }
+
   /**
    * Creates a tez session. A session is tied to either a cli/hs2 session. You 
can
    * submit multiple DAGs against a session (as long as they are executed 
serially).
@@ -104,7 +111,7 @@ public class TezSessionState {
    * @throws TezException
    */
   public void open(String sessionId, HiveConf conf)
-      throws IOException, LoginException, URISyntaxException, TezException {
+    throws IOException, LoginException, URISyntaxException, TezException {
 
     this.sessionId = sessionId;
     this.conf = conf;
@@ -126,7 +133,7 @@ public class TezSessionState {
     commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
 
     AMConfiguration amConfig = new AMConfiguration(null, commonLocalResources,
-         tezConfig, null);
+        tezConfig, null);
 
     // configuration for the session
     TezSessionConfiguration sessionConfig = new 
TezSessionConfiguration(amConfig, tezConfig);
@@ -212,7 +219,7 @@ public class TezSessionState {
    * be used with Tez. Assumes scratchDir exists.
    */
   private Path createTezDir(String sessionId)
-      throws IOException {
+    throws IOException {
 
     // tez needs its own scratch dir (per session)
     Path tezDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
@@ -236,7 +243,7 @@ public class TezSessionState {
    * @throws URISyntaxException when current jar location cannot be determined.
    */
   private LocalResource createHiveExecLocalResource()
-      throws IOException, LoginException, URISyntaxException {
+    throws IOException, LoginException, URISyntaxException {
     String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
     String currentVersionPathStr = utils.getExecJarPathLocal();
     String currentJarName = utils.getResourceBaseName(currentVersionPathStr);
@@ -245,6 +252,7 @@ public class TezSessionState {
     FileStatus dirStatus = null;
 
     if (hiveJarDir != null) {
+      LOG.info("Hive jar directory is " + hiveJarDir);
       // check if it is a valid directory in HDFS
       Path hiveJarDirPath = new Path(hiveJarDir);
       fs = hiveJarDirPath.getFileSystem(conf);
@@ -286,6 +294,7 @@ public class TezSessionState {
     if ((hiveJarDir == null) || (dirStatus == null) ||
         ((dirStatus != null) && (!dirStatus.isDir()))) {
       Path dest = utils.getDefaultDestDir(conf);
+      LOG.info("Jar dir is null/directory doesn't exist. Choosing 
HIVE_INSTALL_DIR - " + dest);
       String destPathStr = dest.toString();
       String jarPathStr = destPathStr + "/" + currentJarName;
       dirStatus = fs.getFileStatus(dest);
@@ -294,9 +303,29 @@ public class TezSessionState {
       } else {
         throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
       }
-    }
+        }
 
     // we couldn't find any valid locations. Throw exception
     throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
   }
+
+  /**
+   * Records the name of the queue this session is associated with.
+   *
+   * @param queueName the queue name; stored as given
+   */
+  public void setQueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
+  /**
+   * @return the queue name last passed to setQueueName
+   */
+  public String getQueueName() {
+    return queueName;
+  }
+
+  /**
+   * Marks this session as a default session. There is no way to clear the mark.
+   */
+  public void setDefault() {
+    defaultQueue  = true;
+  }
+
+  /**
+   * @return true once setDefault() has been called on this session
+   */
+  public boolean isDefault() {
+    return defaultQueue;
+  }
+
+  /**
+   * @return the configuration held by this session
+   */
+  public HiveConf getConf() {
+    return conf;
+  }
 }

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1574640&r1=1574639&r2=1574640&view=diff
==============================================================================
--- 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
 (original)
+++ 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
 Wed Mar  5 19:55:24 2014
@@ -114,19 +114,15 @@ public class TezTask extends Task<TezWor
       // get a session.
       SessionState ss = SessionState.get();
       session = ss.getTezSession();
-
-      // if we don't have one yet create it.
-      if (session == null) {
-        session = new TezSessionState();
-        ss.setTezSession(session);
-      }
+      session = TezSessionPoolManager.getInstance().getSession(session, conf);
+      ss.setTezSession(session);
 
       // if it's not running start it.
       if (!session.isOpen()) {
         // can happen if the user sets the tez flag after the session was
         // established
         LOG.info("Tez session hasn't been created yet. Opening session");
-        session.open(ss.getSessionId(), conf);
+        session.open(session.getSessionId(), conf);
       }
 
       // we will localize all the files (jars, plans, hashtables) to the
@@ -156,6 +152,7 @@ public class TezTask extends Task<TezWor
       // fetch the counters
       Set<StatusGetOpts> statusGetOpts = 
EnumSet.of(StatusGetOpts.GET_COUNTERS);
       counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
+      TezSessionPoolManager.getInstance().returnSession(session);
 
       if (LOG.isInfoEnabled()) {
         for (CounterGroup group: counters) {

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1574640&r1=1574639&r2=1574640&view=diff
==============================================================================
--- 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
 (original)
+++ 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
 Wed Mar  5 19:55:24 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.history.HiveHistoryImpl;
@@ -333,7 +334,7 @@ public class SessionState {
     }
 
     if (HiveConf.getVar(startSs.getConf(), 
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)
-        .equals("tez")) {
+        .equals("tez") && (startSs.isHiveServerQuery == false)) {
       try {
         if (startSs.tezSessionState == null) {
           startSs.tezSessionState = new TezSessionState();
@@ -942,7 +943,7 @@ public class SessionState {
 
     try {
       if (tezSessionState != null) {
-        tezSessionState.close(false);
+        TezSessionPoolManager.getInstance().close(tezSessionState);
       }
     } catch (Exception e) {
       LOG.info("Error closing tez session", e);

Added: 
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1574640&view=auto
==============================================================================
--- 
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
 (added)
+++ 
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
 Wed Mar  5 19:55:24 2014
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Tests for TezSessionPoolManager, run against stub sessions
+ * (TestTezSessionState) so that no tez cluster is needed.
+ */
+public class TestTezSessionPool {
+
+  HiveConf conf;
+  Random random;
+  private TezSessionPoolManager poolManager;
+
+  /** Pool manager that hands out stub sessions instead of real ones. */
+  private class TestTezSessionPoolManager extends TezSessionPoolManager {
+    public TestTezSessionPoolManager() {
+      super();
+    }
+
+    @Override
+    public TezSessionState createSession() {
+      return new TestTezSessionState();
+    }
+  }
+
+  @Before
+  public void setUp() {
+    conf = new HiveConf();
+  }
+
+  /**
+   * Without a started pool, asking again with the same session and the same
+   * configuration must return that very session.
+   */
+  @Test
+  public void testGetNonDefaultSession() {
+    poolManager = new TestTezSessionPoolManager();
+    try {
+      TezSessionState sessionState = poolManager.getSession(null, conf);
+      TezSessionState sessionState1 = poolManager.getSession(sessionState, conf);
+      assertSame(sessionState, sessionState1);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  /**
+   * Sessions taken and returned one at a time must cycle through the queues
+   * in the order the pool was set up: a, b, c, a, ...
+   */
+  @Test
+  public void testSessionPoolGetInOrder() {
+    try {
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+      conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c");
+      conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2);
+
+      poolManager = new TestTezSessionPoolManager();
+      poolManager.setupPool(conf);
+      poolManager.startPool();
+      TezSessionState sessionState = poolManager.getSession(null, conf);
+      assertEquals("a", sessionState.getQueueName());
+      poolManager.returnSession(sessionState);
+
+      sessionState = poolManager.getSession(null, conf);
+      assertEquals("b", sessionState.getQueueName());
+      poolManager.returnSession(sessionState);
+
+      sessionState = poolManager.getSession(null, conf);
+      assertEquals("c", sessionState.getQueueName());
+      poolManager.returnSession(sessionState);
+
+      sessionState = poolManager.getSession(null, conf);
+      assertEquals("a", sessionState.getQueueName());
+
+      poolManager.returnSession(sessionState);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  /**
+   * Worker used by testReturn: gets a session (with an explicit queue about
+   * half of the time), holds it for a few seconds and returns it.
+   */
+  public class SessionThread implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        HiveConf tmpConf = new HiveConf(conf);
+        if (random.nextDouble() > 0.5) {
+          tmpConf.set("tez.queue.name", "default");
+        } else {
+          tmpConf.set("tez.queue.name", "");
+        }
+
+        TezSessionState session = poolManager.getSession(null, tmpConf);
+        Thread.sleep((random.nextInt(9) % 10) * 1000);
+        poolManager.returnSession(session);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Runs more concurrent workers than there are pooled sessions and waits
+   * for all of them, which only completes if sessions are returned to the pool.
+   */
+  @Test
+  public void testReturn() {
+    conf.set("tez.queue.name", "");
+    random = new Random(1000);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c");
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2);
+    try {
+      poolManager = new TestTezSessionPoolManager();
+      poolManager.setupPool(conf);
+      poolManager.startPool();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+    List<Thread> threadList = new ArrayList<Thread>();
+    for (int i = 0; i < 15; i++) {
+      Thread t = new Thread(new SessionThread());
+      // remember the thread; otherwise the join loop below has nothing to
+      // wait for and the test ends while the workers are still running
+      threadList.add(t);
+      t.start();
+    }
+
+    for (Thread t : threadList) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        fail();
+      }
+    }
+  }
+}

Added: 
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java?rev=1574640&view=auto
==============================================================================
--- 
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
 (added)
+++ 
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
 Wed Mar  5 19:55:24 2014
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.tez.dag.api.TezException;
+
+
+/**
+ * Stand-in for TezSessionState used by the junit tests. Testing the
+ * multi-session use case from hive server 2 needs a session simulation:
+ * this one only records what it is given.
+ */
+public class TestTezSessionState extends TezSessionState {
+
+  private boolean open;
+  private String sessionId;
+  private HiveConf hiveConf;
+
+  /** Reports the flag kept by this stub. */
+  @Override
+  public boolean isOpen() {
+    return open;
+  }
+
+  /** Lets a test set the value that isOpen() reports. */
+  public void setOpen(boolean open) {
+    this.open = open;
+  }
+
+  /**
+   * Remembers the session id and configuration. The open flag is left
+   * untouched by this method.
+   */
+  @Override
+  public void open(String sessionId, HiveConf conf)
+      throws IOException, LoginException, URISyntaxException, TezException {
+    this.sessionId = sessionId;
+    this.hiveConf = conf;
+  }
+
+  /** Sets the open flag to the value of keepTmpDir; nothing else happens. */
+  @Override
+  public void close(boolean keepTmpDir) throws TezException, IOException {
+    open = keepTmpDir;
+  }
+
+  /** Returns the configuration passed to open(), or null before that. */
+  @Override
+  public HiveConf getConf() {
+    return this.hiveConf;
+  }
+
+  /** Returns the id passed to open(), or null before that. */
+  @Override
+  public String getSessionId() {
+    return sessionId;
+  }
+}

Modified: 
hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1574640&r1=1574639&r2=1574640&view=diff
==============================================================================
--- 
hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java
 (original)
+++ 
hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java
 Wed Mar  5 19:55:24 2014
@@ -23,6 +23,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.cli.CLIService;
@@ -73,6 +76,17 @@ public class HiveServer2 extends Composi
   @Override
   public synchronized void stop() {
     super.stop();
+    // there should already be an instance of the session pool manager.
+    // if not, ignoring is fine while stopping the hive server.
+    HiveConf hiveConf = this.getHiveConf();
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+      try {
+        TezSessionPoolManager.getInstance().stop();
+      } catch (Exception e) {
+        // best effort while shutting down: do not rethrow, but hand the
+        // exception to the logger so its stack trace ends up in the server
+        // log rather than on stderr
+        LOG.error("Tez session pool manager stop had an error during stop of hive server", e);
+      }
+    }
   }
 
   private static void startHiveServer2() throws Throwable {
@@ -85,6 +99,11 @@ public class HiveServer2 extends Composi
         server = new HiveServer2();
         server.init(hiveConf);
         server.start();
+        if 
(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+          TezSessionPoolManager sessionPool = 
TezSessionPoolManager.getInstance();
+          sessionPool.setupPool(hiveConf);
+          sessionPool.startPool();
+        }
         break;
       } catch (Throwable throwable) {
         if(++attempts >= maxAttempts) {


Reply via email to