This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new acbdeac79 fix: skip Comet columnar shuffle for stages with DPP scans
(#3879)
acbdeac79 is described below
commit acbdeac797cc934ab5fe6ac51d139a3c6b1d6d8d
Author: Andy Grove <[email protected]>
AuthorDate: Thu Apr 2 11:01:58 2026 -0600
fix: skip Comet columnar shuffle for stages with DPP scans (#3879)
When a scan uses Dynamic Partition Pruning (DPP) and falls back to
Spark, Comet was still wrapping the stage with columnar shuffle,
creating inefficient row-to-columnar transitions:
CometShuffleWriter → CometRowToColumnar → SparkFilter →
SparkColumnarToRow → SparkScan
This adds a check in columnarShuffleSupported() that walks the child
plan tree to detect FileSourceScanExec nodes with dynamic pruning
filters. When found, the shuffle is not converted to Comet, allowing
the entire stage to fall back to Spark.
---
.../shuffle/CometShuffleExchangeExec.scala | 23 ++++++++++++-
.../org/apache/comet/exec/CometExecSuite.scala | 38 +++++++++++++++++++++-
2 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index d65a6b21f..df2dca033 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference,
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference,
Expression, PlanExpression, UnsafeProjection, UnsafeRow}
import
org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical._
@@ -454,6 +454,11 @@ object CometShuffleExchangeExec
return false
}
+ if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s))
{
+ withInfo(s, "Stage contains a scan with Dynamic Partition Pruning")
+ return false
+ }
+
if (!isCometJVMShuffleMode(s.conf)) {
withInfo(s, "Comet columnar shuffle not enabled")
return false
@@ -546,6 +551,22 @@ object CometShuffleExchangeExec
}
}
+ /**
+ * Returns true if the stage (the subtree rooted at this shuffle) contains a scan with Dynamic
+ * Partition Pruning (DPP). When DPP is present, the scan falls back to Spark, and wrapping the
+ * stage with Comet shuffle creates inefficient row-to-columnar transitions.
+ */
+ private def stageContainsDPPScan(s: ShuffleExchangeExec): Boolean = {
+ def isDynamicPruningFilter(e: Expression): Boolean =
+ e.exists(_.isInstanceOf[PlanExpression[_]])
+
+ s.child.exists {
+ case scan: FileSourceScanExec =>
+ scan.partitionFilters.exists(isDynamicPruningFilter)
+ case _ => false
+ }
+ }
+
def isCometShuffleEnabledWithInfo(op: SparkPlan): Boolean = {
if (!COMET_EXEC_SHUFFLE_ENABLED.get(op.conf)) {
withInfo(
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index aff181626..39d34a699 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -139,8 +139,44 @@ class CometExecSuite extends CometTestBase {
val (_, cometPlan) = checkSparkAnswer(df)
val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
assert(infos.contains("Dynamic Partition Pruning is not supported"))
+ }
+ }
+ }
+ }
+
+ test("DPP fallback avoids inefficient Comet shuffle (#3874)") {
+ withTempDir { path =>
+ val factPath = s"${path.getAbsolutePath}/fact.parquet"
+ val dimPath = s"${path.getAbsolutePath}/dim.parquet"
+ withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
+ val one_day = 24 * 60 * 60000
+ val fact = Range(0, 100)
+ .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i *
one_day), i.toString))
+ .toDF("fact_id", "fact_date", "fact_str")
+ fact.write.partitionBy("fact_date").parquet(factPath)
+ val dim = Range(0, 10)
+ .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i *
one_day), i.toString))
+ .toDF("dim_id", "dim_date", "dim_str")
+ dim.write.parquet(dimPath)
+ }
+
+ // Force sort-merge join to get a shuffle exchange above the DPP scan
+ Seq("parquet").foreach { v1List =>
+ withSQLConf(
+ SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
+ spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact2")
+ spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim2")
+ val df =
+ spark.sql(
+ "select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date
where dim_id > 7")
+ val (_, cometPlan) = checkSparkAnswer(df)
- assert(infos.contains("Comet accelerated"))
+ // Verify no CometShuffleExchangeExec wraps the DPP stage
+ assert(
+ !cometPlan.toString().contains("CometColumnarShuffle"),
+ "Should not use Comet columnar shuffle for stages with DPP scans")
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]