This is an automated email from the ASF dual-hosted git repository. mbutrovich pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new cbbcf81da fix: Add support for StringDecode in Spark 4.0.0 (#2075) cbbcf81da is described below commit cbbcf81da8940e6c7de126e65d47228d4588480f Author: Peter Toth <peter.t...@gmail.com> AuthorDate: Fri Aug 8 19:18:58 2025 +0200 fix: Add support for StringDecode in Spark 4.0.0 (#2075) --- .../org/apache/comet/serde/QueryPlanSerde.scala | 45 ++++++++++++---------- .../org/apache/comet/shims/CometExprShim.scala | 15 ++++++++ .../org/apache/comet/shims/CometExprShim.scala | 15 ++++++++ .../org/apache/comet/shims/CometExprShim.scala | 27 +++++++++++++ .../org/apache/comet/CometFuzzTestSuite.scala | 3 -- 5 files changed, 82 insertions(+), 23 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3391a10c9..ea4739c77 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -640,7 +640,7 @@ object QueryPlanSerde extends Logging with CometExprShim { } } - expr match { + versionSpecificExprToProtoInternal(expr, inputs, binding).orElse(expr match { case a @ Alias(_, _) => val r = exprToProtoInternal(a.child, inputs, binding) if (r.isEmpty) { @@ -1275,25 +1275,6 @@ object QueryPlanSerde extends Logging with CometExprShim { optExprWithInfo(optExpr, expr, r.child) } - case s: StringDecode => - // Right child is the encoding expression. - s.charset match { - case Literal(str, DataTypes.StringType) - if str.toString.toLowerCase(Locale.ROOT) == "utf-8" => - // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls - // for invalid strings. - // Left child is the binary expression. - castToProto( - expr, - None, - DataTypes.StringType, - exprToProtoInternal(s.bin, inputs, binding).get, - CometEvalMode.TRY) - case _ => - withInfo(expr, "Comet only supports decoding with 'utf-8'.") - None - } - case RegExpReplace(subject, pattern, replacement, startPosition) => if (!RegExp.isSupportedPattern(pattern.toString) && !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { @@ -1617,6 +1598,30 @@ object QueryPlanSerde extends Logging with CometExprShim { withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) None } + }) + } + + def stringDecode( + expr: Expression, + charset: Expression, + bin: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + charset match { + case Literal(str, DataTypes.StringType) + if str.toString.toLowerCase(Locale.ROOT) == "utf-8" => + // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls + // for invalid strings. + // Left child is the binary expression. + castToProto( + expr, + None, + DataTypes.StringType, + exprToProtoInternal(bin, inputs, binding).get, + CometEvalMode.TRY) + case _ => + withInfo(expr, "Comet only supports decoding with 'utf-8'.") + None } } diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 5f4e3fba2..2a302d8d4 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -19,6 +19,8 @@ package org.apache.comet.shims import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.ExprOuterClass.Expr +import org.apache.comet.serde.QueryPlanSerde.stringDecode import org.apache.spark.sql.catalyst.expressions._ /** @@ -34,6 +36,19 @@ trait CometExprShim { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + + def versionSpecificExprToProtoInternal( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + expr match { + case s: StringDecode => + // Right child is the encoding expression. + stringDecode(expr, s.charset, s.bin, inputs, binding) + + case _ => None + } + } } object CometEvalModeUtil { diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 5f4e3fba2..2a302d8d4 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -19,6 +19,8 @@ package org.apache.comet.shims import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.ExprOuterClass.Expr +import org.apache.comet.serde.QueryPlanSerde.stringDecode import org.apache.spark.sql.catalyst.expressions._ /** @@ -34,6 +36,19 @@ trait CometExprShim { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + + def versionSpecificExprToProtoInternal( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + expr match { + case s: StringDecode => + // Right child is the encoding expression. + stringDecode(expr, s.charset, s.bin, inputs, binding) + + case _ => None + } + } } object CometEvalModeUtil { diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 5f4e3fba2..1b8e5aaa0 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -19,7 +19,12 @@ package org.apache.comet.shims import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.ExprOuterClass.Expr +import org.apache.comet.serde.QueryPlanSerde.stringDecode import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke +import org.apache.spark.sql.internal.types.StringTypeWithCollation +import org.apache.spark.sql.types.{BinaryType, BooleanType, StringType} /** * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions. @@ -34,6 +39,28 @@ trait CometExprShim { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + + def versionSpecificExprToProtoInternal( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + expr match { + case s: StaticInvoke + if s.staticObject == classOf[StringDecode] && + s.dataType.isInstanceOf[StringType] && + s.functionName == "decode" && + s.arguments.size == 4 && + s.inputTypes == Seq( + BinaryType, + StringTypeWithCollation(supportsTrimCollation = true), + BooleanType, + BooleanType) => + val Seq(bin, charset, _, _) = s.arguments + stringDecode(expr, charset, bin, inputs, binding) + + case _ => None + } + } } object CometEvalModeUtil { diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index d1f55cbe1..a1b1812b3 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.types._ -import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -272,8 +271,6 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("decode") { - // https://github.com/apache/datafusion-comet/issues/1942 - assume(!isSpark40Plus) val df = spark.read.parquet(filename) df.createOrReplaceTempView("t1") // We want to make sure that the schema generator wasn't modified to accidentally omit --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org