This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 0050ed81e feat: Improve shuffle fallback reporting (#2194)
0050ed81e is described below

commit 0050ed81eedff8681016b5c1b3383e3c23ee9ec4
Author: Andy Grove <agr...@apache.org>
AuthorDate: Fri Aug 22 13:11:03 2025 -0600

    feat: Improve shuffle fallback reporting (#2194)
---
 .../org/apache/comet/rules/CometExecRule.scala     | 129 +++++++++++++++------
 1 file changed, 91 insertions(+), 38 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 87daf2212..c7af3ab77 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -22,7 +22,7 @@ package org.apache.comet.rules
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, 
EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, 
GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, 
NamedExpression, Remainder}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, 
DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, 
GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, 
NamedExpression, Remainder, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
RangePartitioning, RoundRobinPartitioning, SinglePartition}
@@ -793,34 +793,52 @@ case class CometExecRule(session: SparkSession) extends 
Rule[SparkPlan] {
       return false
     }
 
+    if (!checkSupportedShuffleDataTypes(s)) {
+      return false
+    }
+
     val inputs = s.child.output
     val partitioning = s.outputPartitioning
     val conf = SQLConf.get
     partitioning match {
       case HashPartitioning(expressions, _) =>
-        // native shuffle currently does not support complex types as 
partition keys
-        // due to lack of hashing support for those types
-        val supported =
-          expressions.map(QueryPlanSerde.exprToProto(_, 
inputs)).forall(_.isDefined) &&
-            expressions.forall(e => 
supportedHashPartitionKeyDataType(e.dataType)) &&
-            inputs.forall(attr => supportedShuffleDataType(attr.dataType)) &&
-            
CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.get(conf)
-        if (!supported) {
-          withInfo(s, s"unsupported Spark partitioning: $expressions")
+        var supported = true
+        if 
(!CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.get(conf)) {
+          withInfo(
+            s,
+            
s"${CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.key} is 
disabled")
+          supported = false
+        }
+        for (expr <- expressions) {
+          if (QueryPlanSerde.exprToProto(expr, inputs).isEmpty) {
+            withInfo(s, s"unsupported hash partitioning expression: $expr")
+            supported = false
+          }
+        }
+        for (dt <- expressions.map(_.dataType).distinct) {
+          if (!supportedHashPartitionKeyDataType(dt)) {
+            // native shuffle currently does not support complex types as 
partition keys
+            // due to lack of hashing support for those types
+            withInfo(s, s"unsupported hash partitioning data type for native 
shuffle: $dt")
+            supported = false
+          }
         }
         supported
       case SinglePartition =>
-        inputs.forall(attr => supportedShuffleDataType(attr.dataType))
-      case RangePartitioning(ordering, _) =>
-        val supported = ordering.map(QueryPlanSerde.exprToProto(_, 
inputs)).forall(_.isDefined) &&
-          inputs.forall(attr => supportedShuffleDataType(attr.dataType)) &&
-          
CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.get(conf)
-        if (!supported) {
-          withInfo(s, s"unsupported Spark partitioning: $ordering")
+        // we already checked that the input types are supported
+        true
+      case RangePartitioning(orderings, _) =>
+        if 
(!CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.get(conf)) {
+          // do not encourage the users to enable the config because we know 
that
+          // the experimental implementation is not correct yet
+          withInfo(s, "Range partitioning is not supported by native shuffle")
+          return false
         }
-        supported
+        rangePartitioningSupported(s, inputs, orderings)
       case _ =>
-        withInfo(s, s"unsupported Spark partitioning: 
${partitioning.getClass.getName}")
+        withInfo(
+          s,
+          s"unsupported Spark partitioning for native shuffle: 
${partitioning.getClass.getName}")
         false
     }
   }
@@ -851,38 +869,69 @@ case class CometExecRule(session: SparkSession) extends 
Rule[SparkPlan] {
     }
 
     val inputs = s.child.output
+    if (!checkSupportedShuffleDataTypes(s)) {
+      return false
+    }
+
     val partitioning = s.outputPartitioning
     partitioning match {
       case HashPartitioning(expressions, _) =>
-        // columnar shuffle supports the same data types (including complex 
types) both for
-        // partition keys and for other columns
-        val supported =
-          expressions.map(QueryPlanSerde.exprToProto(_, 
inputs)).forall(_.isDefined) &&
-            expressions.forall(e => supportedShuffleDataType(e.dataType)) &&
-            inputs.forall(attr => supportedShuffleDataType(attr.dataType))
-        if (!supported) {
-          withInfo(s, s"unsupported Spark partitioning expressions: 
$expressions")
+        var supported = true
+        for (expr <- expressions) {
+          if (QueryPlanSerde.exprToProto(expr, inputs).isEmpty) {
+            withInfo(s, s"unsupported hash partitioning expression: $expr")
+            supported = false
+          }
         }
         supported
       case SinglePartition =>
-        inputs.forall(attr => supportedShuffleDataType(attr.dataType))
+        // we already checked that the input types are supported
+        true
       case RoundRobinPartitioning(_) =>
-        inputs.forall(attr => supportedShuffleDataType(attr.dataType))
+        // we already checked that the input types are supported
+        true
       case RangePartitioning(orderings, _) =>
-        val supported =
-          orderings.map(QueryPlanSerde.exprToProto(_, 
inputs)).forall(_.isDefined) &&
-            orderings.forall(e => supportedShuffleDataType(e.dataType)) &&
-            inputs.forall(attr => supportedShuffleDataType(attr.dataType))
-        if (!supported) {
-          withInfo(s, s"unsupported Spark partitioning expressions: 
$orderings")
-        }
-        supported
+        rangePartitioningSupported(s, inputs, orderings)
       case _ =>
-        withInfo(s, s"unsupported Spark partitioning: 
${partitioning.getClass.getName}")
+        withInfo(
+          s,
+          s"unsupported Spark partitioning for columnar shuffle: 
${partitioning.getClass.getName}")
         false
     }
   }
 
+  private def rangePartitioningSupported(
+      s: ShuffleExchangeExec,
+      inputs: Seq[Attribute],
+      orderings: Seq[SortOrder]) = {
+    var supported = true
+    for (o <- orderings) {
+      if (QueryPlanSerde.exprToProto(o, inputs).isEmpty) {
+        withInfo(s, s"unsupported range partitioning sort order: $o")
+        supported = false
+      }
+    }
+    for (dt <- orderings.map(_.dataType).distinct) {
+      if (!supportedShuffleDataType(dt)) {
+        withInfo(s, s"unsupported shuffle data type: $dt")
+        supported = false
+      }
+    }
+    supported
+  }
+
+  /** Check that all input types can be written to a shuffle file */
+  private def checkSupportedShuffleDataTypes(s: ShuffleExchangeExec): Boolean 
= {
+    var supported = true
+    for (input <- s.child.output) {
+      if (!supportedShuffleDataType(input.dataType)) {
+        withInfo(s, s"unsupported shuffle data type ${input.dataType} for 
input $input")
+        supported = false
+      }
+    }
+    supported
+  }
+
   /**
    * Determine which data types are supported in a shuffle.
    */
@@ -895,6 +944,10 @@ case class CometExecRule(session: SparkSession) extends 
Rule[SparkPlan] {
       fields.forall(f => supportedShuffleDataType(f.dataType)) &&
       // Java Arrow stream reader cannot work on duplicate field name
       fields.map(f => f.name).distinct.length == fields.length
+
+    // TODO add support for nested complex types
+    // https://github.com/apache/datafusion-comet/issues/2199
+
     case ArrayType(ArrayType(_, _), _) => false // TODO: nested array is not 
supported
     case ArrayType(MapType(_, _, _), _) => false // TODO: map array element is 
not supported
     case ArrayType(elementType, _) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to