This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 0050ed81e feat: Improve shuffle fallback reporting (#2194)
0050ed81e is described below

commit 0050ed81eedff8681016b5c1b3383e3c23ee9ec4
Author: Andy Grove <agr...@apache.org>
AuthorDate: Fri Aug 22 13:11:03 2025 -0600

    feat: Improve shuffle fallback reporting (#2194)
---
 .../org/apache/comet/rules/CometExecRule.scala | 129 +++++++++++++++------
 1 file changed, 91 insertions(+), 38 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 87daf2212..c7af3ab77 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -22,7 +22,7 @@ package org.apache.comet.rules
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
@@ -793,34 +793,52 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       return false
     }
 
+    if (!checkSupportedShuffleDataTypes(s)) {
+      return false
+    }
+
     val inputs = s.child.output
     val partitioning = s.outputPartitioning
     val conf = SQLConf.get
     partitioning match {
       case HashPartitioning(expressions, _) =>
-        // native shuffle currently does not support complex types as partition keys
-        // due to lack of hashing support for those types
-        val supported =
-          expressions.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
-            expressions.forall(e => supportedHashPartitionKeyDataType(e.dataType)) &&
-            inputs.forall(attr => supportedShuffleDataType(attr.dataType)) &&
-            CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.get(conf)
-        if (!supported) {
-          withInfo(s, s"unsupported Spark partitioning: $expressions")
+        var supported = true
+        if (!CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.get(conf)) {
+          withInfo(
+            s,
+            s"${CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.key} is disabled")
+          supported = false
+        }
+        for (expr <- expressions) {
+          if (QueryPlanSerde.exprToProto(expr, inputs).isEmpty) {
+            withInfo(s, s"unsupported hash partitioning expression: $expr")
+            supported = false
+          }
+        }
+        for (dt <- expressions.map(_.dataType).distinct) {
+          if (!supportedHashPartitionKeyDataType(dt)) {
+            // native shuffle currently does not support complex types as partition keys
+            // due to lack of hashing support for those types
+            withInfo(s, s"unsupported hash partitioning data type for native shuffle: $dt")
+            supported = false
+          }
         }
         supported
       case SinglePartition =>
-        inputs.forall(attr => supportedShuffleDataType(attr.dataType))
-      case RangePartitioning(ordering, _) =>
-        val supported = ordering.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
-          inputs.forall(attr => supportedShuffleDataType(attr.dataType)) &&
-          CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.get(conf)
-        if (!supported) {
-          withInfo(s, s"unsupported Spark partitioning: $ordering")
+        // we already checked that the input types are supported
+        true
+      case RangePartitioning(orderings, _) =>
+        if (!CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.get(conf)) {
+          // do not encourage the users to enable the config because we know that
+          // the experimental implementation is not correct yet
+          withInfo(s, "Range partitioning is not supported by native shuffle")
+          return false
         }
-        supported
+        rangePartitioningSupported(s, inputs, orderings)
       case _ =>
-        withInfo(s, s"unsupported Spark partitioning: ${partitioning.getClass.getName}")
+        withInfo(
+          s,
+          s"unsupported Spark partitioning for native shuffle: ${partitioning.getClass.getName}")
         false
     }
   }
@@ -851,38 +869,69 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
     }
 
     val inputs = s.child.output
+    if (!checkSupportedShuffleDataTypes(s)) {
+      return false
+    }
+
     val partitioning = s.outputPartitioning
     partitioning match {
       case HashPartitioning(expressions, _) =>
-        // columnar shuffle supports the same data types (including complex types) both for
-        // partition keys and for other columns
-        val supported =
-          expressions.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
-            expressions.forall(e => supportedShuffleDataType(e.dataType)) &&
-            inputs.forall(attr => supportedShuffleDataType(attr.dataType))
-        if (!supported) {
-          withInfo(s, s"unsupported Spark partitioning expressions: $expressions")
+        var supported = true
+        for (expr <- expressions) {
+          if (QueryPlanSerde.exprToProto(expr, inputs).isEmpty) {
+            withInfo(s, s"unsupported hash partitioning expression: $expr")
+            supported = false
+          }
         }
         supported
       case SinglePartition =>
-        inputs.forall(attr => supportedShuffleDataType(attr.dataType))
+        // we already checked that the input types are supported
+        true
       case RoundRobinPartitioning(_) =>
-        inputs.forall(attr => supportedShuffleDataType(attr.dataType))
+        // we already checked that the input types are supported
+        true
       case RangePartitioning(orderings, _) =>
-        val supported =
-          orderings.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
-            orderings.forall(e => supportedShuffleDataType(e.dataType)) &&
-            inputs.forall(attr => supportedShuffleDataType(attr.dataType))
-        if (!supported) {
-          withInfo(s, s"unsupported Spark partitioning expressions: $orderings")
-        }
-        supported
+        rangePartitioningSupported(s, inputs, orderings)
       case _ =>
-        withInfo(s, s"unsupported Spark partitioning: ${partitioning.getClass.getName}")
+        withInfo(
+          s,
+          s"unsupported Spark partitioning for columnar shuffle: ${partitioning.getClass.getName}")
         false
     }
   }
 
+  private def rangePartitioningSupported(
+      s: ShuffleExchangeExec,
+      inputs: Seq[Attribute],
+      orderings: Seq[SortOrder]) = {
+    var supported = true
+    for (o <- orderings) {
+      if (QueryPlanSerde.exprToProto(o, inputs).isEmpty) {
+        withInfo(s, s"unsupported range partitioning sort order: $o")
+        supported = false
+      }
+    }
+    for (dt <- orderings.map(_.dataType).distinct) {
+      if (!supportedShuffleDataType(dt)) {
+        withInfo(s, s"unsupported shuffle data type: $dt")
+        supported = false
+      }
+    }
+    supported
+  }
+
+  /** Check that all input types can be written to a shuffle file */
+  private def checkSupportedShuffleDataTypes(s: ShuffleExchangeExec): Boolean = {
+    var supported = true
+    for (input <- s.child.output) {
+      if (!supportedShuffleDataType(input.dataType)) {
+        withInfo(s, s"unsupported shuffle data type ${input.dataType} for input $input")
+        supported = false
+      }
+    }
+    supported
+  }
+
   /**
    * Determine which data types are supported in a shuffle.
    */
@@ -895,6 +944,10 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
         fields.forall(f => supportedShuffleDataType(f.dataType)) &&
         // Java Arrow stream reader cannot work on duplicate field name
         fields.map(f => f.name).distinct.length == fields.length
+
+    // TODO add support for nested complex types
+    // https://github.com/apache/datafusion-comet/issues/2199
+
     case ArrayType(ArrayType(_, _), _) => false // TODO: nested array is not supported
     case ArrayType(MapType(_, _, _), _) => false // TODO: map array element is not supported
     case ArrayType(elementType, _) =>
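The pattern behind this change, in isolation: the old checks folded every condition into a single boolean and tagged the plan with one generic "unsupported Spark partitioning" message, while the new code evaluates each condition independently and calls withInfo once per specific failure. Below is a minimal, self-contained sketch of that before/after; the FallbackReportingSketch object and its withInfo stand-in are illustrative only, not Comet APIs, and the example check messages are made up.

object FallbackReportingSketch {
  import scala.collection.mutable.ListBuffer

  // Hypothetical stand-in for CometExecRule.withInfo, which tags a plan node
  // with the reasons it cannot run natively.
  private val reasons = ListBuffer[String]()
  private def withInfo(reason: String): Unit = reasons += reason

  // Before: one aggregate boolean, so a failing plan yields one generic message.
  def supportedBefore(checks: Seq[(Boolean, String)]): Boolean = {
    val supported = checks.forall(_._1)
    if (!supported) withInfo("unsupported Spark partitioning")
    supported
  }

  // After: every check runs and each failure is reported with its own reason,
  // mirroring the per-expression and per-data-type loops in CometExecRule.
  def supportedAfter(checks: Seq[(Boolean, String)]): Boolean = {
    var supported = true
    for ((ok, reason) <- checks if !ok) {
      withInfo(reason)
      supported = false
    }
    supported
  }

  def main(args: Array[String]): Unit = {
    val checks = Seq(
      (false, "unsupported hash partitioning expression: pmod(c1, 10)"),
      (false, "unsupported hash partitioning data type for native shuffle: MapType"))
    reasons.clear()
    supportedBefore(checks)
    println(reasons.mkString("; ")) // prints the single generic reason
    reasons.clear()
    supportedAfter(checks)
    println(reasons.mkString("; ")) // prints both specific reasons
  }
}

Note that the per-condition loops deliberately keep going after the first failure, so a plan that is unsupported for several reasons reports all of them at once, which is the improvement the commit title refers to.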
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org