This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new acbdeac79 fix: skip Comet columnar shuffle for stages with DPP scans 
(#3879)
acbdeac79 is described below

commit acbdeac797cc934ab5fe6ac51d139a3c6b1d6d8d
Author: Andy Grove <[email protected]>
AuthorDate: Thu Apr 2 11:01:58 2026 -0600

    fix: skip Comet columnar shuffle for stages with DPP scans (#3879)
    
    When a scan uses Dynamic Partition Pruning (DPP) and falls back to
    Spark, Comet was still wrapping the stage with columnar shuffle,
    creating inefficient row-to-columnar transitions:
    
      CometShuffleWriter → CometRowToColumnar → SparkFilter →
        SparkColumnarToRow → SparkScan
    
    This adds a check in columnarShuffleSupported() that walks the child
    plan tree to detect FileSourceScanExec nodes with dynamic pruning
    filters. When found, the shuffle is not converted to Comet, allowing
    the entire stage to fall back to Spark.
---
 .../shuffle/CometShuffleExchangeExec.scala         | 23 ++++++++++++-
 .../org/apache/comet/exec/CometExecSuite.scala     | 38 +++++++++++++++++++++-
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index d65a6b21f..df2dca033 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, 
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, 
Expression, PlanExpression, UnsafeProjection, UnsafeRow}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -454,6 +454,11 @@ object CometShuffleExchangeExec
       return false
     }
 
+    if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) 
{
+      withInfo(s, "Stage contains a scan with Dynamic Partition Pruning")
+      return false
+    }
+
     if (!isCometJVMShuffleMode(s.conf)) {
       withInfo(s, "Comet columnar shuffle not enabled")
       return false
@@ -546,6 +551,22 @@ object CometShuffleExchangeExec
     }
   }
 
+  /**
+   * Returns true if the stage (the subtree rooted at this shuffle) contains a 
scan with Dynamic
+   * Partition Pruning (DPP). When DPP is present, the scan falls back to 
Spark, and wrapping the
+   * stage with Comet shuffle creates inefficient row-to-columnar transitions.
+   */
+  private def stageContainsDPPScan(s: ShuffleExchangeExec): Boolean = {
+    def isDynamicPruningFilter(e: Expression): Boolean =
+      e.exists(_.isInstanceOf[PlanExpression[_]])
+
+    s.child.exists {
+      case scan: FileSourceScanExec =>
+        scan.partitionFilters.exists(isDynamicPruningFilter)
+      case _ => false
+    }
+  }
+
   def isCometShuffleEnabledWithInfo(op: SparkPlan): Boolean = {
     if (!COMET_EXEC_SHUFFLE_ENABLED.get(op.conf)) {
       withInfo(
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index aff181626..39d34a699 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -139,8 +139,44 @@ class CometExecSuite extends CometTestBase {
           val (_, cometPlan) = checkSparkAnswer(df)
           val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
           assert(infos.contains("Dynamic Partition Pruning is not supported"))
+        }
+      }
+    }
+  }
+
+  test("DPP fallback avoids inefficient Comet shuffle (#3874)") {
+    withTempDir { path =>
+      val factPath = s"${path.getAbsolutePath}/fact.parquet"
+      val dimPath = s"${path.getAbsolutePath}/dim.parquet"
+      withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
+        val one_day = 24 * 60 * 60000
+        val fact = Range(0, 100)
+          .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * 
one_day), i.toString))
+          .toDF("fact_id", "fact_date", "fact_str")
+        fact.write.partitionBy("fact_date").parquet(factPath)
+        val dim = Range(0, 10)
+          .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * 
one_day), i.toString))
+          .toDF("dim_id", "dim_date", "dim_str")
+        dim.write.parquet(dimPath)
+      }
+
+      // Force sort-merge join to get a shuffle exchange above the DPP scan
+      Seq("parquet").foreach { v1List =>
+        withSQLConf(
+          SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
+          spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact2")
+          spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim2")
+          val df =
+            spark.sql(
+              "select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date 
where dim_id > 7")
+          val (_, cometPlan) = checkSparkAnswer(df)
 
-          assert(infos.contains("Comet accelerated"))
+          // Verify no CometShuffleExchangeExec wraps the DPP stage
+          assert(
+            !cometPlan.toString().contains("CometColumnarShuffle"),
+            "Should not use Comet columnar shuffle for stages with DPP scans")
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to