HIVE-16571 : HiveServer2: Prefer LIFO over round-robin for Tez session reuse (Gopal Vijayaraghavan, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bdacb105 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bdacb105 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bdacb105 Branch: refs/heads/hive-14535 Commit: bdacb10523cd8b46ed11649fee62e87388c26752 Parents: 04bb1ca Author: sergey <[email protected]> Authored: Mon Jun 5 14:31:13 2017 -0700 Committer: sergey <[email protected]> Committed: Mon Jun 5 14:31:13 2017 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/tez/TezSessionPoolManager.java | 9 ++-- .../hive/ql/exec/tez/TestTezSessionPool.java | 43 ++++++++++++++++---- 2 files changed, 39 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bdacb105/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index b4d8ffa..dabca3f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -23,12 +23,13 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.net.URISyntaxException; - import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; @@ -77,7 +78,7 @@ public class TezSessionPoolManager { private static final Random rdm = new Random(); private volatile SessionState initSessionState; - private BlockingQueue<TezSessionPoolSession> defaultQueuePool; + private BlockingDeque<TezSessionPoolSession> defaultQueuePool; /** Priority queue sorted by expiration time of live sessions that could be expired. */ private PriorityBlockingQueue<TezSessionPoolSession> expirationQueue; @@ -204,7 +205,7 @@ public class TezSessionPoolManager { int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames); if (numSessionsTotal > 0) { - defaultQueuePool = new ArrayBlockingQueue<TezSessionPoolSession>(numSessionsTotal); + defaultQueuePool = new LinkedBlockingDeque<TezSessionPoolSession>(numSessionsTotal); } numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); @@ -426,7 +427,7 @@ public class TezSessionPoolManager { TezSessionPoolSession poolSession = (TezSessionPoolSession) tezSessionState; if (poolSession.returnAfterUse()) { - defaultQueuePool.put(poolSession); + defaultQueuePool.putFirst(poolSession); } } // non default session nothing changes. The user can continue to use the existing http://git-wip-us.apache.org/repos/asf/hive/blob/bdacb105/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index cbd2bfe..5a3eba3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -88,24 +88,49 @@ public class TestTezSessionPool { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); poolManager.startPool(); + // this is now a LIFO operation + + // draw 1 and replace TezSessionState sessionState = poolManager.getSession(null, conf, true, false); assertEquals("a", sessionState.getQueueName()); poolManager.returnSession(sessionState, false); sessionState = poolManager.getSession(null, conf, true, false); - assertEquals("b", sessionState.getQueueName()); + assertEquals("a", sessionState.getQueueName()); poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true, false); - assertEquals("c", sessionState.getQueueName()); - poolManager.returnSession(sessionState, false); + // [a,b,c,a,b,c] - sessionState = poolManager.getSession(null, conf, true, false); - if (sessionState.getQueueName().compareTo("a") != 0) { - fail(); - } + // draw 2 and return in order - further run should return last returned + TezSessionState first = poolManager.getSession(null, conf, true, false); + TezSessionState second = poolManager.getSession(null, conf, true, false); + assertEquals("a", first.getQueueName()); + assertEquals("b", second.getQueueName()); + poolManager.returnSession(first, false); + poolManager.returnSession(second, false); + TezSessionState third = poolManager.getSession(null, conf, true, false); + assertEquals("b", third.getQueueName()); + poolManager.returnSession(third, false); - poolManager.returnSession(sessionState, false); + // [b,a,c,a,b,c] + + first = poolManager.getSession(null, conf, true, false); + second = poolManager.getSession(null, conf, true, false); + third = poolManager.getSession(null, conf, true, false); + + assertEquals("b", first.getQueueName()); + assertEquals("a", second.getQueueName()); + assertEquals("c", third.getQueueName()); + + poolManager.returnSession(first, false); + poolManager.returnSession(second, false); + poolManager.returnSession(third, false); + + // [c,a,b,a,b,c] + + first = poolManager.getSession(null, conf, true, false); + assertEquals("c", third.getQueueName()); + poolManager.returnSession(first, false); } catch (Exception e) { e.printStackTrace();
