Repository: hive
Updated Branches:
  refs/heads/master f008a38b4 -> 090adc60a


HIVE-14397: Queries ran after reopening of tez session launches additional 
sessions (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/090adc60
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/090adc60
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/090adc60

Branch: refs/heads/master
Commit: 090adc60a0bb574c90c7bad593f796e55e5b40a9
Parents: f008a38
Author: Prasanth Jayachandran <[email protected]>
Authored: Wed Aug 3 13:22:16 2016 -0700
Committer: Prasanth Jayachandran <[email protected]>
Committed: Wed Aug 3 13:22:16 2016 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/tez/TezSessionPoolManager.java | 11 +++-
 .../hive/ql/exec/tez/TezSessionState.java       |  9 ++++
 .../hive/ql/exec/tez/TestTezSessionPool.java    | 57 ++++++++++++++++++++
 3 files changed, 75 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/090adc60/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 917268f..83c2916 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -285,8 +285,9 @@ public class TezSessionPoolManager {
      */
     if (forceCreate || nonDefaultUser || !hasInitialSessions
         || ((queueName != null) && !queueName.isEmpty())) {
-      LOG.info("QueueName: " + queueName + " nonDefaultUser: " + 
nonDefaultUser +
-          " defaultQueuePool: " + defaultQueuePool + " hasInitialSessions: " + 
hasInitialSessions);
+      LOG.info("QueueName: {} nonDefaultUser: {} defaultQueuePool: {} 
hasInitialSessions: {}" +
+              " forceCreate: {}", queueName, nonDefaultUser, defaultQueuePool, 
hasInitialSessions,
+          forceCreate);
       return getNewSessionState(conf, queueName, doOpen);
     }
 
@@ -463,7 +464,13 @@ public class TezSessionPoolManager {
       String[] additionalFiles, boolean keepTmpDir) throws Exception {
     HiveConf sessionConf = sessionState.getConf();
     if (sessionConf != null && 
sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) != null) {
+      // user has explicitly specified queue name
       conf.set(TezConfiguration.TEZ_QUEUE_NAME, 
sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME));
+    } else {
+      // default queue name when the initial session was created
+      if (sessionState.getQueueName() != null) {
+        conf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
+      }
     }
     // TODO: close basically resets the object to a bunch of nulls.
     //       We should ideally not reuse the object because it's pointless and error-prone.

http://git-wip-us.apache.org/repos/asf/hive/blob/090adc60/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 2607db1..38250f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -395,6 +395,15 @@ public class TezSessionState {
         //ignore
       }
       isSuccessful = true;
+      // sessionState.getQueueName() comes from cluster wide configured queue names.
+      // sessionState.getConf().get("tez.queue.name") is explicitly set by user in a session.
+      // TezSessionPoolManager sets tez.queue.name if user has specified one or use the one from
+      // cluster wide queue names.
+      // There is no way to differentiate how this was set (user vs system).
+      // Unset this after opening the session so that reopening of session uses the correct queue
+      // names i.e, if client has not died and if the user has explicitly set a queue name
+      // then reopened session will use user specified queue name else default cluster queue names.
+      conf.unset(TezConfiguration.TEZ_QUEUE_NAME);
       return session;
     } finally {
       if (isOnThread && !isSuccessful) {

http://git-wip-us.apache.org/repos/asf/hive/blob/090adc60/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index ec90801..cbd2bfe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import static org.junit.Assert.*;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -144,6 +146,61 @@ public class TestTezSessionPool {
   }
 
   @Test
+  public void testSessionReopen() {
+    try {
+      conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default,tezq1");
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1);
+
+      poolManager = new TestTezSessionPoolManager();
+      TezSessionState session = Mockito.mock(TezSessionState.class);
+      Mockito.when(session.getQueueName()).thenReturn("default");
+      Mockito.when(session.isDefault()).thenReturn(false);
+      Mockito.when(session.getConf()).thenReturn(conf);
+
+      poolManager.reopenSession(session, conf, null, false);
+
+      Mockito.verify(session).close(false);
+      String[] files = null;
+      Mockito.verify(session).open(conf, files);
+
+      // mocked session starts with default queue
+      assertEquals("default", session.getQueueName());
+
+      // user explicitly specified queue name
+      conf.set("tez.queue.name", "tezq1");
+      poolManager.reopenSession(session, conf, null, false);
+      assertEquals("tezq1", poolManager.getSession(null, conf, false, 
false).getQueueName());
+
+      // user unsets queue name, will fallback to default session queue
+      conf.unset("tez.queue.name");
+      poolManager.reopenSession(session, conf, null, false);
+      assertEquals("default", poolManager.getSession(null, conf, false, 
false).getQueueName());
+
+      // session.open will unset the queue name from conf but Mockito intercepts the open call
+      // and does not call the real method, so explicitly unset the queue name here
+      conf.unset("tez.queue.name");
+      // change session's default queue to tezq1 and rerun test sequence
+      Mockito.when(session.getQueueName()).thenReturn("tezq1");
+      poolManager.reopenSession(session, conf, null, false);
+      assertEquals("tezq1", poolManager.getSession(null, conf, false, 
false).getQueueName());
+
+      // user sets default queue now
+      conf.set("tez.queue.name", "default");
+      poolManager.reopenSession(session, conf, null, false);
+      assertEquals("default", poolManager.getSession(null, conf, false, 
false).getQueueName());
+
+      // user does not specify queue so use session default
+      conf.unset("tez.queue.name");
+      poolManager.reopenSession(session, conf, null, false);
+      assertEquals("tezq1", poolManager.getSession(null, conf, false, 
false).getQueueName());
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
   public void testLlapSessionQueuing() {
     try {
       random = new Random(1000);

Reply via email to