This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 528822aaa chore: Improve expression fallback reporting (#2240) 528822aaa is described below commit 528822aaa3dfaf27ab308f492916013c725c007d Author: Andy Grove <agr...@apache.org> AuthorDate: Tue Aug 26 13:09:08 2025 -0600 chore: Improve expression fallback reporting (#2240) --- .../main/scala/org/apache/comet/GenerateDocs.scala | 3 +- .../org/apache/comet/expressions/CometCast.scala | 11 +---- .../org/apache/comet/serde/QueryPlanSerde.scala | 55 ++++++++++++++++++---- .../main/scala/org/apache/comet/serde/arrays.scala | 52 +++++++++++++++----- .../scala/org/apache/comet/serde/unixtime.scala | 5 +- .../scala/org/apache/comet/CometCastSuite.scala | 3 +- .../spark/sql/CometToPrettyStringSuite.scala | 3 +- 7 files changed, 95 insertions(+), 37 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala index d8cc62cf9..56fe75179 100644 --- a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala +++ b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala @@ -25,7 +25,8 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.catalyst.expressions.Cast -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.{Compatible, Incompatible} /** * Utility for generating markdown documentation from the configs. 
diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 337eae11d..fcf22c4a0 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -21,16 +21,7 @@ package org.apache.comet.expressions import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType} -sealed trait SupportLevel - -/** We support this feature with full compatibility with Spark */ -case class Compatible(notes: Option[String] = None) extends SupportLevel - -/** We support this feature but results can be different from Spark */ -case class Incompatible(notes: Option[String] = None) extends SupportLevel - -/** We do not support this feature */ -object Unsupported extends SupportLevel +import org.apache.comet.serde.{Compatible, Incompatible, SupportLevel, Unsupported} object CometCast { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3c817ef5d..5f8417b79 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -643,14 +643,31 @@ object QueryPlanSerde extends Logging with CometExprShim { SQLConf.get def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = { - handler match { - case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() => - withInfo( - expr, - s"$expr is not fully compatible with Spark. To enable it anyway, set " + - s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. 
${CometConf.COMPAT_GUIDE}.") + handler.getSupportLevel(expr) match { + case Unsupported => + withInfo(expr, s"$expr is not supported.") None - case _ => + case Incompatible(notes) => + if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { + if (notes.isDefined) { + logWarning( + s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " + + s"but has notes: ${notes.get}") + } + handler.convert(expr, inputs, binding) + } else { + val optionalNotes = notes.map(str => s" ($str)").getOrElse("") + withInfo( + expr, + s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " + + s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " + + s"${CometConf.COMPAT_GUIDE}.") + None + } + case Compatible(notes) => + if (notes.isDefined) { + logWarning(s"Comet supports $expr but has notes: ${notes.get}") + } handler.convert(expr, inputs, binding) } } @@ -2349,6 +2366,17 @@ object QueryPlanSerde extends Logging with CometExprShim { } } +sealed trait SupportLevel + +/** We support this feature with full compatibility with Spark */ +case class Compatible(notes: Option[String] = None) extends SupportLevel + +/** We support this feature but results can be different from Spark */ +case class Incompatible(notes: Option[String] = None) extends SupportLevel + +/** We do not support this feature */ +object Unsupported extends SupportLevel + /** * Trait for providing serialization logic for operators. */ @@ -2386,6 +2414,16 @@ trait CometOperatorSerde[T <: SparkPlan] { */ trait CometExpressionSerde[T <: Expression] { + /** + * Determine the support level of the expression based on its attributes. + * + * @param expr + * The Spark expression. + * @return + * Support level (Compatible, Incompatible, or Unsupported). + */ + def getSupportLevel(expr: T): SupportLevel = Compatible(None) + /** * Convert a Spark expression into a protocol buffer representation that can be passed into * native code. 
@@ -2436,9 +2474,6 @@ trait CometAggregateExpressionSerde { conf: SQLConf): Option[ExprOuterClass.AggExpr] } -/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */ -trait IncompatExpr {} - /** Serde for scalar function. */ case class CometScalarFunction[T <: Expression](name: String) extends CometExpressionSerde[T] { override def convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 2a77d5fa1..d2624ca23 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -93,7 +93,10 @@ object CometArrayRemove extends CometExpressionSerde[ArrayRemove] with CometExpr } } -object CometArrayAppend extends CometExpressionSerde[ArrayAppend] with IncompatExpr { +object CometArrayAppend extends CometExpressionSerde[ArrayAppend] { + + override def getSupportLevel(expr: ArrayAppend): SupportLevel = Incompatible(None) + override def convert( expr: ArrayAppend, inputs: Seq[Attribute], @@ -149,7 +152,10 @@ object CometArrayContains extends CometExpressionSerde[ArrayContains] { } } -object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with IncompatExpr { +object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] { + + override def getSupportLevel(expr: ArrayDistinct): SupportLevel = Incompatible(None) + override def convert( expr: ArrayDistinct, inputs: Seq[Attribute], @@ -162,7 +168,10 @@ object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] with Incom } } -object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] with IncompatExpr { +object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] { + + override def getSupportLevel(expr: ArrayIntersect): SupportLevel = Incompatible(None) + override def convert( expr: ArrayIntersect, inputs: 
Seq[Attribute], @@ -201,7 +210,10 @@ object CometArrayMin extends CometExpressionSerde[ArrayMin] { } } -object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with IncompatExpr { +object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { + + override def getSupportLevel(expr: ArraysOverlap): SupportLevel = Incompatible(None) + override def convert( expr: ArraysOverlap, inputs: Seq[Attribute], @@ -218,7 +230,10 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with Incom } } -object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatExpr { +object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] { + + override def getSupportLevel(expr: ArrayRepeat): SupportLevel = Incompatible(None) + override def convert( expr: ArrayRepeat, inputs: Seq[Attribute], @@ -232,7 +247,10 @@ object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] with IncompatE } } -object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatExpr { +object CometArrayCompact extends CometExpressionSerde[Expression] { + + override def getSupportLevel(expr: Expression): SupportLevel = Incompatible(None) + override def convert( expr: Expression, inputs: Seq[Attribute], @@ -252,10 +270,9 @@ object CometArrayCompact extends CometExpressionSerde[Expression] with IncompatE } } -object CometArrayExcept - extends CometExpressionSerde[ArrayExcept] - with CometExprShim - with IncompatExpr { +object CometArrayExcept extends CometExpressionSerde[ArrayExcept] with CometExprShim { + + override def getSupportLevel(expr: ArrayExcept): SupportLevel = Incompatible(None) @tailrec def isTypeSupported(dt: DataType): Boolean = { @@ -292,7 +309,10 @@ object CometArrayExcept } } -object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr { +object CometArrayJoin extends CometExpressionSerde[ArrayJoin] { + + override def getSupportLevel(expr: ArrayJoin): SupportLevel = Incompatible(None) + override 
def convert( expr: ArrayJoin, inputs: Seq[Attribute], @@ -326,7 +346,10 @@ object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with IncompatExpr } } -object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatExpr { +object CometArrayInsert extends CometExpressionSerde[ArrayInsert] { + + override def getSupportLevel(expr: ArrayInsert): SupportLevel = Incompatible(None) + override def convert( expr: ArrayInsert, inputs: Seq[Attribute], @@ -361,7 +384,10 @@ object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with IncompatE } } -object CometArrayUnion extends CometExpressionSerde[ArrayUnion] with IncompatExpr { +object CometArrayUnion extends CometExpressionSerde[ArrayUnion] { + + override def getSupportLevel(expr: ArrayUnion): SupportLevel = Incompatible(None) + override def convert( expr: ArrayUnion, inputs: Seq[Attribute], diff --git a/spark/src/main/scala/org/apache/comet/serde/unixtime.scala b/spark/src/main/scala/org/apache/comet/serde/unixtime.scala index 0171d3e1a..198c7d310 100644 --- a/spark/src/main/scala/org/apache/comet/serde/unixtime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/unixtime.scala @@ -27,7 +27,10 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn // TODO: DataFusion supports only -8334601211038 <= sec <= 8210266876799 // https://github.com/apache/datafusion/issues/16594 -object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] with IncompatExpr { +object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] { + + override def getSupportLevel(expr: FromUnixTime): SupportLevel = Incompatible(None) + override def convert( expr: FromUnixTime, inputs: Seq[Attribute], diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index fbf38e2e0..8bad71e08 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ 
b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType} import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.Compatible class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala index 7a5af8761..d030106c3 100644 --- a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala +++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql import org.apache.comet.CometConf -import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible} +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.Compatible import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} import org.apache.commons.io.FileUtils import org.apache.spark.sql.catalyst.TableIdentifier --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org