This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7a07db20f feat: Support right expression (#3207)
7a07db20f is described below
commit 7a07db20f216739c7b7d2de3be2594fdef98aa96
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Tue Feb 10 04:03:08 2026 +0530
feat: Support right expression (#3207)
---
.../org/apache/comet/serde/QueryPlanSerde.scala | 1 +
.../scala/org/apache/comet/serde/strings.scala | 46 +++++++++++++-
.../org/apache/comet/CometExpressionSuite.scala | 71 ++++++++++++++++++++++
3 files changed, 117 insertions(+), 1 deletion(-)
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 73b88ae93..fc8803bff 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -173,6 +173,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[StringTrimLeft] -> CometScalarFunction("ltrim"),
classOf[StringTrimRight] -> CometScalarFunction("rtrim"),
classOf[Left] -> CometLeft,
+ classOf[Right] -> CometRight,
classOf[Substring] -> CometSubstring,
classOf[Upper] -> CometUpper)
diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala
b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index ea42b245a..5bf15fd6a 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -21,8 +21,9 @@ package org.apache.comet.serde
import java.util.Locale
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat,
Expression, InitCap, Left, Length, Like, Literal, Lower, RegExpReplace, RLike,
StringLPad, StringRepeat, StringRPad, Substring, Upper}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat,
Expression, If, InitCap, IsNull, Left, Length, Like, Literal, Lower,
RegExpReplace, Right, RLike, StringLPad, StringRepeat, StringRPad, Substring,
Upper}
import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -143,6 +144,49 @@ object CometLeft extends CometExpressionSerde[Left] {
}
}
+object CometRight extends CometExpressionSerde[Right] {
+
+  /**
+   * Serializes Spark's `RIGHT(str, len)` to a native expression.
+   *
+   * Only a literal `len` is supported, so the shape of the result is decided at planning time:
+   *   - NULL `len`: every row evaluates to NULL, so a NULL string literal is emitted.
+   *   - `len <= 0`: `If(IsNull(str), NULL, "")`, so a NULL `str` stays NULL and any other
+   *     input yields the empty string.
+   *   - `len > 0`: a native substring starting `len` characters from the end of `str`.
+   *
+   * @param expr    the Spark `Right` expression to convert
+   * @param inputs  passed through unchanged to `exprToProtoInternal`
+   * @param binding passed through unchanged to `exprToProtoInternal`
+   * @return the serialized expression, or `None` (with the reason recorded via `withInfo`)
+   *         when `len` is not an integer literal or `str` cannot be serialized
+   */
+  override def convert(expr: Right, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
+    expr.len match {
+      case Literal(null, _) =>
+        // RIGHT(str, NULL) is NULL for every row: Spark evaluates Substring(str, -NULL).
+        // This case must be matched explicitly: unboxing the NULL via asInstanceOf[Int]
+        // yields 0, which would wrongly return "" for every non-null string.
+        exprToProtoInternal(Literal.create(null, StringType), inputs, binding)
+      case Literal(lenInt: Int, _) if lenInt <= 0 =>
+        // Match Spark's behavior: If(IsNull(str), NULL, "")
+        // This ensures NULL propagation: RIGHT(NULL, 0) -> NULL, RIGHT("hello", 0) -> ""
+        val ifExpr = If(
+          IsNull(expr.str),
+          Literal.create(null, StringType),
+          Literal(UTF8String.EMPTY_UTF8, StringType))
+        // Serialize the If expression using existing infrastructure
+        exprToProtoInternal(ifExpr, inputs, binding)
+      case Literal(lenInt: Int, _) =>
+        exprToProtoInternal(expr.str, inputs, binding) match {
+          case Some(strExpr) =>
+            // A negative start counts back from the end of the string, as in SQL SUBSTRING.
+            val builder = ExprOuterClass.Substring
+              .newBuilder()
+              .setChild(strExpr)
+              .setStart(-lenInt)
+              .setLen(lenInt)
+            Some(ExprOuterClass.Expr.newBuilder().setSubstring(builder).build())
+          case None =>
+            withInfo(expr, expr.str)
+            None
+        }
+      case Literal(_, dataType) =>
+        // Defensive: the analyzer should already have cast len to an integer.
+        withInfo(expr, s"RIGHT len of type $dataType is not supported")
+        None
+      case _ =>
+        withInfo(expr, "RIGHT len must be a literal")
+        None
+    }
+  }
+
+  /** Only string-typed `str` inputs are Compatible; any other type is reported Unsupported. */
+  override def getSupportLevel(expr: Right): SupportLevel = {
+    expr.str.dataType match {
+      case _: StringType => Compatible()
+      case _ => Unsupported(Some(s"RIGHT does not support ${expr.str.dataType}"))
+    }
+  }
+}
+
object CometConcat extends CometScalarFunction[Concat]("concat") {
val unsupportedReason = "CONCAT supports only string input parameters"
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 91428bb61..206ac1726 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -569,6 +569,77 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
}
+  test("RIGHT function") {
+    val rows = (0 until 10).map(i => (s"test$i", i))
+    withParquetTable(rows, "tbl") {
+      // Covers positive, zero, negative and longer-than-input lengths.
+      for (len <- Seq(2, 4, 0, -1, 100)) {
+        checkSparkAnswerAndOperator(s"SELECT _1, RIGHT(_1, $len) FROM tbl")
+      }
+      checkSparkAnswerAndOperator("SELECT RIGHT(CAST(NULL AS STRING), 2) FROM tbl LIMIT 1")
+    }
+  }
+
+  test("RIGHT function with unicode") {
+    // Multi-byte UTF-8 input: an accented letter, CJK, an emoji and Telugu script.
+    val samples = Seq("café", "hello世界", "😀emoji", "తెలుగు")
+    withParquetTable(samples.zipWithIndex, "unicode_tbl") {
+      Seq(2, 3, 0).foreach { n =>
+        checkSparkAnswerAndOperator(s"SELECT _1, RIGHT(_1, $n) FROM unicode_tbl")
+      }
+    }
+  }
+
+  test("RIGHT function equivalence with SUBSTRING negative pos") {
+    val rows = (0 until 20).map(i => Tuple1(s"test$i"))
+    withParquetTable(rows, "equiv_tbl") {
+      val df = spark.sql("""
+ SELECT _1,
+ RIGHT(_1, 3) as right_result,
+ SUBSTRING(_1, -3, 3) as substring_result
+ FROM equiv_tbl
+ """)
+      // Any row where the two results disagree, including one-sided NULLs,
+      // which a plain `!=` comparison would not catch.
+      val mismatch = Seq(
+        "right_result != substring_result",
+        "(right_result IS NULL AND substring_result IS NOT NULL)",
+        "(right_result IS NOT NULL AND substring_result IS NULL)").mkString(" OR ")
+      checkAnswer(df.filter(mismatch), Seq.empty)
+    }
+  }
+
+  test("RIGHT function with dictionary") {
+    // 1000 rows drawn from only five distinct strings ("value0".."value4"), a highly
+    // repetitive column intended to be dictionary-encoded by the Parquet writer.
+    val values = (0 until 1000).map(i => s"value${i % 5}")
+    withParquetTable(values.zipWithIndex, "dict_tbl") {
+      checkSparkAnswerAndOperator("SELECT _1, RIGHT(_1, 3) FROM dict_tbl")
+    }
+  }
+
+  test("RIGHT function NULL handling") {
+    val rows = (0 until 5).map(i => (s"test$i", i))
+
+    // A NULL input must stay NULL even when len <= 0 (critical edge case).
+    withParquetTable(rows, "null_tbl") {
+      Seq(0, -1, -5).foreach { len =>
+        checkSparkAnswerAndOperator(
+          s"SELECT RIGHT(CAST(NULL AS STRING), $len) FROM null_tbl LIMIT 1")
+      }
+    }
+
+    // Non-NULL strings with len <= 0 yield the empty string.
+    withParquetTable(rows, "edge_tbl") {
+      Seq(0, -1).foreach { len =>
+        checkSparkAnswerAndOperator(s"SELECT _1, RIGHT(_1, $len) FROM edge_tbl")
+      }
+    }
+
+    // NULL, empty and non-empty strings mixed in a single column.
+    val table = "right_null_edge"
+    withTable(table) {
+      sql(s"create table $table(str string) using parquet")
+      sql(s"insert into $table values('hello'), (NULL), (''), ('world')")
+      Seq(0, -1, 2).foreach { len =>
+        checkSparkAnswerAndOperator(s"SELECT str, RIGHT(str, $len) FROM $table")
+      }
+    }
+  }
+
test("hour, minute, second") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]