This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 70ceb5075 [VL] Support lead/lag window function with negative input 
offset (#5026)
70ceb5075 is described below

commit 70ceb5075d4ab01d25ffd58b3256b2256ff3f2d5
Author: PHILO-HE <[email protected]>
AuthorDate: Wed Mar 20 16:24:25 2024 +0800

    [VL] Support lead/lag window function with negative input offset (#5026)
---
 .../execution/VeloxFunctionsValidateSuite.scala    | 14 ++++++++++++
 .../backendsapi/SparkPlanExecApi.scala             | 25 +++++-----------------
 .../execution/WindowExecTransformer.scala          |  2 +-
 .../expression/WindowFunctionsBuilder.scala        | 15 +++++++++++--
 4 files changed, 33 insertions(+), 23 deletions(-)

diff --git 
a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala
 
b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala
index 4cffa121d..e5bee9761 100644
--- 
a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala
+++ 
b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala
@@ -524,4 +524,18 @@ class VeloxFunctionsValidateSuite extends 
VeloxWholeStageTransformerSuite {
     }
   }
 
+  test("lag/lead window function with negative input offset") {
+    runQueryAndCompare(
+      "select lag(l_orderkey, -2) over" +
+        " (partition by l_suppkey order by l_orderkey) from lineitem ") {
+      checkOperatorMatch[WindowExecTransformer]
+    }
+
+    runQueryAndCompare(
+      "select lead(l_orderkey, -2) over" +
+        " (partition by l_suppkey order by l_orderkey) from lineitem ") {
+      checkOperatorMatch[WindowExecTransformer]
+    }
+  }
+
 }
diff --git 
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
 
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
index 1fa18634d..4f40a8a23 100644
--- 
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
@@ -536,26 +536,11 @@ trait SparkPlanExecApi {
                   attributeSeq = originalInputAttributes)
                 .doTransform(args))
             // Spark only accepts foldable offset. Converts it to LongType 
literal.
-            var offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int]
-            if (wf.isInstanceOf[Lead]) {
-              if (offset < 0) {
-                // Velox always expects non-negative offset.
-                throw new UnsupportedOperationException(
-                  s"${wf.nodeName} does not support negative offset: $offset")
-              }
-            } else {
-              // For Lag
-              // Spark would use `-inputOffset` as offset, so here we forbid 
positive offset.
-              // Which means the inputOffset is negative.
-              if (offset > 0) {
-                // Velox always expects non-negative offset.
-                throw new UnsupportedOperationException(
-                  s"${wf.nodeName} does not support negative offset: $offset")
-              }
-              // Revert the Spark change and use the original input offset
-              offset = -offset
-            }
-            val offsetNode = ExpressionBuilder.makeLiteral(offset.toLong, 
LongType, false)
+            val offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int]
+            // Velox only allows non-negative offset. 
WindowFunctionsBuilder#create converts
+            // lag/lead with negative offset to the opposite function with positive 
offset. So just
+            // make offsetNode store the absolute (non-negative) value.
+            val offsetNode = 
ExpressionBuilder.makeLiteral(Math.abs(offset.toLong), LongType, false)
             childrenNodeList.add(offsetNode)
             // NullType means Null is the default value. Don't pass it to 
native.
             if (offsetWf.default.dataType != NullType) {
diff --git 
a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala
 
b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala
index 2b81fbec9..e4053b7a5 100644
--- 
a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala
@@ -183,7 +183,7 @@ case class WindowExecTransformer(
     val childCtx = child.asInstanceOf[TransformSupport].doTransform(context)
     val operatorId = context.nextOperatorId(this.nodeName)
     if (windowExpression == null || windowExpression.isEmpty) {
-      // The computing for this project is not needed.
+      // The computing for this operator is not needed.
       context.registerEmptyRelToOperator(operatorId)
       return childCtx
     }
diff --git 
a/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala
 
b/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala
index 07ad3ae23..c610d9705 100644
--- 
a/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala
+++ 
b/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala
@@ -18,16 +18,27 @@ package io.glutenproject.expression
 
 import io.glutenproject.exception.GlutenNotSupportException
 import io.glutenproject.expression.ConverterUtils.FunctionConfig
+import io.glutenproject.expression.ExpressionNames.{LAG, LEAD}
 import io.glutenproject.substrait.expression.ExpressionBuilder
 
-import org.apache.spark.sql.catalyst.expressions.{Expression, 
WindowExpression, WindowFunction}
+import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, Lag, 
Lead, WindowExpression, WindowFunction}
 
 import scala.util.control.Breaks.{break, breakable}
 
 object WindowFunctionsBuilder {
   def create(args: java.lang.Object, windowFunc: WindowFunction): Long = {
     val functionMap = args.asInstanceOf[java.util.HashMap[String, 
java.lang.Long]]
-    val substraitFunc = 
ExpressionMappings.expressionsMap.get(windowFunc.getClass)
+    val substraitFunc = windowFunc match {
+      // Handle lag with negative inputOffset, e.g., converts lag(c1, -1) to 
lead(c1, 1).
+      // Spark uses `-inputOffset` as `offset` for Lag function.
+      case lag: Lag if lag.offset.eval(EmptyRow).asInstanceOf[Int] > 0 =>
+        Some(LEAD)
+      // Handle lead with negative offset, e.g., converts lead(c1, -1) to 
lag(c1, 1).
+      case lead: Lead if lead.offset.eval(EmptyRow).asInstanceOf[Int] < 0 =>
+        Some(LAG)
+      case _ =>
+        ExpressionMappings.expressionsMap.get(windowFunc.getClass)
+    }
     if (substraitFunc.isEmpty) {
       throw new GlutenNotSupportException(
         s"not currently supported: ${windowFunc.getClass.getName}.")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to