This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8940dad7e33d [SPARK-53469][SQL] Ability to cleanup shuffle in Thrift
server
8940dad7e33d is described below
commit 8940dad7e33d6c2619b3ec473dfc491931c624f0
Author: Karuppayya Rajendran <[email protected]>
AuthorDate: Thu Nov 27 11:34:40 2025 +0800
[SPARK-53469][SQL] Ability to cleanup shuffle in Thrift server
### What changes were proposed in this pull request?
We have the ability to clean up shuffle in
`spark.sql.classic.shuffleDependency.fileCleanup.enabled`.
Honoring this in Thrift server and cleaning up shuffle.
Related PR comment
[here](https://github.com/apache/spark/pull/51458#discussion_r2305783667)
### Why are the changes needed?
This is to bring the behavior on par with other modes of SQL
execution (classic, connect)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
NA
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52213 from karuppayya/SPARK-53469.
Authored-by: Karuppayya Rajendran <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++++
.../spark/sql/hive/thriftserver/SparkSQLDriver.scala | 20 +++++++++++++++++---
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8b50abbe4052..2f7706c859ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3822,6 +3822,14 @@ object SQLConf {
.version("4.1.0")
.fallbackConf(SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)
+ val THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
+ buildConf("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled")
+ .doc("When enabled, shuffle files will be cleaned up at the end of
Thrift server " +
+ "SQL executions.")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(Utils.isTesting)
+
val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
.internal()
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 8b9b7352fdca..7a220d516757 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -30,7 +30,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.COMMAND
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.CommandResult
-import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.execution.{QueryExecution,
QueryExecutionException, SQLExecution}
import org.apache.spark.sql.execution.HiveResult.hiveResultString
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
@@ -67,8 +66,23 @@ private[hive] class SparkSQLDriver(val sparkSession:
SparkSession = SparkSQLEnv.
new VariableSubstitution().substitute(command)
}
sparkSession.sparkContext.setJobDescription(substitutorCommand)
- val execution =
sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
- // The SQL command has been executed above via `executePlan`, therefore
we don't need to
+
+ val logicalPlan =
sparkSession.sessionState.sqlParser.parsePlan(substitutorCommand)
+ val conf = sparkSession.sessionState.conf
+
+ val shuffleCleanupMode =
+ if
(conf.getConf(SQLConf.THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
+ org.apache.spark.sql.execution.RemoveShuffleFiles
+ } else {
+ org.apache.spark.sql.execution.DoNotCleanup
+ }
+
+ val execution = new QueryExecution(
+ sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
+ logicalPlan,
+ shuffleCleanupMode = shuffleCleanupMode)
+
+ // the above execution already has an execution ID, therefore we don't
need to
// wrap it again with a new execution ID when getting Hive result.
execution.logical match {
case _: CommandResult =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]