This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new a051253774 [KYUUBI #6843] Fix 'query-timeout-thread' thread leak
a051253774 is described below

commit a05125377425ced017c58997f0aef37b49efa678
Author: liupeiyue <[email protected]>
AuthorDate: Fri Dec 27 18:02:16 2024 +0800

    [KYUUBI #6843] Fix 'query-timeout-thread' thread leak
    
    ### Why are the changes needed?
    
    see https://github.com/apache/kyuubi/issues/6843
    
    If the session manager's ThreadPoolExecutor refuses to execute the
    asyncOperation, then we need to shut down the query-timeout-thread in the
    catch block.
    
    ### How was this patch tested?
    
     1 Use jstack to view threads on the long-lived engine side
    
![image](https://github.com/user-attachments/assets/95d3a897-001d-4250-bf13-172b6997021b)
    
     2  Wait for all SQL statements in the engine to finish executing, and then
    use jstack to check the number of query-timeout-thread threads, which should
    be zero.
    
![image](https://github.com/user-attachments/assets/0afbc026-7dd3-4594-afd2-92a5ef23f6cb)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    NO
    
    Closes #6844 from ASiegeLion/master.
    
    Closes #6843
    
    9107a300e [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
    4b3417f21 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
    ef1f66bb5 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
    9e1a015f6 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
    78a9fde09 [liupeiyue] [KYUUBI #6843] FIX 'query-timeout-thread' thread leak
    
    Authored-by: liupeiyue <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala  | 1 +
 .../scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala   | 1 +
 .../scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala    | 1 +
 .../org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala      | 1 +
 .../org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala      | 1 +
 .../src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala     | 1 +
 6 files changed, 6 insertions(+)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index df067a888c..95457ecaa7 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -163,6 +163,7 @@ abstract class FlinkOperation(session: Session) extends 
AbstractOperation(sessio
           val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
           setOperationException(ke)
           setState(OperationState.ERROR)
+          shutdownTimeoutMonitor()
           throw ke
         }
       }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index a3e090d232..6cf525924b 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -127,6 +127,7 @@ class ExecutePython(
           val ke =
             KyuubiSQLException("Error submitting python in background", 
rejected)
           setOperationException(ke)
+          shutdownTimeoutMonitor()
           throw ke
       }
     } else {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index e8335e549e..7db33f7668 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -154,6 +154,7 @@ class ExecuteScala(
           val ke =
             KyuubiSQLException("Error submitting scala in background", 
rejected)
           setOperationException(ke)
+          shutdownTimeoutMonitor()
           throw ke
       }
     } else {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 53f02b2fbf..7cb2dee365 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -124,6 +124,7 @@ class ExecuteStatement(
           val ke =
             KyuubiSQLException("Error submitting query in background, query 
rejected", rejected)
           setOperationException(ke)
+          shutdownTimeoutMonitor()
           throw ke
       }
     } else {
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 4f1b42e1d1..ad42d9b05a 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -76,6 +76,7 @@ class ExecuteStatement(
           val ke =
             KyuubiSQLException("Error submitting query in background, query 
rejected", rejected)
           setOperationException(ke)
+          shutdownTimeoutMonitor()
           throw ke
       }
     } else {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index a543bddb6c..9de1dfb2b9 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -95,6 +95,7 @@ abstract class KyuubiOperation(session: Session) extends 
AbstractOperation(sessi
           }
           setOperationException(ke)
           setState(OperationState.ERROR)
+          shutdownTimeoutMonitor()
           throw ke
         }
       }

Reply via email to