This is an automated email from the ASF dual-hosted git repository. mbutrovich pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new a67008b8e chore: Introduce `exprHandlers` map in QueryPlanSerde (#1903) a67008b8e is described below commit a67008b8e7040e9fa23b7a7bb46f6c6081029bed Author: Andy Grove <agr...@apache.org> AuthorDate: Wed Jun 25 02:56:45 2025 -0600 chore: Introduce `exprHandlers` map in QueryPlanSerde (#1903) --- docs/source/user-guide/expressions.md | 26 ++--- .../org/apache/comet/serde/QueryPlanSerde.scala | 108 ++++++++------------- 2 files changed, 56 insertions(+), 78 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 5720f3304..7177276bd 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -127,7 +127,7 @@ The following Spark expressions are currently available. Any known compatibility | Log10 | | | Pow | | | Round | | -| Signum | Signum does not differentiate between `0.0` and `-0.0` | +| Signum | | | Sin | | | Sqrt | | | Tan | | @@ -186,16 +186,20 @@ The following Spark expressions are currently available. Any known compatibility ## Arrays -| Expression | Notes | -|-------------------|--------------| -| ArrayAppend | Experimental | -| ArrayContains | Experimental | -| ArrayIntersect | Experimental | -| ArrayJoin | Experimental | -| ArrayRemove | | -| ArraysOverlap | Experimental | -| ElementAt | Arrays only | -| GetArrayItem | | +| Expression | Notes | +| -------------- | ------------ | +| ArrayAppend | Experimental | +| ArrayExcept | Experimental | +| ArrayCompact | Experimental | +| ArrayContains | Experimental | +| ArrayInsert | Experimental | +| ArrayIntersect | Experimental | +| ArrayJoin | Experimental | +| ArrayRemove | | +| ArrayRepeat | Experimental | +| ArraysOverlap | Experimental | +| ElementAt | Arrays only | +| GetArrayItem | | ## Structs diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a97b17774..9671ef9d7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -61,6 +61,40 @@ import org.apache.comet.shims.CometExprShim * An utility object for query plan and expression serialization. */ object QueryPlanSerde extends Logging with CometExprShim { + + /** + * Mapping of Spark expression class to Comet expression handler. + */ + private val exprSerdeMap: Map[Class[_], CometExpressionSerde] = Map( + classOf[ArrayAppend] -> CometArrayAppend, + classOf[ArrayContains] -> CometArrayContains, + classOf[ArrayExcept] -> CometArrayExcept, + classOf[ArrayInsert] -> CometArrayInsert, + classOf[ArrayIntersect] -> CometArrayIntersect, + classOf[ArrayJoin] -> CometArrayJoin, + classOf[ArrayMax] -> CometArrayMax, + classOf[ArrayRemove] -> CometArrayRemove, + classOf[ArrayRepeat] -> CometArrayRepeat, + classOf[ArraysOverlap] -> CometArraysOverlap, + classOf[Ascii] -> CometAscii, + classOf[ConcatWs] -> CometConcatWs, + classOf[Chr] -> CometChr, + classOf[InitCap] -> CometInitCap, + classOf[BitLength] -> CometBitLength, + classOf[Length] -> CometLength, + classOf[StringInstr] -> CometStringInstr, + classOf[StringRepeat] -> CometStringRepeat, + classOf[StringReplace] -> CometStringReplace, + classOf[StringTranslate] -> CometStringTranslate, + classOf[StringTrim] -> CometTrim, + classOf[StringTrimLeft] -> CometStringTrimLeft, + classOf[StringTrimRight] -> CometStringTrimRight, + classOf[StringTrimBoth] -> CometStringTrimBoth, + classOf[Upper] -> CometUpper, + classOf[Lower] -> CometLower, + classOf[Murmur3Hash] -> CometMurmur3Hash, + classOf[XxHash64] -> CometXxHash64) + def emitWarning(reason: String): Unit = { logWarning(s"Comet native execution is disabled due to: $reason") } @@ -1457,9 +1491,6 @@ object QueryPlanSerde extends Logging with CometExprShim { val optExpr = scalarFunctionExprToProto("tan", childExpr) optExprWithInfo(optExpr, expr, child) - case _: Ascii => - CometAscii.convert(expr, inputs, binding) - case Expm1(child) => val childExpr = exprToProtoInternal(child, inputs, binding) val optExpr = scalarFunctionExprToProto("expm1", childExpr) @@ -1514,9 +1545,6 @@ object QueryPlanSerde extends Logging with CometExprShim { None } - case _: BitLength => - CometBitLength.convert(expr, inputs, binding) - case If(predicate, trueValue, falseValue) => val predicateExpr = exprToProtoInternal(predicate, inputs, binding) val trueExpr = exprToProtoInternal(trueValue, inputs, binding) @@ -1570,17 +1598,6 @@ object QueryPlanSerde extends Logging with CometExprShim { withInfo(expr, allBranches: _*) None } - case _: ConcatWs => - CometConcatWs.convert(expr, inputs, binding) - - case _: Chr => - CometChr.convert(expr, inputs, binding) - - case _: InitCap => - CometInitCap.convert(expr, inputs, binding) - - case _: Length => - CometLength.convert(expr, inputs, binding) case Md5(child) => val childExpr = exprToProtoInternal(child, inputs, binding) @@ -1599,36 +1616,6 @@ object QueryPlanSerde extends Logging with CometExprShim { val optExpr = scalarFunctionExprToProto("reverse", childExpr) optExprWithInfo(optExpr, expr, castExpr) - case _: StringInstr => - CometStringInstr.convert(expr, inputs, binding) - - case _: StringRepeat => - CometStringRepeat.convert(expr, inputs, binding) - - case _: StringReplace => - CometStringReplace.convert(expr, inputs, binding) - - case _: StringTranslate => - CometStringTranslate.convert(expr, inputs, binding) - - case _: StringTrim => - CometTrim.convert(expr, inputs, binding) - - case _: StringTrimLeft => - CometStringTrimLeft.convert(expr, inputs, binding) - - case _: StringTrimRight => - CometStringTrimRight.convert(expr, inputs, binding) - - case _: StringTrimBoth => - CometStringTrimBoth.convert(expr, inputs, binding) - - case _: Upper => - CometUpper.convert(expr, inputs, binding) - - case _: Lower => - CometLower.convert(expr, inputs, binding) - case BitwiseAnd(left, right) => createBinaryExpr( expr, @@ -1847,10 +1834,6 @@ object QueryPlanSerde extends Logging with CometExprShim { None } - case _: Murmur3Hash => CometMurmur3Hash.convert(expr, inputs, binding) - - case _: XxHash64 => CometXxHash64.convert(expr, inputs, binding) - case Sha2(left, numBits) => if (!numBits.foldable) { withInfo(expr, "non literal numBits is not supported") @@ -1942,8 +1925,6 @@ object QueryPlanSerde extends Logging with CometExprShim { None } - case expr if expr.prettyName == "array_insert" => convert(CometArrayInsert) - case ElementAt(child, ordinal, defaultValue, failOnError) if child.dataType.isInstanceOf[ArrayType] => val childExpr = exprToProtoInternal(child, inputs, binding) @@ -1989,18 +1970,8 @@ object QueryPlanSerde extends Logging with CometExprShim { withInfo(expr, "unsupported arguments for GetArrayStructFields", child) None } - case _: ArrayRemove => convert(CometArrayRemove) - case _: ArrayContains => convert(CometArrayContains) - case _: ArrayMax => convert(CometArrayMax) - case _: ArrayAppend => convert(CometArrayAppend) - case _: ArrayIntersect => convert(CometArrayIntersect) - case _: ArrayJoin => convert(CometArrayJoin) - case _: ArraysOverlap => convert(CometArraysOverlap) - case _: ArrayRepeat => convert(CometArrayRepeat) case _ @ArrayFilter(_, func) if func.children.head.isInstanceOf[IsNotNull] => convert(CometArrayCompact) - case _: ArrayExcept => - convert(CometArrayExcept) case mk: MapKeys => val childExpr = exprToProtoInternal(mk.child, inputs, binding) scalarFunctionExprToProto("map_keys", childExpr) @@ -2011,11 +1982,14 @@ object QueryPlanSerde extends Logging with CometExprShim { val mapExpr = exprToProtoInternal(gmv.child, inputs, binding) val keyExpr = exprToProtoInternal(gmv.key, inputs, binding) scalarFunctionExprToProto("map_extract", mapExpr, keyExpr) - case _ => - withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) - None + case expr => + QueryPlanSerde.exprSerdeMap.get(expr.getClass) match { + case Some(handler) => convert(handler) + case _ => + withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) + None + } } - } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org