This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8940dad7e33d [SPARK-53469][SQL] Ability to cleanup shuffle in Thrift server
8940dad7e33d is described below

commit 8940dad7e33d6c2619b3ec473dfc491931c624f0
Author: Karuppayya Rajendran <[email protected]>
AuthorDate: Thu Nov 27 11:34:40 2025 +0800

    [SPARK-53469][SQL] Ability to cleanup shuffle in Thrift server
    
    ### What changes were proposed in this pull request?
    We already have the ability to clean up shuffle files via
    `spark.sql.classic.shuffleDependency.fileCleanup.enabled`.
    This change honors the same behavior in the Thrift server and cleans up
    shuffle files there as well.
    Related PR comment [here](https://github.com/apache/spark/pull/51458#discussion_r2305783667)
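
    For illustration only (not part of the commit), a minimal sketch of how the
    existing Classic flag is toggled, assuming a live `SparkSession` named
    `spark`; this commit adds an analogous flag for the Thrift server:

    ```scala
    // Hypothetical example: enabling shuffle file cleanup for Classic SQL
    // execution at runtime. The new Thrift server flag added by this commit
    // follows the same pattern under a different key.
    spark.conf.set("spark.sql.classic.shuffleDependency.fileCleanup.enabled", "true")
    ```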
    
    ### Why are the changes needed?
    This brings the Thrift server behavior on par with the other modes of SQL
    execution (Classic, Connect).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    NA
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52213 from karuppayya/SPARK-53469.
    
    Authored-by: Karuppayya Rajendran <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala      |  8 ++++++++
 .../spark/sql/hive/thriftserver/SparkSQLDriver.scala | 20 +++++++++++++++++---
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8b50abbe4052..2f7706c859ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3822,6 +3822,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)
 
+  val THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
+    buildConf("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled")
+      .doc("When enabled, shuffle files will be cleaned up at the end of Thrift server " +
+        "SQL executions.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(Utils.isTesting)
+
   val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
       .internal()
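
For reference, a minimal sketch (not part of the patch) of how the driver reads a
conf defined this way; note that `sessionState` is Spark-internal (`private[sql]`),
so this only compiles inside Spark's own sql packages:

```scala
// Hypothetical illustration: resolving the new flag through SQLConf,
// mirroring what SparkSQLDriver does in the hunk below.
import org.apache.spark.sql.internal.SQLConf

val cleanupEnabled: Boolean =
  sparkSession.sessionState.conf
    .getConf(SQLConf.THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)
```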
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 8b9b7352fdca..7a220d516757 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -30,7 +30,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.COMMAND
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.CommandResult
-import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
@@ -67,8 +66,23 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
         new VariableSubstitution().substitute(command)
       }
       sparkSession.sparkContext.setJobDescription(substitutorCommand)
-      val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
-      // The SQL command has been executed above via `executePlan`, therefore we don't need to
+
+      val logicalPlan = sparkSession.sessionState.sqlParser.parsePlan(substitutorCommand)
+      val conf = sparkSession.sessionState.conf
+
+      val shuffleCleanupMode =
+        if (conf.getConf(SQLConf.THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
+          org.apache.spark.sql.execution.RemoveShuffleFiles
+        } else {
+          org.apache.spark.sql.execution.DoNotCleanup
+        }
+
+      val execution = new QueryExecution(
+        sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
+        logicalPlan,
+        shuffleCleanupMode = shuffleCleanupMode)
+
+      // the above execution already has an execution ID, therefore we don't need to
       // wrap it again with a new execution ID when getting Hive result.
       execution.logical match {
         case _: CommandResult =>
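
To try this out, a hypothetical way to enable the new flag: either set it
programmatically before issuing SQL, or pass it at Thrift server startup, e.g.
`sbin/start-thriftserver.sh --conf spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled=true`:

```scala
// Hypothetical example: enabling the new flag at runtime on an existing
// session; the default is Utils.isTesting per the SQLConf hunk above.
spark.conf.set("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled", "true")
```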


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
