This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 59058c65457 HIVE-27201: Inconsistency between session Hive and thread-local Hive may cause HS2 deadlock (Zhihua Deng, reviewed by Sai Hemanth Gantasala, Denys Kuzmenko)
59058c65457 is described below

commit 59058c65457fb7ab9d8575a555034e6633962661
Author: dengzh <[email protected]>
AuthorDate: Tue Apr 25 16:27:50 2023 +0800

    HIVE-27201: Inconsistency between session Hive and thread-local Hive may cause HS2 deadlock (Zhihua Deng, reviewed by Sai Hemanth Gantasala, Denys Kuzmenko)
    
    Closes #4180
---
 .../hive/service/server/TestHS2SessionHive.java    | 214 +++++++++++++++++++++
 .../hive/service/cli/operation/SQLOperation.java   |   6 +-
 .../hive/service/cli/session/HiveSessionImpl.java  |  22 ++-
 3 files changed, 232 insertions(+), 10 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2SessionHive.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2SessionHive.java
new file mode 100644
index 00000000000..022802588d3
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2SessionHive.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.rpc.thrift.TCLIService;
+import org.apache.hive.service.rpc.thrift.TGetTablesReq;
+import org.apache.hive.service.rpc.thrift.TGetTablesResp;
+import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for HIVE-27201
+ */
+public class TestHS2SessionHive {
+
+  private static MiniHS2 miniHS2 = null;
+
+  public static class DummyFilterHook extends DefaultMetaStoreFilterHookImpl {
+    private static Map<String, Hive> threadHiveMap = new ConcurrentHashMap<>();
+    private static Set<String> totalThreadCalls = Collections.synchronizedSet(new HashSet<>());
+
+    public DummyFilterHook(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public List<TableMeta> filterTableMetas(String catName, String dbName, List<TableMeta> tableMetas)
+        throws MetaException {
+      try {
+        Assert.assertNotNull(SessionState.get());
+        totalThreadCalls.add(Thread.currentThread().getName());
+        synchronized (totalThreadCalls) {
+          totalThreadCalls.notifyAll();
+        }
+        synchronized (threadHiveMap) {
+          threadHiveMap.wait();
+        }
+        Hive localHive = Hive.get();
+        threadHiveMap.put(Thread.currentThread().getName(), localHive);
+        for (TableMeta tableMeta : tableMetas) {
+          localHive.getTable(tableMeta.getDbName(), tableMeta.getTableName());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return super.filterTableMetas(catName, dbName, tableMetas);
+    }
+  }
+
+  @Test
+  public void testSessionHive() throws Exception {
+    Client client1 = newClient(), client2 = newClient();
+    Task task1 = new Task("task1", client1.client, client2.sessionHandle),
+         task2 = new Task("task2", client2.client, client1.sessionHandle);
+
+    while (DummyFilterHook.totalThreadCalls.size() < 2) {
+      synchronized (DummyFilterHook.totalThreadCalls) {
+        DummyFilterHook.totalThreadCalls.wait();
+      }
+    }
+    Thread.sleep(100);
+    synchronized (DummyFilterHook.threadHiveMap) {
+      DummyFilterHook.threadHiveMap.notifyAll();
+    }
+    Thread.sleep(3000L);
+    // Check to see if there are any deadlocks
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+    Assert.assertNull(threadBean.findDeadlockedThreads());
+    task1.join();
+    task2.join();
+    Assert.assertTrue(task1.result);
+    Assert.assertTrue(task2.result);
+  }
+
+  private class Task extends Thread {
+    TCLIService.Iface client;
+    TSessionHandle sessionHandle;
+    boolean result = false;
+    Task(String threadName,
+         TCLIService.Iface client,
+         TSessionHandle sessionHandle) {
+      this.client = client;
+      this.sessionHandle = sessionHandle;
+      this.setName(threadName);
+      this.setDaemon(true);
+      this.start();
+    }
+
+    @Override
+    public void run() {
+      try {
+        TGetTablesReq getTablesReq = new TGetTablesReq(sessionHandle);
+        getTablesReq.setCatalogName("hive");
+        getTablesReq.setSchemaName("*");
+        getTablesReq.setTableName("*");
+        TGetTablesResp resp = client.GetTables(getTablesReq);
+        result = resp.getStatus().getStatusCode() == TStatusCode.SUCCESS_STATUS;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static class Client {
+    TCLIService.Iface client;
+    TSessionHandle sessionHandle;
+  }
+
+  private Client newClient() throws Exception {
+    Connection conn = DriverManager.
+        getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "");
+    Field clientField = HiveConnection.class.getDeclaredField("client");
+    Field sesHanField = HiveConnection.class.getDeclaredField("sessHandle");
+    clientField.setAccessible(true);
+    sesHanField.setAccessible(true);
+    try (Statement statement = conn.createStatement()) {
+      statement.execute("select * from sessionhive1");
+    }
+    Client client = new Client();
+    client.client = (TCLIService.Iface) clientField.get(conn);
+    client.sessionHandle = (TSessionHandle) sesHanField.get(conn);
+    return client;
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    MiniHS2.cleanupLocalDir();
+    try {
+      HiveConf conf = new HiveConf();
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL, false);
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+      MetastoreConf.setVar(conf, MetastoreConf.ConfVars.FILTER_HOOK, DummyFilterHook.class.getName());
+      MiniHS2.Builder builder = new MiniHS2.Builder()
+          .withConf(conf)
+          .withRemoteMetastore()
+          .cleanupLocalDirOnStartup(false);
+      miniHS2 = builder.build();
+      miniHS2.start(new HashMap<>());
+    } catch (Exception e) {
+      System.out.println("Unable to start MiniHS2: " + e);
+      throw e;
+    }
+
+    miniHS2.getHiveConf().setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + miniHS2.getHmsPort());
+
+    try (Connection conn = DriverManager.
+        getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "");
+        Statement statement = conn.createStatement()) {
+      statement.execute("create table sessionhive1(a int, b string)");
+      statement.execute("create table sessionhive2(a int, b string)");
+      statement.execute("create table sessionhive3(a int, b string)");
+    } catch (Exception e) {
+      System.out.println("Unable to open default connections to MiniHS2: " + e);
+      throw e;
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if ((miniHS2 != null) && (miniHS2.isStarted())) {
+      miniHS2.stop();
+    }
+    if (miniHS2 != null) {
+      miniHS2.cleanup();
+    }
+    MiniHS2.cleanupLocalDir();
+  }
+
+}
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index b4550905fed..4aa142f52f3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -319,11 +319,7 @@ public class SQLOperation extends ExecuteStatementOperation {
         @Override
         public Object run() throws HiveSQLException {
           assert (!parentHive.allowClose());
-          try {
-            Hive.set(parentSessionState.getHiveDb());
-          } catch (HiveException e) {
-            throw new HiveSQLException(e);
-          }
+          Hive.set(parentHive);
           // TODO: can this result in cross-thread reuse of session state?
           SessionState.setCurrentSessionState(parentSessionState);
           PerfLogger.setPerfLogger(SessionState.getPerfLogger());
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 8eacf138b8b..c14a041ba01 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -401,11 +401,11 @@ public class HiveSessionImpl implements HiveSession {
     // set the thread name with the logging prefix.
     sessionState.updateThreadName();
 
-    try {
-      setSessionHive();
-    } catch (HiveSQLException e) {
-      throw new RuntimeException(e);
-    }
+    // If Hive.get() is being shared across different sessions,
+    // sessionHive and Hive.get() may be different, in such case,
+    // the risk of deadlock on HiveMetaStoreClient#SynchronizedHandler can happen.
+    // Refresh the thread-local Hive to avoid the deadlock.
+    Hive.set(sessionHive);
   }
 
   /**
@@ -432,6 +432,18 @@ public class HiveSessionImpl implements HiveSession {
       sessionState.resetThreadName();
     }
 
+    // We have already set the thread-local Hive belonging to the current session,
+    // if the thread-local Hive has been changed/updated after running the operation,
+    // the Hive after should belong to the same session, and we should update the sessionHive.
+    // The thread-local hive would be recreated only when the underlying
+    // HiveMetaStoreClient is incompatible with the newest session conf.
+    Hive localHive = Hive.getThreadLocal();
+    if (localHive != null && localHive != sessionHive) {
+      // The previous sessionHive would be GC'ed finally, or should we force close it?
+      sessionHive = localHive;
+      sessionHive.setAllowClose(false);
+    }
+
     SessionState.detachSession();
     if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
       ThreadWithGarbageCleanup currentThread =

Reply via email to