Repository: hive
Updated Branches:
  refs/heads/master af4017028 -> 99d25f024


HIVE-20682: Async query execution can potentially fail if shared sessionHive is 
closed by master thread (Sankar Hariappan, reviewed by Mahesh Kumar Behera, 
Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/99d25f02
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/99d25f02
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/99d25f02

Branch: refs/heads/master
Commit: 99d25f02421a84bf0f96660f9248fd6518dc7c8a
Parents: af40170
Author: Sankar Hariappan <[email protected]>
Authored: Tue Nov 13 16:26:04 2018 +0530
Committer: Sankar Hariappan <[email protected]>
Committed: Tue Nov 13 16:26:04 2018 +0530

----------------------------------------------------------------------
 .../hive/ql/parse/TestReplicationScenarios.java |   2 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 129 +++++++++++--------
 .../service/cli/operation/SQLOperation.java     |  20 +--
 .../service/cli/session/HiveSessionImpl.java    |  56 +++++++-
 .../cli/session/TestSessionManagerMetrics.java  |   2 +
 5 files changed, 138 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/99d25f02/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 75cd68a..5a88550 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -389,7 +389,7 @@ public class TestReplicationScenarios {
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new 
DriverContext(driver.getContext()), null);
     replLoadTask.executeTask(null);
-    Hive.getThreadLocal().closeCurrent();
+    Hive.closeCurrent();
     return replLoadWork.getRootTask();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/99d25f02/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 180b41e..e185bf4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -167,35 +167,36 @@ public class Hive {
   private IMetaStoreClient metaStoreClient;
   private SynchronizedMetaStoreClient syncMetaStoreClient;
   private UserGroupInformation owner;
+  private boolean isAllowClose = true;
 
   // metastore calls timing information
   private final ConcurrentHashMap<String, Long> metaCallTimeMap = new 
ConcurrentHashMap<>();
 
-  // Static class to store thread local Hive object and allowClose flag.
+  // Static class to store thread local Hive object.
   private static class ThreadLocalHive extends ThreadLocal<Hive> {
-    private ThreadLocal<Boolean> allowClose = ThreadLocal.withInitial(() -> 
true);
-
     @Override
     protected Hive initialValue() {
       return null;
     }
 
     @Override
-    public synchronized void remove() {
-      if (allowClose() && (this.get() != null)) {
-        this.get().close();
+    public synchronized void set(Hive hiveObj) {
+      Hive currentHive = this.get();
+      if (currentHive != hiveObj) {
+        // Remove/close current thread-local Hive object before overwriting 
with new Hive object.
+        remove();
+        super.set(hiveObj);
       }
-      super.remove();
-      this.allowClose.set(true);
-    }
-
-    public synchronized void set(Hive hiveObj, boolean allowClose) {
-      super.set(hiveObj);
-      this.allowClose.set(allowClose);
     }
 
-    boolean allowClose() {
-      return this.allowClose.get();
+    @Override
+    public synchronized void remove() {
+      Hive currentHive = this.get();
+      if (currentHive != null) {
+        // Close the metastore connections before removing it from thread 
local hiveDB.
+        currentHive.close(false);
+        super.remove();
+      }
     }
   }
 
@@ -317,7 +318,12 @@ public class Hive {
     Hive db = hiveDB.get();
     if (db == null || !db.isCurrentUserOwner() || needsRefresh
         || (c != null && !isCompatible(db, c, isFastCheck))) {
-      db = create(c, false, db, doRegisterAllFns);
+      if (db != null) {
+        LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + 
needsRefresh +
+                ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
+        closeCurrent();
+      }
+      db = create(c, doRegisterAllFns);
     }
     if (c != null) {
       db.conf = c;
@@ -325,26 +331,16 @@ public class Hive {
     return db;
   }
 
-  private static Hive create(HiveConf c, boolean needsRefresh, Hive db, 
boolean doRegisterAllFns)
-      throws HiveException {
-    if (db != null) {
-      LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + 
needsRefresh +
-        ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
-      if (hiveDB.allowClose()) {
-        db.close();
-      }
-    }
-    closeCurrent();
+  private static Hive create(HiveConf c, boolean doRegisterAllFns) throws 
HiveException {
     if (c == null) {
       c = createHiveConf();
     }
     c.set("fs.scheme.class", "dfs");
     Hive newdb = new Hive(c, doRegisterAllFns);
-    hiveDB.set(newdb, true);
+    hiveDB.set(newdb);
     return newdb;
   }
 
-
   private static HiveConf createHiveConf() {
     SessionState session = SessionState.get();
     return (session == null) ? new HiveConf(Hive.class) : session.getConf();
@@ -360,6 +356,18 @@ public class Hive {
     }
   }
 
+  private boolean isCurrentUserOwner() throws HiveException {
+    try {
+      return owner == null || 
owner.equals(UserGroupInformation.getCurrentUser());
+    } catch(IOException e) {
+      throw new HiveException("Error getting current user: " + e.getMessage(), 
e);
+    }
+  }
+
+  public static Hive getThreadLocal() {
+    return hiveDB.get();
+  }
+
   public static Hive get() throws HiveException {
     return get(true);
   }
@@ -383,21 +391,13 @@ public class Hive {
   }
 
   public static void set(Hive hive) {
-    hiveDB.set(hive, true);
-  }
-
-  public static void set(Hive hive, boolean allowClose) {
-    hiveDB.set(hive, allowClose);
+    hiveDB.set(hive);
   }
 
   public static void closeCurrent() {
     hiveDB.remove();
   }
 
-  public static Hive getThreadLocal() {
-    return hiveDB.get();
-  }
-
   /**
    * Hive
    *
@@ -411,30 +411,49 @@ public class Hive {
     }
   }
 
-
-  private boolean isCurrentUserOwner() throws HiveException {
-    try {
-      return owner == null || 
owner.equals(UserGroupInformation.getCurrentUser());
-    } catch(IOException e) {
-      throw new HiveException("Error getting current user: " + e.getMessage(), 
e);
-    }
+  /**
+   * GC is attempting to destroy the object.
+   * No one references this Hive anymore, so HMS connection from this Hive 
object can be closed.
+   * @throws Throwable
+   */
+  @Override
+  protected void finalize() throws Throwable {
+    close(true);
+    super.finalize();
   }
 
+  /**
+   * Marks if the given Hive object is allowed to close metastore connections.
+   * @param allowClose
+   */
+  public void setAllowClose(boolean allowClose) {
+    isAllowClose = allowClose;
+  }
 
+  /**
+   * Gets the allowClose flag which determines if it is allowed to close 
metastore connections.
+   * @return allowClose flag
+   */
+  public boolean allowClose() {
+    return isAllowClose;
+  }
 
   /**
-   * closes the connection to metastore for the calling thread
+   * Closes the connection to metastore for the calling thread if it is allowed to close.
+   * @param forceClose - Override the isAllowClose flag to forcefully close 
the MS connections.
    */
-  private void close() {
-    LOG.debug("Closing current thread's connection to Hive Metastore.");
-    if (metaStoreClient != null) {
-      metaStoreClient.close();
-      metaStoreClient = null;
-    }
-    // syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough to 
close it once.
-    syncMetaStoreClient = null;
-    if (owner != null) {
-      owner = null;
+  public void close(boolean forceClose) {
+    if (allowClose() || forceClose) {
+      LOG.debug("Closing current thread's connection to Hive Metastore.");
+      if (metaStoreClient != null) {
+        metaStoreClient.close();
+        metaStoreClient = null;
+      }
+      // syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough 
to close it once.
+      syncMetaStoreClient = null;
+      if (owner != null) {
+        owner = null;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/99d25f02/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git 
a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java 
b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 3d24884..f975199 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -299,6 +299,11 @@ public class SQLOperation extends 
ExecuteStatementOperation {
     private BackgroundWork(UserGroupInformation currentUGI,
         Hive parentHive,
         SessionState parentSessionState, boolean asyncPrepare) {
+      // Note: parentHive can be shared by multiple threads and so it should be protected from any
+      // thread closing metastore connections when some other thread is still accessing it. So, it is
+      // expected that allowClose flag in parentHive is set to false by caller and it will be caller's
+      // responsibility to close it explicitly with forceClose flag as true.
+      // Refer to sessionHive in HiveSessionImpl.java for the usage.
       this.currentUGI = currentUGI;
       this.parentHive = parentHive;
       this.parentSessionState = parentSessionState;
@@ -310,7 +315,8 @@ public class SQLOperation extends ExecuteStatementOperation 
{
       PrivilegedExceptionAction<Object> doAsAction = new 
PrivilegedExceptionAction<Object>() {
         @Override
         public Object run() throws HiveSQLException {
-          Hive.set(parentHive, false);
+          assert (!parentHive.allowClose());
+          Hive.set(parentHive);
           // TODO: can this result in cross-thread reuse of session state?
           SessionState.setCurrentSessionState(parentSessionState);
           PerfLogger.setPerfLogger(SessionState.getPerfLogger());
@@ -328,13 +334,11 @@ public class SQLOperation extends 
ExecuteStatementOperation {
             LOG.error("Error running hive query: ", e);
           } finally {
             LogUtils.unregisterLoggingContext();
-            Hive hiveDb = Hive.getThreadLocal();
-            if (hiveDb != null && hiveDb != parentHive) {
-              // If new hive object is created  by the child thread, then we 
need to close it as it might
-              // have created a hms connection. Call Hive.closeCurrent() that 
closes the HMS connection, causes
-              // HMS connection leaks otherwise.
-              Hive.closeCurrent();
-            }
+
+            // If a new Hive object was created by the child thread, it must be closed because it may
+            // have opened an HMS connection. Hive.closeCurrent() closes that connection; skipping it
+            // would leak the HMS connection.
+            Hive.closeCurrent();
           }
           return null;
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/99d25f02/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java 
b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 0018f68..a8bf876 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -185,11 +185,10 @@ public class HiveSessionImpl implements HiveSession {
       LOG.error(msg, e);
       throw new HiveSQLException(msg, e);
     }
-    try {
-      sessionHive = Hive.get(getHiveConf());
-    } catch (HiveException e) {
-      throw new HiveSQLException("Failed to get metastore connection", e);
-    }
+
+    // Set sessionHive object created based on sessionConf.
+    setSessionHive();
+
     // Process global init file: .hiverc
     processGlobalInitFile();
     // Set fetch size in session conf map
@@ -237,6 +236,28 @@ public class HiveSessionImpl implements HiveSession {
     }
   }
 
+  /**
+   * Sets sessionHive object created based on sessionConf.
+   * @throws HiveSQLException
+   */
+  private void setSessionHive() throws HiveSQLException {
+    Hive newSessionHive;
+    try {
+      newSessionHive = Hive.get(getHiveConf());
+
+      // HMS connections from sessionHive shouldn't be closed by any query 
execution thread when it
+      // recreates the Hive object. It is allowed to be closed only when 
session is closed/released.
+      newSessionHive.setAllowClose(false);
+    } catch (HiveException e) {
+      throw new HiveSQLException("Failed to get metastore connection", e);
+    }
+
+    // The previous sessionHive object might still be referenced by an async query execution thread.
+    // So, it shouldn't be closed here explicitly. Anyway, the Hive object will auto-close its HMS connection
+    // when it is garbage collected. So, it is safe to just overwrite sessionHive here.
+    sessionHive = newSessionHive;
+  }
+
   private void processGlobalInitFile() {
     IHiveFileProcessor processor = new GlobalHivercFileProcessor();
 
@@ -402,7 +423,20 @@ public class HiveSessionImpl implements HiveSession {
     }
     // set the thread name with the logging prefix.
     sessionState.updateThreadName();
-    Hive.set(sessionHive);
+
+    // If the thread local Hive is different from sessionHive, it means the previous query execution in
+    // the master thread has re-created the Hive object due to changes in MS related configurations in sessionConf.
+    // So, it is necessary to reset the sessionHive object based on the new sessionConf. Here, we cannot
+    // directly set sessionHive with the thread local Hive because if the previous command was REPL LOAD, then
+    // the config changes live only within the command execution, not at the session level.
+    // So, the safer option is to invoke Hive.get(), which decides whether to reuse the thread local Hive or re-create it.
+    if (Hive.getThreadLocal() != sessionHive) {
+      try {
+        setSessionHive();
+      } catch (HiveSQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   /**
@@ -777,12 +811,20 @@ public class HiveSessionImpl implements HiveSession {
       }
       if (sessionHive != null) {
         try {
-          Hive.closeCurrent();
+          sessionHive.close(true);
         } catch (Throwable t) {
           LOG.warn("Error closing sessionHive", t);
         }
         sessionHive = null;
       }
+      try {
+        // The thread local Hive in master thread can be different from 
sessionHive if any query
+        // execution from master thread resets it to new Hive object due to 
changes in sessionConf.
+        // So, need to close it as well. If it is same as sessionHive, then it 
is just no-op.
+        Hive.closeCurrent();
+      } catch (Throwable t) {
+        LOG.warn("Error closing thread local Hive", t);
+      }
       release(true, false);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/99d25f02/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
 
b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
index 5655458..be8d70b 100644
--- 
a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
+++ 
b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
@@ -280,6 +280,7 @@ public class TestSessionManagerMetrics {
       @Override
       public void run() {
         try {
+          Hive.set(session.getSessionHive());
           OperationHandle handle = session.getTables("catalog", "schema", 
"table", null);
           session.closeOperation(handle);
         } catch (Exception e) {
@@ -334,6 +335,7 @@ public class TestSessionManagerMetrics {
       @Override
       public void run() {
         try {
+          Hive.set(session.getSessionHive());
           OperationHandle handle = session.getTables("catalog", "schema", 
"table", null);
           session.closeOperation(handle);
         } catch (Exception e) {

Reply via email to