Author: vikram Date: Wed Mar 5 19:55:24 2014 New Revision: 1574640 URL: http://svn.apache.org/r1574640 Log: HIVE-6325: Enable using multiple concurrent sessions in tez (Vikram Dixit, reviewed by Gunther Hagleitner)
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java Modified: hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java hive/branches/branch-0.13/conf/hive-default.xml.template hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java Modified: hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1574640&r1=1574639&r2=1574640&view=diff ============================================================================== --- hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Mar 5 19:55:24 2014 @@ -956,6 +956,11 @@ public class HiveConf extends Configurat HIVECOUNTERGROUP("hive.counters.group.name", "HIVE"), + HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", ""), + HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE("hive.server2.tez.sessions.per.default.queue", 1), + HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS("hive.server2.tez.initialize.default.sessions", + false), + // none, column // none is the default(past) behavior. Implies only alphaNumeric and underscore are valid characters in identifiers. // column: implies column names can contain any character. Modified: hive/branches/branch-0.13/conf/hive-default.xml.template URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/conf/hive-default.xml.template?rev=1574640&r1=1574639&r2=1574640&view=diff ============================================================================== --- hive/branches/branch-0.13/conf/hive-default.xml.template (original) +++ hive/branches/branch-0.13/conf/hive-default.xml.template Wed Mar 5 19:55:24 2014 @@ -2406,4 +2406,34 @@ <description>By default tez will use the java opts from map tasks. This can be used to overwrite.</description> </property> +<property> + <name>hive.server2.tez.default.queues</name> + <value></value> + <description> + A list of comma separated values corresponding to yarn queues of the same name. + When hive server 2 is launched in tez mode, this configuration needs to be set + for multiple tez sessions to run in parallel on the cluster. + </description> +</property> + +<property> + <name>hive.server2.tez.sessions.per.default.queue</name> + <value>1</value> + <description> + A positive integer that determines the number of tez sessions that should be + launched on each of the queues specified by "hive.server2.tez.default.queues". + Determines the parallelism on each queue. + </description> +</property> + +<property> + <name>hive.server2.tez.initialize.default.sessions</name> + <value>false</value> + <description> + This flag is used in hive server 2 to enable a user to use hive server 2 without + turning on tez for hive server 2. The user could potentially want to run queries + over tez without the pool of sessions. + </description> +</property> + </configuration> Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1574640&view=auto ============================================================================== --- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (added) +++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Wed Mar 5 19:55:24 2014 @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; + +/** + * This class is for managing multiple tez sessions particularly when + * HiveServer2 is being used to submit queries. + * + * In case the user specifies a queue explicitly, a new session is created + * on that queue and assigned to the session state. + */ +public class TezSessionPoolManager { + + private static final Log LOG = LogFactory.getLog(TezSessionPoolManager.class); + + private BlockingQueue<TezSessionState> defaultQueuePool; + private int blockingQueueLength = -1; + private HiveConf initConf = null; + + private boolean inited = false; + + private static TezSessionPoolManager sessionPool = null; + + public static TezSessionPoolManager getInstance() + throws Exception { + if (sessionPool == null) { + sessionPool = new TezSessionPoolManager(); + } + + return sessionPool; + } + + protected TezSessionPoolManager() { + } + + public void startPool() throws Exception { + this.inited = true; + for (int i = 0; i < blockingQueueLength; i++) { + HiveConf newConf = new HiveConf(initConf); + TezSessionState sessionState = defaultQueuePool.take(); + newConf.set("tez.queue.name", sessionState.getQueueName()); + sessionState.open(TezSessionState.makeSessionId(), newConf); + defaultQueuePool.put(sessionState); + } + } + + public void setupPool(HiveConf conf) throws InterruptedException { + + String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); + int numSessions = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); + + // the list of queues is a comma separated list. + String defaultQueueList[] = defaultQueues.split(","); + defaultQueuePool = + new ArrayBlockingQueue<TezSessionState>(numSessions * defaultQueueList.length); + this.initConf = conf; + /* + * with this the ordering of sessions in the queue will be (with 2 sessions 3 queues) + * s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform distribution of + * the sessions across queues at least to begin with. Then as sessions get freed up, the list + * may change this ordering. + */ + blockingQueueLength = 0; + for (int i = 0; i < numSessions; i++) { + for (String queue : defaultQueueList) { + if (queue.length() == 0) { + continue; + } + TezSessionState sessionState = createSession(); + sessionState.setQueueName(queue); + sessionState.setDefault(); + LOG.info("Created new tez session for queue: " + queue + + " with session id: " + sessionState.getSessionId()); + defaultQueuePool.put(sessionState); + blockingQueueLength++; + } + } + } + + private TezSessionState getSession(HiveConf conf) + throws Exception { + + String queueName = conf.get("tez.queue.name"); + + boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); + + /* + * if the user has specified a queue name themselves, we create a new session. + * also a new session is created if the user tries to submit to a queue using + * their own credentials. We expect that with the new security model, things will + * run as user hive in most cases. + */ + if (!(this.inited) || ((queueName != null) && (!queueName.isEmpty())) + || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) { + LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser + + " defaultQueuePool: " + defaultQueuePool + + " blockingQueueLength: " + blockingQueueLength); + return getNewSessionState(conf, queueName); + } + + LOG.info("Choosing a session from the defaultQueuePool"); + return defaultQueuePool.take(); + } + + /** + * @param conf HiveConf that is used to initialize the session + * @param queueName could be null. Set in the tez session. + * @return + * @throws Exception + */ + private TezSessionState getNewSessionState(HiveConf conf, + String queueName) throws Exception { + TezSessionState retTezSessionState = createSession(); + retTezSessionState.setQueueName(queueName); + retTezSessionState.open(TezSessionState.makeSessionId(), conf); + + LOG.info("Started a new session for queue: " + queueName + + " session id: " + retTezSessionState.getSessionId()); + return retTezSessionState; + } + + public void returnSession(TezSessionState tezSessionState) + throws Exception { + if (tezSessionState.isDefault()) { + LOG.info("The session " + tezSessionState.getSessionId() + + " belongs to the pool. Put it back in"); + SessionState sessionState = SessionState.get(); + if (sessionState != null) { + sessionState.setTezSession(null); + } + defaultQueuePool.put(tezSessionState); + } + // non default session nothing changes. The user can continue to use the existing + // session in the SessionState + } + + public void close(TezSessionState tezSessionState) throws Exception { + LOG.info("Closing tez session default? " + tezSessionState.isDefault()); + if (!tezSessionState.isDefault()) { + tezSessionState.close(false); + } + } + + public void stop() throws Exception { + if ((sessionPool == null) || (this.inited == false)) { + return; + } + + // we can just stop all the sessions + for (TezSessionState sessionState: TezSessionState.getOpenSessions()) { + if (sessionState.isDefault()) { + sessionState.close(false); + } + } + } + + protected TezSessionState createSession() { + return new TezSessionState(); + } + + public TezSessionState getSession(TezSessionState session, HiveConf conf) throws Exception { + if (canWorkWithSameSession(session, conf)) { + return session; + } + + if (session != null) { + session.close(false); + } + + return getSession(conf); + } + + /* + * This method helps to re-use a session in case there has been no change in + * the configuration of a session. This will happen only in the case of non-hive-server2 + * sessions for e.g. when a CLI session is started. The CLI session could re-use the + * same tez session eliminating the latencies of new AM and containers. + */ + private boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) + throws HiveException { + if (session == null || conf == null) { + return false; + } + + HiveConf existingConf = session.getConf(); + if (existingConf == null) { + return false; + } + + // either variables will never be null because a default value is returned in case of absence + if (existingConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) != + conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + return false; + } + + if (!session.isDefault()) { + if (existingConf.get("tez.queue.name") == conf.get("tez.queue.name")) { + // both are null + return true; + } + if ((existingConf.get("tez.queue.name") == null)) { + // doesn't matter if the other conf is null or not. if it is null, above case catches it + return false; + } + + if (!existingConf.get("tez.queue.name").equals(conf.get("tez.queue.name"))) { + // handles the case of incoming conf having a null for tez.queue.name + return false; + } + } else { + // this session should never be a default session unless something has messed up. + throw new HiveException("Default queue should always be returned." + + "Hence we should not be here."); + } + + return true; + } +} Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1574640&r1=1574639&r2=1574640&view=diff ============================================================================== --- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original) +++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Wed Mar 5 19:55:24 2014 @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.security.auth.login.LoginException; @@ -60,6 +61,8 @@ public class TezSessionState { private TezSession session; private String sessionId; private DagUtils utils; + private String queueName; + private boolean defaultQueue = false; private static List<TezSessionState> openSessions = Collections.synchronizedList(new LinkedList<TezSessionState>()); @@ -95,6 +98,10 @@ public class TezSessionState { return openSessions; } + public static String makeSessionId() { + return UUID.randomUUID().toString(); + } + /** * Creates a tez session. A session is tied to either a cli/hs2 session. You can * submit multiple DAGs against a session (as long as they are executed serially). @@ -104,7 +111,7 @@ public class TezSessionState { * @throws TezException */ public void open(String sessionId, HiveConf conf) - throws IOException, LoginException, URISyntaxException, TezException { + throws IOException, LoginException, URISyntaxException, TezException { this.sessionId = sessionId; this.conf = conf; @@ -126,7 +133,7 @@ public class TezSessionState { commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr); AMConfiguration amConfig = new AMConfiguration(null, commonLocalResources, - tezConfig, null); + tezConfig, null); // configuration for the session TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig); @@ -212,7 +219,7 @@ public class TezSessionState { * be used with Tez. Assumes scratchDir exists. */ private Path createTezDir(String sessionId) - throws IOException { + throws IOException { // tez needs its own scratch dir (per session) Path tezDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), @@ -236,7 +243,7 @@ public class TezSessionState { * @throws URISyntaxException when current jar location cannot be determined. */ private LocalResource createHiveExecLocalResource() - throws IOException, LoginException, URISyntaxException { + throws IOException, LoginException, URISyntaxException { String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY); String currentVersionPathStr = utils.getExecJarPathLocal(); String currentJarName = utils.getResourceBaseName(currentVersionPathStr); @@ -245,6 +252,7 @@ public class TezSessionState { FileStatus dirStatus = null; if (hiveJarDir != null) { + LOG.info("Hive jar directory is " + hiveJarDir); // check if it is a valid directory in HDFS Path hiveJarDirPath = new Path(hiveJarDir); fs = hiveJarDirPath.getFileSystem(conf); @@ -286,6 +294,7 @@ public class TezSessionState { if ((hiveJarDir == null) || (dirStatus == null) || ((dirStatus != null) && (!dirStatus.isDir()))) { Path dest = utils.getDefaultDestDir(conf); + LOG.info("Jar dir is null/directory doesn't exist. Choosing HIVE_INSTALL_DIR - " + dest); String destPathStr = dest.toString(); String jarPathStr = destPathStr + "/" + currentJarName; dirStatus = fs.getFileStatus(dest); @@ -294,9 +303,29 @@ public class TezSessionState { } else { throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString())); } - } + } // we couldn't find any valid locations. Throw exception throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg()); } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + public String getQueueName() { + return queueName; + } + + public void setDefault() { + defaultQueue = true; + } + + public boolean isDefault() { + return defaultQueue; + } + + public HiveConf getConf() { + return conf; + } } Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1574640&r1=1574639&r2=1574640&view=diff ============================================================================== --- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original) +++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Wed Mar 5 19:55:24 2014 @@ -114,19 +114,15 @@ public class TezTask extends Task<TezWor // get a session. SessionState ss = SessionState.get(); session = ss.getTezSession(); - - // if we don't have one yet create it. - if (session == null) { - session = new TezSessionState(); - ss.setTezSession(session); - } + session = TezSessionPoolManager.getInstance().getSession(session, conf); + ss.setTezSession(session); // if it's not running start it. if (!session.isOpen()) { // can happen if the user sets the tez flag after the session was // established LOG.info("Tez session hasn't been created yet. Opening session"); - session.open(ss.getSessionId(), conf); + session.open(session.getSessionId(), conf); } // we will localize all the files (jars, plans, hashtables) to the @@ -156,6 +152,7 @@ public class TezTask extends Task<TezWor // fetch the counters Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); + TezSessionPoolManager.getInstance().returnSession(session); if (LOG.isInfoEnabled()) { for (CounterGroup group: counters) { Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1574640&r1=1574639&r2=1574640&view=diff ============================================================================== --- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original) +++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Wed Mar 5 19:55:24 2014 @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.MapRedStats; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.history.HiveHistoryImpl; @@ -333,7 +334,7 @@ public class SessionState { } if (HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE) - .equals("tez")) { + .equals("tez") && (startSs.isHiveServerQuery == false)) { try { if (startSs.tezSessionState == null) { startSs.tezSessionState = new TezSessionState(); @@ -942,7 +943,7 @@ public class SessionState { try { if (tezSessionState != null) { - tezSessionState.close(false); + TezSessionPoolManager.getInstance().close(tezSessionState); } } catch (Exception e) { LOG.info("Error closing tez session", e); Added: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1574640&view=auto ============================================================================== --- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java (added) +++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java Wed Mar 5 19:55:24 2014 @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.junit.Before; +import org.junit.Test; +import org.apache.hadoop.hive.conf.HiveConf; + +public class TestTezSessionPool { + + HiveConf conf; + Random random; + private TezSessionPoolManager poolManager; + + private class TestTezSessionPoolManager extends TezSessionPoolManager { + public TestTezSessionPoolManager() { + super(); + } + + @Override + public TezSessionState createSession() { + return new TestTezSessionState(); + } + } + + @Before + public void setUp() { + conf = new HiveConf(); + } + + @Test + public void testGetNonDefaultSession() { + poolManager = new TestTezSessionPoolManager(); + try { + TezSessionState sessionState = poolManager.getSession(null, conf); + TezSessionState sessionState1 = poolManager.getSession(sessionState, conf); + if (sessionState1 != sessionState) { + fail(); + } + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testSessionPoolGetInOrder() { + try { + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c"); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2); + + poolManager = new TestTezSessionPoolManager(); + poolManager.setupPool(conf); + poolManager.startPool(); + TezSessionState sessionState = poolManager.getSession(null, conf); + if (sessionState.getQueueName().compareTo("a") != 0) { + fail(); + } + poolManager.returnSession(sessionState); + + sessionState = poolManager.getSession(null, conf); + if (sessionState.getQueueName().compareTo("b") != 0) { + fail(); + } + poolManager.returnSession(sessionState); + + sessionState = poolManager.getSession(null, conf); + if (sessionState.getQueueName().compareTo("c") != 0) { + fail(); + } + poolManager.returnSession(sessionState); + + sessionState = poolManager.getSession(null, conf); + if (sessionState.getQueueName().compareTo("a") != 0) { + fail(); + } + + poolManager.returnSession(sessionState); + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + public class SessionThread implements Runnable { + + @Override + public void run() { + try { + HiveConf tmpConf = new HiveConf(conf); + if (random.nextDouble() > 0.5) { + tmpConf.set("tez.queue.name", "default"); + } else { + tmpConf.set("tez.queue.name", ""); + } + + TezSessionState session = poolManager.getSession(null, tmpConf); + Thread.sleep((random.nextInt(9) % 10) * 1000); + poolManager.returnSession(session); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + @Test + public void testReturn() { + conf.set("tez.queue.name", ""); + random = new Random(1000); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c"); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2); + try { + poolManager = new TestTezSessionPoolManager(); + poolManager.setupPool(conf); + poolManager.startPool(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + List<Thread> threadList = new ArrayList<Thread>(); + for (int i = 0; i < 15; i++) { + Thread t = new Thread(new SessionThread()); + t.start(); + } + + for (Thread t : threadList) { + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + } +} Added: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java?rev=1574640&view=auto ============================================================================== --- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java (added) +++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java Wed Mar 5 19:55:24 2014 @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.net.URISyntaxException; + +import javax.security.auth.login.LoginException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.tez.dag.api.TezException; + + +/** + * This class is needed for writing junit tests. For testing the multi-session + * use case from hive server 2, we need a session simulation. + * + */ +public class TestTezSessionState extends TezSessionState { + + private boolean open; + private String sessionId; + private HiveConf hiveConf; + + @Override + public boolean isOpen() { + return open; + } + + public void setOpen(boolean open) { + this.open = open; + } + + @Override + public void open(String sessionId, HiveConf conf) throws IOException, + LoginException, URISyntaxException, TezException { + this.sessionId = sessionId; + this.hiveConf = conf; + } + + @Override + public void close(boolean keepTmpDir) throws TezException, IOException { + open = keepTmpDir; + } + + public HiveConf getConf() { + return this.hiveConf; + } + + @Override + public String getSessionId() { + return sessionId; + } +} Modified: hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1574640&r1=1574639&r2=1574640&view=diff ============================================================================== --- hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java (original) +++ hive/branches/branch-0.13/service/src/java/org/apache/hive/service/server/HiveServer2.java Wed Mar 5 19:55:24 2014 @@ -23,6 +23,9 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.CLIService; @@ -73,6 +76,17 @@ public class HiveServer2 extends Composi @Override public synchronized void stop() { super.stop(); + // there should already be an instance of the session pool manager. + // if not, ignoring is fine while stopping the hive server. + HiveConf hiveConf = this.getHiveConf(); + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { + try { + TezSessionPoolManager.getInstance().stop(); + } catch (Exception e) { + LOG.error("Tez session pool manager stop had an error during stop of hive server"); + e.printStackTrace(); + } + } } private static void startHiveServer2() throws Throwable { @@ -85,6 +99,11 @@ public class HiveServer2 extends Composi server = new HiveServer2(); server.init(hiveConf); server.start(); + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { + TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance(); + sessionPool.setupPool(hiveConf); + sessionPool.startPool(); + } break; } catch (Throwable throwable) { if(++attempts >= maxAttempts) {