Repository: hive
Updated Branches:
  refs/heads/branch-2.0 36f8b7590 -> cbfd25704
  refs/heads/master c0b04f7b0 -> 8c65c3455


HIVE-12915 : Tez session pool has concurrency issues during init (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d3af8215
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d3af8215
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d3af8215

Branch: refs/heads/master
Commit: d3af82154eb8bed52f63942de681905c95d4f45c
Parents: c0b04f7
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Wed Jan 27 14:13:40 2016 -0800
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Wed Jan 27 14:13:40 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 11 +++
 .../hive/ql/exec/spark/HashTableLoader.java     |  1 -
 .../hive/ql/exec/tez/TezSessionPoolManager.java | 93 +++++++++++---------
 .../hive/ql/exec/tez/TestTezSessionPool.java    |  2 +-
 .../apache/hive/service/server/HiveServer2.java | 12 ++-
 5 files changed, 74 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d3af8215/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 125d40c..3e10fd4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3163,6 +3163,17 @@ public class HiveConf extends Configuration {
     return conf.getTrimmed(var.varname, var.defaultStrVal);
   }
 
+  public static String[] getTrimmedStringsVar(Configuration conf, ConfVars 
var) {
+    assert (var.valClass == String.class) : var.varname;
+    String[] result = conf.getTrimmedStrings(var.varname, (String[])null);
+    if (result != null) return result;
+    if (var.altName != null) {
+      result = conf.getTrimmedStrings(var.altName, (String[])null);
+      if (result != null) return result;
+    }
+    return 
org.apache.hadoop.util.StringUtils.getTrimmedStrings(var.defaultStrVal);
+  }
+
   public static String getVar(Configuration conf, ConfVars var, String 
defaultVal) {
     if (var.altName != null) {
       return conf.get(var.varname, conf.get(var.altName, defaultVal));

http://git-wip-us.apache.org/repos/asf/hive/blob/d3af8215/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 64474e6..1634f42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -161,7 +161,6 @@ public class HashTableLoader implements 
org.apache.hadoop.hive.ql.exec.HashTable
     }
     MapJoinTableContainer mapJoinTable = SmallTableCache.get(path);
     if (mapJoinTable == null) {
-      // TODO#: HERE?
       synchronized (path.toString().intern()) {
         mapJoinTable = SmallTableCache.get(path);
         if (mapJoinTable == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d3af8215/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 0d9fa6d..73f3766 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -28,17 +28,17 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 
@@ -80,13 +80,18 @@ public class TezSessionPoolManager {
 
 
   private Semaphore llapQueue;
-  private int blockingQueueLength = -1;
   private HiveConf initConf = null;
   int numConcurrentLlapQueries = -1;
   private long sessionLifetimeMs = 0;
   private long sessionLifetimeJitterMs = 0;
-
-  private boolean inited = false;
+  /** A queue for initial sessions that have not been started yet. */
+  private Queue<TezSessionPoolSession> initialSessions =
+      new ConcurrentLinkedQueue<TezSessionPoolSession>();
+  /**
+   * Indicates whether we should try to use defaultSessionPool.
+   * We assume that setupPool is either called before any activity, or not called at all.
+   */
+  private volatile boolean hasInitialSessions = false;
 
   private static TezSessionPoolManager sessionPool = null;
 
@@ -104,9 +109,8 @@ public class TezSessionPoolManager {
   protected TezSessionPoolManager() {
   }
 
-  private void startNextSessionFromQueue() throws Exception {
+  private void startInitialSession(TezSessionPoolSession sessionState) throws 
Exception {
     HiveConf newConf = new HiveConf(initConf);
-    TezSessionPoolSession sessionState = defaultQueuePool.take();
     boolean isUsable = sessionState.tryUse();
     if (!isUsable) throw new IOException(sessionState + " is not usable at 
pool startup");
     newConf.set("tez.queue.name", sessionState.getQueueName());
@@ -117,20 +121,18 @@ public class TezSessionPoolManager {
   }
 
   public void startPool() throws Exception {
-    this.inited = true;
-    if (blockingQueueLength == 0) return;
-    int threadCount = Math.min(blockingQueueLength,
+    if (initialSessions.isEmpty()) return;
+    int threadCount = Math.min(initialSessions.size(),
         HiveConf.getIntVar(initConf, 
ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
     Preconditions.checkArgument(threadCount > 0);
     if (threadCount == 1) {
-      for (int i = 0; i < blockingQueueLength; i++) {
-        // The queue is FIFO, so if we cycle thru length items, we'd start each session once.
-        startNextSessionFromQueue();
+      while (true) {
+        TezSessionPoolSession session = initialSessions.poll();
+        if (session == null) break;
+        startInitialSession(session);
       }
     } else {
       final SessionState parentSessionState = SessionState.get();
-      // The queue is FIFO, so if we cycle thru length items, we'd start each session once.
-      final AtomicInteger remainingToStart = new 
AtomicInteger(blockingQueueLength);
       // The runnable has no mutable state, so each thread can run the same thing.
       final AtomicReference<Exception> firstError = new 
AtomicReference<>(null);
       Runnable runnable = new Runnable() {
@@ -138,9 +140,11 @@ public class TezSessionPoolManager {
           if (parentSessionState != null) {
             SessionState.setCurrentSessionState(parentSessionState);
           }
-          while (remainingToStart.decrementAndGet() >= 0) {
+          while (true) {
+            TezSessionPoolSession session = initialSessions.poll();
+            if (session == null) break;
             try {
-              startNextSessionFromQueue();
+              startInitialSession(session);
             } catch (Exception e) {
               if (!firstError.compareAndSet(null, e)) {
                 LOG.error("Failed to start session; ignoring due to previous 
error", e);
@@ -170,16 +174,32 @@ public class TezSessionPoolManager {
   }
 
   public void setupPool(HiveConf conf) throws InterruptedException {
-    String defaultQueues = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
+    String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
+        conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
+    int emptyNames = 0; // We don't create sessions for empty entries.
+    for (String queueName : defaultQueueList) {
+      if (queueName.isEmpty()) {
+        ++emptyNames;
+      }
+    }
     int numSessions = 
conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+    int numSessionsTotal = numSessions * (defaultQueueList.length - 
emptyNames);
+    if (numSessionsTotal > 0) {
+      defaultQueuePool = new 
ArrayBlockingQueue<TezSessionPoolSession>(numSessionsTotal);
+    }
+
     numConcurrentLlapQueries = 
conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
+    llapQueue = new Semaphore(numConcurrentLlapQueries, true);
+
+    this.initConf = conf;
+
     sessionLifetimeMs = conf.getTimeVar(
         ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME, TimeUnit.MILLISECONDS);
     if (sessionLifetimeMs != 0) {
       sessionLifetimeJitterMs = conf.getTimeVar(
           ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME_JITTER, 
TimeUnit.MILLISECONDS);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Starting session expiration threads; session lifetime is "
+        LOG.debug("Session expiration is enabled; session lifetime is "
             + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") 
ms");
       }
       expirationQueue = new PriorityBlockingQueue<>(11, new 
Comparator<TezSessionPoolSession>() {
@@ -190,6 +210,11 @@ public class TezSessionPoolManager {
         }
       });
       restartQueue = new LinkedBlockingQueue<>();
+    }
+    this.hasInitialSessions = numSessionsTotal > 0;
+    // From this point on, session creation will wait for the default pool (if # of sessions > 0).
+
+    if (sessionLifetimeMs != 0) {
       expirationThread = new Thread(new Runnable() {
         @Override
         public void run() {
@@ -204,13 +229,6 @@ public class TezSessionPoolManager {
       }, "TezSessionPool-cleanup");
     }
 
-    // the list of queues is a comma separated list.
-    String defaultQueueList[] = defaultQueues.split(",");
-    defaultQueuePool = new ArrayBlockingQueue<TezSessionPoolSession>(
-        numSessions * defaultQueueList.length);
-    llapQueue = new Semaphore(numConcurrentLlapQueries, true);
-
-    this.initConf = conf;
     /*
      * In a single-threaded init case, with this the ordering of sessions in the queue will be
      * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform
@@ -218,14 +236,12 @@ public class TezSessionPoolManager {
      * freed up, the list may change this ordering.
      * In a multi threaded init case it's a free for all.
      */
-    blockingQueueLength = 0;
     for (int i = 0; i < numSessions; i++) {
-      for (String queue : defaultQueueList) {
-        if (queue.length() == 0) {
+      for (String queueName : defaultQueueList) {
+        if (queueName.isEmpty()) {
           continue;
         }
-        defaultQueuePool.put(createAndInitSession(queue, true));
-        blockingQueueLength++;
+        initialSessions.add(createAndInitSession(queueName, true));
       }
     }
   }
@@ -246,7 +262,6 @@ public class TezSessionPoolManager {
   private TezSessionState getSession(HiveConf conf, boolean doOpen,
       boolean forceCreate)
       throws Exception {
-
     String queueName = conf.get("tez.queue.name");
 
     boolean nonDefaultUser = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
@@ -257,12 +272,10 @@ public class TezSessionPoolManager {
      * their own credentials. We expect that with the new security model, things will
      * run as user hive in most cases.
      */
-    if (forceCreate || !(this.inited)
-        || ((queueName != null) && (!queueName.isEmpty()))
-        || (nonDefaultUser) || (defaultQueuePool == null) || 
(blockingQueueLength <= 0)) {
+    if (forceCreate || nonDefaultUser || !hasInitialSessions
+        || ((queueName != null) && !queueName.isEmpty())) {
       LOG.info("QueueName: " + queueName + " nonDefaultUser: " + 
nonDefaultUser +
-          " defaultQueuePool: " + defaultQueuePool +
-          " blockingQueueLength: " + blockingQueueLength);
+          " defaultQueuePool: " + defaultQueuePool + " hasInitialSessions: " + 
hasInitialSessions);
       return getNewSessionState(conf, queueName, doOpen);
     }
 
@@ -316,7 +329,7 @@ public class TezSessionPoolManager {
     // session in the SessionState
   }
 
-  public void closeIfNotDefault(
+  public static void closeIfNotDefault(
       TezSessionState tezSessionState, boolean keepTmpDir) throws Exception {
     LOG.info("Closing tez session default? " + tezSessionState.isDefault());
     if (!tezSessionState.isDefault()) {
@@ -325,7 +338,7 @@ public class TezSessionPoolManager {
   }
 
   public void stop() throws Exception {
-    if ((sessionPool == null) || (this.inited == false)) {
+    if ((sessionPool == null) || !this.hasInitialSessions) {
       return;
     }
 
@@ -378,7 +391,7 @@ public class TezSessionPoolManager {
    * sessions for e.g. when a CLI session is started. The CLI session could re-use the
    * same tez session eliminating the latencies of new AM and containers.
    */
-  private boolean canWorkWithSameSession(TezSessionState session, HiveConf 
conf)
+  private static boolean canWorkWithSameSession(TezSessionState session, 
HiveConf conf)
        throws HiveException {
     if (session == null || conf == null) {
       return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/d3af8215/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index b03f063..956fd29 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -152,7 +152,7 @@ public class TestTezSessionPool {
       poolManager.setupPool(conf);
       poolManager.startPool();
     } catch (Exception e) {
-      e.printStackTrace();
+      LOG.error("Initialization error", e);
       fail();
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/d3af8215/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java 
b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 9c76683..958458f 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -509,6 +509,13 @@ public class HiveServer2 extends CompositeService {
       maxAttempts = 
hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
       HiveServer2 server = null;
       try {
+        // Initialize the pool before we start the server; don't start yet.
+        TezSessionPoolManager sessionPool = null;
+        if 
(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+          sessionPool = TezSessionPoolManager.getInstance();
+          sessionPool.setupPool(hiveConf);
+        }
+
         // Cleanup the scratch dir before starting
         ServerUtils.cleanUpScratchDir(hiveConf);
         server = new HiveServer2();
@@ -528,9 +535,8 @@ public class HiveServer2 extends CompositeService {
         if 
(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
           server.addServerInstanceToZooKeeper(hiveConf);
         }
-        if 
(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
-          TezSessionPoolManager sessionPool = 
TezSessionPoolManager.getInstance();
-          sessionPool.setupPool(hiveConf);
+
+        if (sessionPool != null) {
           sessionPool.startPool();
         }
 

Reply via email to