This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new db7d59a11 fix: Fix Spark SQL AQE exchange reuse test failures (#1811) db7d59a11 is described below commit db7d59a1121aa04bdfa5edcce675fda7bd678d77 Author: B Vadlamani <v_vadlam...@apple.com> AuthorDate: Mon Jun 2 08:28:17 2025 -0700 fix: Fix Spark SQL AQE exchange reuse test failures (#1811) --- dev/diffs/3.5.5.diff | 34 ++-------------------- .../rules/EliminateRedundantTransitions.scala | 3 ++ .../shuffle/CometShuffleExchangeExec.scala | 3 +- 3 files changed, 6 insertions(+), 34 deletions(-) diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff index 1dce8b247..269840989 100644 --- a/dev/diffs/3.5.5.diff +++ b/dev/diffs/3.5.5.diff @@ -355,7 +355,7 @@ index f32b32ffc5a..447d7c6416e 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..bd2e5ef267e 100644 +index f33432ddb6f..fe9f74ff8f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -376,37 +376,7 @@ index f33432ddb6f..bd2e5ef267e 100644 case _ => Nil } } -@@ -755,7 +759,8 @@ abstract class DynamicPartitionPruningSuiteBase - } - } - -- test("partition pruning in broadcast hash joins") { -+ test("partition pruning in broadcast hash joins", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) { - Given("disable broadcast pruning and disable subquery duplication") - withSQLConf( - SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase - } - - test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + -- "canonicalization and exchange reuse") { -+ "canonicalization and 
exchange reuse", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - val df = sql( -@@ -1454,7 +1460,8 @@ abstract class DynamicPartitionPruningSuiteBase - } - } - -- test("SPARK-35568: Fix UnsupportedOperationException when enabling both AQE and DPP") { -+ test("SPARK-35568: Fix UnsupportedOperationException when enabling both AQE and DPP", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) { - val df = sql( - """ - |SELECT s.store_id, f.product_id -@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala index d8e2a2536..ecc0823d6 100644 --- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala +++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala @@ -63,6 +63,9 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa private def _apply(plan: SparkPlan): SparkPlan = { val eliminatedPlan = plan transformUp { + case ColumnarToRowExec(shuffleExchangeExec: CometShuffleExchangeExec) + if (plan.conf.adaptiveExecutionEnabled) => + shuffleExchangeExec case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) => if (sparkToColumnar.child.supportsColumnar) { // For Spark Columnar to Comet Columnar, we should keep the ColumnarToRowExec diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 1abd41924..df67e7b7f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -164,8 +164,7 @@ case class CometShuffleExchangeExec(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      "CometShuffleExchangeExec.doExecute should not be executed.")
+    ColumnarToRowExec(this).doExecute()
   }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org