Repository: hive Updated Branches: refs/heads/branch-2.1 18c679722 -> 87536ed62
HIVE-14397: Queries ran after reopening of tez session launches additional sessions (Prasanth Jayachandran reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/87536ed6 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/87536ed6 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/87536ed6 Branch: refs/heads/branch-2.1 Commit: 87536ed6265376aed43ed0af71ff5feb620a067b Parents: 18c6797 Author: Prasanth Jayachandran <[email protected]> Authored: Wed Aug 3 13:22:16 2016 -0700 Committer: Prasanth Jayachandran <[email protected]> Committed: Wed Aug 3 13:22:43 2016 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/tez/TezSessionPoolManager.java | 11 +++- .../hive/ql/exec/tez/TezSessionState.java | 9 ++++ .../hive/ql/exec/tez/TestTezSessionPool.java | 57 ++++++++++++++++++++ 3 files changed, 75 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/87536ed6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 917268f..83c2916 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -285,8 +285,9 @@ public class TezSessionPoolManager { */ if (forceCreate || nonDefaultUser || !hasInitialSessions || ((queueName != null) && !queueName.isEmpty())) { - LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser + - " defaultQueuePool: " + defaultQueuePool + " hasInitialSessions: " + hasInitialSessions); + LOG.info("QueueName: {} nonDefaultUser: {} defaultQueuePool: {} hasInitialSessions: {}" + + " forceCreate: {}", queueName, nonDefaultUser, defaultQueuePool, hasInitialSessions, + forceCreate); return getNewSessionState(conf, queueName, doOpen); } @@ -463,7 +464,13 @@ public class TezSessionPoolManager { String[] additionalFiles, boolean keepTmpDir) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionConf != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) != null) { + // user has explicitly specified queue name conf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME)); + } else { + // default queue name when the initial session was created + if (sessionState.getQueueName() != null) { + conf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName()); + } } // TODO: close basically resets the object to a bunch of nulls. // We should ideally not reuse the object because it's pointless and error-prone. http://git-wip-us.apache.org/repos/asf/hive/blob/87536ed6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 1009359..84dd545 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -419,6 +419,15 @@ public class TezSessionState { //ignore } isSuccessful = true; + // sessionState.getQueueName() comes from cluster wide configured queue names. + // sessionState.getConf().get("tez.queue.name") is explicitly set by user in a session. + // TezSessionPoolManager sets tez.queue.name if user has specified one or use the one from + // cluster wide queue names. + // There is no way to differentiate how this was set (user vs system). + // Unset this after opening the session so that reopening of session uses the correct queue + // names i.e, if client has not died and if the user has explicitly set a queue name + // then reopened session will use user specified queue name else default cluster queue names. + conf.unset(TezConfiguration.TEZ_QUEUE_NAME); return session; } finally { if (isOnThread && !isSuccessful) { http://git-wip-us.apache.org/repos/asf/hive/blob/87536ed6/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index ec90801..cbd2bfe 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.exec.tez; import static org.junit.Assert.*; +import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Random; +import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -144,6 +146,61 @@ public class TestTezSessionPool { } @Test + public void testSessionReopen() { + try { + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default,tezq1"); + conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1); + + poolManager = new TestTezSessionPoolManager(); + TezSessionState session = Mockito.mock(TezSessionState.class); + Mockito.when(session.getQueueName()).thenReturn("default"); + Mockito.when(session.isDefault()).thenReturn(false); + Mockito.when(session.getConf()).thenReturn(conf); + + poolManager.reopenSession(session, conf, null, false); + + Mockito.verify(session).close(false); + String[] files = null; + Mockito.verify(session).open(conf, files); + + // mocked session starts with default queue + assertEquals("default", session.getQueueName()); + + // user explicitly specified queue name + conf.set("tez.queue.name", "tezq1"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); + + // user unsets queue name, will fallback to default session queue + conf.unset("tez.queue.name"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName()); + + // session.open will unset the queue name from conf but Mockito intercepts the open call + // and does not call the real method, so explicitly unset the queue name here + conf.unset("tez.queue.name"); + // change session's default queue to tezq1 and rerun test sequence + Mockito.when(session.getQueueName()).thenReturn("tezq1"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); + + // user sets default queue now + conf.set("tez.queue.name", "default"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName()); + + // user does not specify queue so use session default + conf.unset("tez.queue.name"); + poolManager.reopenSession(session, conf, null, false); + assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test public void testLlapSessionQueuing() { try { random = new Random(1000);
