This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c11078b  [SPARK-32034][SQL] Port HIVE-14817: Shutdown the 
SessionManager timeoutChecker thread properly upon shutdown
c11078b is described below

commit c11078b93af09b61e57e4a653d3a33494725cc1d
Author: Kent Yao <[email protected]>
AuthorDate: Sun Jun 21 16:28:00 2020 -0700

    [SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager 
timeoutChecker thread properly upon shutdown
    
    ### What changes were proposed in this pull request?
    
    This PR ports https://issues.apache.org/jira/browse/HIVE-14817 to the Spark 
Thrift Server.
    
    ### Why are the changes needed?
    
    When stopping HiveServer2, the non-daemon timeoutChecker thread keeps 
sleeping and prevents the server from terminating:
    
    ```
    "HiveServer2-Background-Pool: Thread-79" #79 prio=5 os_prio=31 
tid=0x00007fde26138800 nid=0x13713 waiting on condition [0x0000700010c32000]
       java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at 
org.apache.hive.service.cli.session.SessionManager$1.sleepInterval(SessionManager.java:178)
        at 
org.apache.hive.service.cli.session.SessionManager$1.run(SessionManager.java:156)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```
    
    Here is an example to reproduce:
    
https://github.com/yaooqinn/kyuubi/blob/master/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/spark/SparkSQLEngineApp.scala
    
    Also, it causes the issues described in HIVE-14817.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    
    Passing Jenkins
    
    Closes #28870 from yaooqinn/SPARK-32034.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 9f8e15bb2e2189812ee34e3e64baede0d799ba76)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../hive/service/cli/session/SessionManager.java   | 32 ++++++++++++++++------
 .../hive/service/cli/session/SessionManager.java   | 32 ++++++++++++++++------
 2 files changed, 48 insertions(+), 16 deletions(-)

diff --git 
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
 
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
index 859f9c8..ad6fb3b 100644
--- 
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
+++ 
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -148,14 +148,20 @@ public class SessionManager extends CompositeService {
     }
   }
 
+  private final Object timeoutCheckerLock = new Object();
+
   private void startTimeoutChecker() {
     final long interval = Math.max(checkInterval, 3000L);  // minimum 3 seconds
-    Runnable timeoutChecker = new Runnable() {
+    final Runnable timeoutChecker = new Runnable() {
       @Override
       public void run() {
-        for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+        sleepFor(interval);
+        while (!shutdown) {
           long current = System.currentTimeMillis();
           for (HiveSession session : new 
ArrayList<HiveSession>(handleToSession.values())) {
+            if (shutdown) {
+              break;
+            }
             if (sessionTimeout > 0 && session.getLastAccessTime() + 
sessionTimeout <= current
                 && (!checkOperation || session.getNoOperationTime() > 
sessionTimeout)) {
               SessionHandle handle = session.getSessionHandle();
@@ -170,24 +176,34 @@ public class SessionManager extends CompositeService {
               session.closeExpiredOperations();
             }
           }
+          sleepFor(interval);
         }
       }
 
-      private void sleepInterval(long interval) {
-        try {
-          Thread.sleep(interval);
-        } catch (InterruptedException e) {
-          // ignore
+      private void sleepFor(long interval) {
+        synchronized (timeoutCheckerLock) {
+          try {
+            timeoutCheckerLock.wait(interval);
+          } catch (InterruptedException e) {
+            // Ignore, and break.
+          }
         }
       }
     };
     backgroundOperationPool.execute(timeoutChecker);
   }
 
+  private void shutdownTimeoutChecker() {
+    shutdown = true;
+    synchronized (timeoutCheckerLock) {
+      timeoutCheckerLock.notify();
+    }
+  }
+
   @Override
   public synchronized void stop() {
     super.stop();
-    shutdown = true;
+    shutdownTimeoutChecker();
     if (backgroundOperationPool != null) {
       backgroundOperationPool.shutdown();
       long timeout = hiveConf.getTimeVar(
diff --git 
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
 
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
index 49221b1..5a381d1 100644
--- 
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
+++ 
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -148,14 +148,20 @@ public class SessionManager extends CompositeService {
     }
   }
 
+  private final Object timeoutCheckerLock = new Object();
+
   private void startTimeoutChecker() {
     final long interval = Math.max(checkInterval, 3000L);  // minimum 3 seconds
-    Runnable timeoutChecker = new Runnable() {
+    final Runnable timeoutChecker = new Runnable() {
       @Override
       public void run() {
-        for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+        sleepFor(interval);
+        while (!shutdown) {
           long current = System.currentTimeMillis();
           for (HiveSession session : new 
ArrayList<HiveSession>(handleToSession.values())) {
+            if (shutdown) {
+              break;
+            }
             if (sessionTimeout > 0 && session.getLastAccessTime() + 
sessionTimeout <= current
                 && (!checkOperation || session.getNoOperationTime() > 
sessionTimeout)) {
               SessionHandle handle = session.getSessionHandle();
@@ -170,24 +176,34 @@ public class SessionManager extends CompositeService {
               session.closeExpiredOperations();
             }
           }
+          sleepFor(interval);
         }
       }
 
-      private void sleepInterval(long interval) {
-        try {
-          Thread.sleep(interval);
-        } catch (InterruptedException e) {
-          // ignore
+      private void sleepFor(long interval) {
+        synchronized (timeoutCheckerLock) {
+          try {
+            timeoutCheckerLock.wait(interval);
+          } catch (InterruptedException e) {
+            // Ignore, and break.
+          }
         }
       }
     };
     backgroundOperationPool.execute(timeoutChecker);
   }
 
+  private void shutdownTimeoutChecker() {
+    shutdown = true;
+    synchronized (timeoutCheckerLock) {
+      timeoutCheckerLock.notify();
+    }
+  }
+
   @Override
   public synchronized void stop() {
     super.stop();
-    shutdown = true;
+    shutdownTimeoutChecker();
     if (backgroundOperationPool != null) {
       backgroundOperationPool.shutdown();
       long timeout = hiveConf.getTimeVar(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to