This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new b2a25bec2912 [SPARK-49333][SQL] Shutdown timeout thread while cleaning up SparkExecuteStatementOperation
b2a25bec2912 is described below

commit b2a25bec2912a8cd7cc624c567ec3215f02a7ffa
Author: Yuming Wang <[email protected]>
AuthorDate: Fri Aug 23 08:21:38 2024 +0800

    [SPARK-49333][SQL] Shutdown timeout thread while cleaning up SparkExecuteStatementOperation
    
    ### What changes were proposed in this pull request?
    
    Shut down the timeout thread while cleaning up `SparkExecuteStatementOperation`.
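
    The gist of the change, as a minimal standalone sketch (the `Operation`, `run` and `cleanup` names below are illustrative, not Spark's API): keep a handle to the timeout executor and shut it down when the operation is cleaned up.

    ```
    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    // Hypothetical stand-in for an operation that arms its own timeout task.
    class Operation(timeoutSeconds: Long) {
      private var timeoutExecutor: ScheduledExecutorService = _

      def run(): Unit = {
        if (timeoutSeconds > 0) {
          timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
          timeoutExecutor.schedule(new Runnable {
            override def run(): Unit = println("operation timed out")
          }, timeoutSeconds, TimeUnit.SECONDS)
        }
      }

      def cleanup(): Unit = {
        // The core of the fix: without this shutdown, the executor's worker
        // thread and its pending task outlive the operation and keep it reachable.
        if (timeoutExecutor != null) {
          timeoutExecutor.shutdownNow()
        }
      }
    }
    ```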
    
    ### Why are the changes needed?
    
    Avoid a Spark driver memory leak when a query timeout is configured. For example, a heap histogram showed 4127 `SparkExecuteStatementOperation` instances retained in the Spark driver:
    ```
    jmap  -histo 398 | grep SparkExecuteStatementOperation
    
     308:          4127        1122544  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation
     563:          4127         363176  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$ErrRowCountType$
     876:          4127         132064  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1
    2101:           333           7992  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$5
    3106:            32           1024  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2
    3303:            32            768  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$3755/0x00000008021fe800
    3304:            32            768  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3
    3961:             9            360  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5398/0x0000000802523900
    3962:             9            360  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5399/0x0000000802523bd8
    20239:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$
    20240:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$$Lambda$5397/0x000000080251e180
    20241:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11228/0x000000080306ba38
    20242:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11230/0x00000008032962d8
    20243:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11231/0x00000008032966b8
    20244:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5363/0x0000000802509470
    20245:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5367/0x000000080250a618
    20246:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$6475/0x00000008026fda40
    20247:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$7355/0x00000008028aa180
    20248:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$7356/0x00000008028aa560
    ```
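
    The mechanism behind those retained instances, sketched outside Spark (all names below are illustrative, not Spark code): each operation with a timeout creates its own single-thread scheduled executor, and the scheduled task captures the enclosing operation, so the executor's non-daemon worker thread keeps the operation reachable until the timeout fires or the executor is shut down.

    ```
    import java.util.concurrent.{Executors, TimeUnit}

    object LeakSketch {
      // Stand-in for one finished statement operation that still has a pending
      // timeout task.
      final class LeakyOperation(id: Int) {
        private val state = new Array[Byte](1024) // per-operation state kept alive
        private val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
        timeoutExecutor.schedule(new Runnable {
          // The anonymous Runnable captures the enclosing LeakyOperation.
          override def run(): Unit = println(s"operation $id timed out")
        }, 1, TimeUnit.HOURS)
        // Nothing ever calls timeoutExecutor.shutdownNow(), so the non-daemon
        // worker thread (a GC root) pins this instance for up to an hour after
        // the query has finished.
      }

      def main(args: Array[String]): Unit = {
        (1 to 100).foreach(i => new LeakyOperation(i))
        System.gc()
        // A heap histogram taken now (jmap -histo <pid>) still shows all 100
        // LeakyOperation instances, mirroring the numbers above; the JVM also
        // cannot exit until every timeout fires or its executor is shut down.
      }
    }
    ```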
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47826 from wangyum/SPARK-49333.
    
    Authored-by: Yuming Wang <[email protected]>
    Signed-off-by: Yuming Wang <[email protected]>
    (cherry picked from commit 853731dc5731e4f185d7364c500cc633cfd52d07)
    Signed-off-by: Yuming Wang <[email protected]>
---
 .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e6b4c70bb395..47ec242c9da9 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.util.{Collections, Map => JMap}
-import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}
+import java.util.concurrent.{Executors, RejectedExecutionException, ScheduledExecutorService, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -60,6 +60,7 @@ private[hive] class SparkExecuteStatementOperation(
       queryTimeout
     }
   }
+  private var timeoutExecutor: ScheduledExecutorService = _
 
   private val forceCancel = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)
 
@@ -132,7 +133,7 @@ private[hive] class SparkExecuteStatementOperation(
     setHasResultSet(true) // avoid no resultset for async run
 
     if (timeout > 0) {
-      val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
+      timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
       timeoutExecutor.schedule(new Runnable {
         override def run(): Unit = {
           try {
@@ -306,6 +307,11 @@ private[hive] class SparkExecuteStatementOperation(
     if (statementId != null) {
       sqlContext.sparkContext.cancelJobGroup(statementId)
     }
+    // Shutdown the timeout thread if any, while cleaning up this operation
+    if (timeoutExecutor != null &&
+      getStatus.getState != OperationState.TIMEDOUT && getStatus.getState.isTerminal) {
+      timeoutExecutor.shutdownNow()
+    }
   }
 }
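
A side note on the executor semantics the fix relies on, as a standalone, hypothetical check (not part of the patch): `ScheduledExecutorService.shutdownNow()` cancels the pending timeout task and lets the single worker thread terminate, so the executor no longer pins the operation once cleanup runs.

```
import java.util.concurrent.{Executors, TimeUnit}

object TimeoutExecutorShutdownCheck {
  def main(args: Array[String]): Unit = {
    val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
    // Schedule a far-off timeout task, as the operation does for queryTimeout.
    timeoutExecutor.schedule(new Runnable {
      override def run(): Unit = println("timeout fired")
    }, 1, TimeUnit.HOURS)

    // Mirrors the new cleanup() behaviour: drop the pending task and release
    // the worker thread instead of waiting for the full hour.
    timeoutExecutor.shutdownNow()
    val terminated = timeoutExecutor.awaitTermination(10, TimeUnit.SECONDS)
    println(s"executor terminated promptly: $terminated") // expected: true
  }
}
```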
 

