This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 0856ab433d5c [SPARK-52741][SQL] RemoveFiles ShuffleCleanup mode doesnt work with non-adaptive execution 0856ab433d5c is described below commit 0856ab433d5cc1f7abba841dbc3ddbf4f46d4151 Author: Karuppayya Rajendran <karuppayya.rajend...@salesforce.com> AuthorDate: Fri Aug 15 16:55:02 2025 +0800 [SPARK-52741][SQL] RemoveFiles ShuffleCleanup mode doesnt work with non-adaptive execution ### What changes were proposed in this pull request? Currently, shuffle cleanup only works for adaptive execution plans. Non-adaptive execution plans are not cleaned up. This change cleans it up. ### Why are the changes needed? - To clean up shuffle files of non-adaptive query executions - Consistency in behavior between adaptive and non-adaptive shuffle cleanup based on the cleanup mode ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Modified existing unit tests to cover this case ### Was this patch authored or co-authored using generative AI tooling? No Closes #51432 from karuppayya/SPARK-52741. 
Lead-authored-by: Karuppayya Rajendran <karuppayya.rajend...@salesforce.com> Co-authored-by: Karuppayya Rajendran <karuppayya1...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 337a67ff58ad91f2f86751bcb9a5e50e1de5cef2) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/execution/SQLExecution.scala | 8 ++- .../spark/sql/execution/QueryExecutionSuite.scala | 63 +++++++++++++--------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 9dcb38f8ff10..c5c2f9bb6a6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -30,6 +30,7 @@ import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PRE import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.SQL_EVENT_TRUNCATE_LENGTH @@ -178,8 +179,11 @@ object SQLExecution extends Logging { val shuffleIds = queryExecution.executedPlan match { case ae: AdaptiveSparkPlanExec => ae.context.shuffleIds.asScala.keys - case _ => - Iterable.empty + case nonAdaptivePlan => + nonAdaptivePlan.collect { + case exec: ShuffleExchangeLike => + exec.shuffleId + } } shuffleIds.foreach { shuffleId => queryExecution.shuffleCleanupMode match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index a64902437086..47d5ff67b840 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -327,36 +327,51 @@ class QueryExecutionSuite extends SharedSparkSession { } test("SPARK-47764: Cleanup shuffle dependencies - DoNotCleanup mode") { - val plan = spark.range(100).repartition(10).logicalPlan - val df = Dataset.ofRows(spark, plan, DoNotCleanup) - df.collect() - - val blockManager = spark.sparkContext.env.blockManager - assert(blockManager.migratableResolver.getStoredShuffles().nonEmpty) - assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty) - cleanupShuffles() + Seq(true, false).foreach { adaptiveEnabled => { + withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) { + val plan = spark.range(100).repartition(10).logicalPlan + val df = Dataset.ofRows(spark, plan, DoNotCleanup) + df.collect() + + val blockManager = spark.sparkContext.env.blockManager + assert(blockManager.migratableResolver.getStoredShuffles().nonEmpty) + assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty) + cleanupShuffles() + } + } + } } test("SPARK-47764: Cleanup shuffle dependencies - SkipMigration mode") { - val plan = spark.range(100).repartition(10).logicalPlan - val df = Dataset.ofRows(spark, plan, SkipMigration) - df.collect() - - val blockManager = spark.sparkContext.env.blockManager - assert(blockManager.migratableResolver.getStoredShuffles().isEmpty) - assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty) - cleanupShuffles() + Seq(true, false).foreach { adaptiveEnabled => { + withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) { + val plan = spark.range(100).repartition(10).logicalPlan + val df = Dataset.ofRows(spark, plan, SkipMigration) + df.collect() + + val blockManager = 
spark.sparkContext.env.blockManager + assert(blockManager.migratableResolver.getStoredShuffles().isEmpty) + assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty) + cleanupShuffles() + } + } + } } test("SPARK-47764: Cleanup shuffle dependencies - RemoveShuffleFiles mode") { - val plan = spark.range(100).repartition(10).logicalPlan - val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles) - df.collect() - - val blockManager = spark.sparkContext.env.blockManager - assert(blockManager.migratableResolver.getStoredShuffles().isEmpty) - assert(blockManager.diskBlockManager.getAllBlocks().isEmpty) - cleanupShuffles() + Seq(true, false).foreach { adaptiveEnabled => { + withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) { + val plan = spark.range(100).repartition(10).logicalPlan + val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles) + df.collect() + + val blockManager = spark.sparkContext.env.blockManager + assert(blockManager.migratableResolver.getStoredShuffles().isEmpty) + assert(blockManager.diskBlockManager.getAllBlocks().isEmpty) + cleanupShuffles() + } + } + } } test("SPARK-35378: Return UnsafeRow in CommandResultExecCheck execute methods") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org