This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new ede05ff0a chore: Refactor serde for RegExpReplace (#2548)
ede05ff0a is described below
commit ede05ff0a6da215c78f0a2da4d02a906947fd051
Author: Andy Grove <[email protected]>
AuthorDate: Mon Oct 13 22:40:09 2025 -0600
chore: Refactor serde for RegExpReplace (#2548)
---
.../org/apache/comet/serde/QueryPlanSerde.scala | 31 +----------------
.../scala/org/apache/comet/serde/strings.scala | 40 +++++++++++++++++++++-
2 files changed, 40 insertions(+), 31 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index afde79599..594f144b3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -170,6 +170,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Like] -> CometLike,
classOf[Lower] -> CometLower,
classOf[OctetLength] -> CometScalarFunction("octet_length"),
+ classOf[RegExpReplace] -> CometRegExpReplace,
classOf[Reverse] -> CometScalarFunction("reverse"),
classOf[RLike] -> CometRLike,
classOf[StartsWith] -> CometScalarFunction("starts_with"),
@@ -761,36 +762,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
// `PromotePrecision` is just a wrapper, don't need to serialize it.
exprToProtoInternal(child, inputs, binding)
- case RegExpReplace(subject, pattern, replacement, startPosition) =>
- if (!RegExp.isSupportedPattern(pattern.toString) &&
- !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
- withInfo(
- expr,
- s"Regexp pattern $pattern is not compatible with Spark. " +
- s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
- "to allow it anyway.")
- return None
- }
- startPosition match {
- case Literal(value, DataTypes.IntegerType) if value == 1 =>
- val subjectExpr = exprToProtoInternal(subject, inputs, binding)
- val patternExpr = exprToProtoInternal(pattern, inputs, binding)
- val replacementExpr = exprToProtoInternal(replacement, inputs, binding)
- // DataFusion's regexp_replace stops at the first match. We need to add the 'g' flag
- // to apply the regex globally to match Spark behavior.
- val flagsExpr = exprToProtoInternal(Literal("g"), inputs, binding)
- val optExpr = scalarFunctionExprToProto(
- "regexp_replace",
- subjectExpr,
- patternExpr,
- replacementExpr,
- flagsExpr)
- optExprWithInfo(optExpr, expr, subject, pattern, replacement, startPosition)
- case _ =>
- withInfo(expr, "Comet only supports regexp_replace with an offset of 1 (no offset).")
- None
- }
-
// With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for
// char types.
// See https://github.com/apache/spark/pull/38151
diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index 02872eef3..795ca9b43 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
import java.util.Locale
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, InitCap, Length, Like, Literal, Lower, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, InitCap, Length, Like, Literal, Lower, RegExpReplace, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper}
import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType}
import org.apache.comet.CometConf
@@ -204,6 +204,44 @@ object CometStringLPad extends CometExpressionSerde[StringLPad] {
}
}
+object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] {
+ override def getSupportLevel(expr: RegExpReplace): SupportLevel = {
+ if (!RegExp.isSupportedPattern(expr.regexp.toString) &&
+ !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
+ withInfo(
+ expr,
+ s"Regexp pattern ${expr.regexp} is not compatible with Spark. " +
+ s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
+ "to allow it anyway.")
+ return Incompatible()
+ }
+ expr.pos match {
+ case Literal(value, DataTypes.IntegerType) if value == 1 => Compatible()
+ case _ =>
Unsupported(Some("Comet only supports regexp_replace with an offset of 1 (no offset)."))
+ }
+ }
+
+ override def convert(
+ expr: RegExpReplace,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[Expr] = {
+ val subjectExpr = exprToProtoInternal(expr.subject, inputs, binding)
+ val patternExpr = exprToProtoInternal(expr.regexp, inputs, binding)
+ val replacementExpr = exprToProtoInternal(expr.rep, inputs, binding)
+ // DataFusion's regexp_replace stops at the first match. We need to add the 'g' flag
+ // to apply the regex globally to match Spark behavior.
+ val flagsExpr = exprToProtoInternal(Literal("g"), inputs, binding)
+ val optExpr = scalarFunctionExprToProto(
+ "regexp_replace",
+ subjectExpr,
+ patternExpr,
+ replacementExpr,
+ flagsExpr)
+ optExprWithInfo(optExpr, expr, expr.subject, expr.regexp, expr.rep, expr.pos)
+ }
+}
+
trait CommonStringExprs {
def stringDecode(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]