This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new db7d59a11 fix: Fix Spark SQL AQE exchange reuse test failures (#1811)
db7d59a11 is described below

commit db7d59a1121aa04bdfa5edcce675fda7bd678d77
Author: B Vadlamani <v_vadlam...@apple.com>
AuthorDate: Mon Jun 2 08:28:17 2025 -0700

    fix: Fix Spark SQL AQE exchange reuse test failures (#1811)
---
 dev/diffs/3.5.5.diff                               | 34 ++--------------------
 .../rules/EliminateRedundantTransitions.scala      |  3 ++
 .../shuffle/CometShuffleExchangeExec.scala         |  3 +-
 3 files changed, 6 insertions(+), 34 deletions(-)

diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff
index 1dce8b247..269840989 100644
--- a/dev/diffs/3.5.5.diff
+++ b/dev/diffs/3.5.5.diff
@@ -355,7 +355,7 @@ index f32b32ffc5a..447d7c6416e 100644
      assert(exchanges.size == 2)
    }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..bd2e5ef267e 100644
+index f33432ddb6f..fe9f74ff8f1 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -376,37 +376,7 @@ index f33432ddb6f..bd2e5ef267e 100644
        case _ => Nil
      }
    }
-@@ -755,7 +759,8 @@ abstract class DynamicPartitionPruningSuiteBase
-     }
-   }
- 
--  test("partition pruning in broadcast hash joins") {
-+  test("partition pruning in broadcast hash joins",
-+    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
-     Given("disable broadcast pruning and disable subquery duplication")
-     withSQLConf(
-       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
-@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
-   }
- 
-   test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
--    "canonicalization and exchange reuse") {
-+    "canonicalization and exchange reuse",
-+    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
-     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
-       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-         val df = sql(
-@@ -1454,7 +1460,8 @@ abstract class DynamicPartitionPruningSuiteBase
-     }
-   }
- 
--  test("SPARK-35568: Fix UnsupportedOperationException when enabling both AQE 
and DPP") {
-+  test("SPARK-35568: Fix UnsupportedOperationException when enabling both AQE 
and DPP",
-+    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
-     val df = sql(
-       """
-         |SELECT s.store_id, f.product_id
-@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                 s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index d8e2a2536..ecc0823d6 100644
--- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -63,6 +63,9 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
 
   private def _apply(plan: SparkPlan): SparkPlan = {
     val eliminatedPlan = plan transformUp {
+      case ColumnarToRowExec(shuffleExchangeExec: CometShuffleExchangeExec)
+          if (plan.conf.adaptiveExecutionEnabled) =>
+        shuffleExchangeExec
       case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) =>
         if (sparkToColumnar.child.supportsColumnar) {
          // For Spark Columnar to Comet Columnar, we should keep the ColumnarToRowExec
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 1abd41924..df67e7b7f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -164,8 +164,7 @@ case class CometShuffleExchangeExec(
     }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      "CometShuffleExchangeExec.doExecute should not be executed.")
+    ColumnarToRowExec(this).doExecute()
   }
 
   /**

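For readers skimming the patch: the substantive change is the new transformUp case in EliminateRedundantTransitions. When adaptive query execution is enabled, a ColumnarToRowExec sitting directly on top of a CometShuffleExchangeExec is dropped, so AQE can recognize and reuse the exchange. Below is a minimal, self-contained sketch of that rewrite; the plan nodes are hypothetical stubs, not Spark's real operators.

    // Hypothetical stand-in plan nodes; only the tree shape matters here.
    sealed trait Plan
    case class Scan(table: String) extends Plan
    case class ColumnarToRow(child: Plan) extends Plan
    case class CometShuffleExchange(child: Plan) extends Plan

    object EliminateRedundantTransitionsSketch extends App {
      // Simplified analogue of the rule's new case: under AQE, the row
      // transition directly above a Comet shuffle exchange is redundant
      // and is removed; all other nodes are rewritten recursively.
      def eliminate(plan: Plan, aqeEnabled: Boolean): Plan = plan match {
        case ColumnarToRow(exchange: CometShuffleExchange) if aqeEnabled =>
          eliminate(exchange, aqeEnabled)
        case ColumnarToRow(child) => ColumnarToRow(eliminate(child, aqeEnabled))
        case CometShuffleExchange(child) => CometShuffleExchange(eliminate(child, aqeEnabled))
        case leaf => leaf
      }

      // Prints: CometShuffleExchange(Scan(t))
      println(eliminate(ColumnarToRow(CometShuffleExchange(Scan("t"))), aqeEnabled = true))
    }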

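The doExecute change is the other half of the same fix: rather than throwing UnsupportedOperationException when Spark asks the columnar exchange for rows, the exchange now routes the request through ColumnarToRowExec(this). A rough sketch of that delegation pattern, again with hypothetical stubs in place of Spark's RDD-producing API:

    object DoExecuteSketch extends App {
      // Hypothetical stubs: Seq[String] rows stand in for RDD[InternalRow],
      // and Seq[Seq[String]] batches stand in for columnar output.
      trait ColumnarPlan { def executeColumnar(): Seq[Seq[String]] }

      // Stand-in for ColumnarToRowExec: flattens columnar batches into rows.
      case class ColumnarToRow(child: ColumnarPlan) {
        def doExecute(): Seq[String] = child.executeColumnar().flatten
      }

      // Stand-in for CometShuffleExchangeExec: natively columnar, but it now
      // answers a row-based doExecute by wrapping itself in ColumnarToRow
      // instead of throwing (the behavior this commit removes).
      case class ShuffleExchange(batches: Seq[Seq[String]]) extends ColumnarPlan {
        def executeColumnar(): Seq[Seq[String]] = batches
        def doExecute(): Seq[String] = ColumnarToRow(this).doExecute()
      }

      // Prints: List(a, b, c)
      println(ShuffleExchange(Seq(Seq("a", "b"), Seq("c"))).doExecute())
    }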
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
