This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new b2a25bec2912 [SPARK-49333][SQL] Shutdown timeout thread while cleaning
up SparkExecuteStatementOperation
b2a25bec2912 is described below
commit b2a25bec2912a8cd7cc624c567ec3215f02a7ffa
Author: Yuming Wang <[email protected]>
AuthorDate: Fri Aug 23 08:21:38 2024 +0800
[SPARK-49333][SQL] Shutdown timeout thread while cleaning up
SparkExecuteStatementOperation
### What changes were proposed in this pull request?
Shut down the timeout thread while cleaning up `SparkExecuteStatementOperation`.
### Why are the changes needed?
Avoid a Spark driver memory leak when a query timeout is configured. For example,
there are 4127 `SparkExecuteStatementOperation` instances in the Spark driver:
```
jmap -histo 398 | grep SparkExecuteStatementOperation
308: 4127 1122544
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation
563: 4127 363176
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$ErrRowCountType$
876: 4127 132064
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1
2101: 333 7992
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$5
3106: 32 1024
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2
3303: 32 768
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$3755/0x00000008021fe800
3304: 32 768
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3
3961: 9 360
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5398/0x0000000802523900
3962: 9 360
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5399/0x0000000802523bd8
20239: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$
20240: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$$Lambda$5397/0x000000080251e180
20241: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11228/0x000000080306ba38
20242: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11230/0x00000008032962d8
20243: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11231/0x00000008032966b8
20244: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5363/0x0000000802509470
20245: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5367/0x000000080250a618
20246: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$6475/0x00000008026fda40
20247: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$7355/0x00000008028aa180
20248: 1 16
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$7356/0x00000008028aa560
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47826 from wangyum/SPARK-49333.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit 853731dc5731e4f185d7364c500cc633cfd52d07)
Signed-off-by: Yuming Wang <[email protected]>
---
.../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e6b4c70bb395..47ec242c9da9 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.security.PrivilegedExceptionAction
import java.util.{Collections, Map => JMap}
-import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}
+import java.util.concurrent.{Executors, RejectedExecutionException,
ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -60,6 +60,7 @@ private[hive] class SparkExecuteStatementOperation(
queryTimeout
}
}
+ private var timeoutExecutor: ScheduledExecutorService = _
private val forceCancel =
sqlContext.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)
@@ -132,7 +133,7 @@ private[hive] class SparkExecuteStatementOperation(
setHasResultSet(true) // avoid no resultset for async run
if (timeout > 0) {
- val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
+ timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
timeoutExecutor.schedule(new Runnable {
override def run(): Unit = {
try {
@@ -306,6 +307,11 @@ private[hive] class SparkExecuteStatementOperation(
if (statementId != null) {
sqlContext.sparkContext.cancelJobGroup(statementId)
}
+ // Shutdown the timeout thread if any, while cleaning up this operation
+ if (timeoutExecutor != null &&
+ getStatus.getState != OperationState.TIMEDOUT &&
getStatus.getState.isTerminal) {
+ timeoutExecutor.shutdownNow()
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]