This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 70ceb5075 [VL] Support lead/lag window function with negative input
offset (#5026)
70ceb5075 is described below
commit 70ceb5075d4ab01d25ffd58b3256b2256ff3f2d5
Author: PHILO-HE <[email protected]>
AuthorDate: Wed Mar 20 16:24:25 2024 +0800
[VL] Support lead/lag window function with negative input offset (#5026)
---
.../execution/VeloxFunctionsValidateSuite.scala | 14 ++++++++++++
.../backendsapi/SparkPlanExecApi.scala | 25 +++++-----------------
.../execution/WindowExecTransformer.scala | 2 +-
.../expression/WindowFunctionsBuilder.scala | 15 +++++++++++--
4 files changed, 33 insertions(+), 23 deletions(-)
diff --git
a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala
b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala
index 4cffa121d..e5bee9761 100644
---
a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala
+++
b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala
@@ -524,4 +524,18 @@ class VeloxFunctionsValidateSuite extends
VeloxWholeStageTransformerSuite {
}
}
+ test("lag/lead window function with negative input offset") {
+ runQueryAndCompare(
+ "select lag(l_orderkey, -2) over" +
+ " (partition by l_suppkey order by l_orderkey) from lineitem ") {
+ checkOperatorMatch[WindowExecTransformer]
+ }
+
+ runQueryAndCompare(
+ "select lead(l_orderkey, -2) over" +
+ " (partition by l_suppkey order by l_orderkey) from lineitem ") {
+ checkOperatorMatch[WindowExecTransformer]
+ }
+ }
+
}
diff --git
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
index 1fa18634d..4f40a8a23 100644
---
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
@@ -536,26 +536,11 @@ trait SparkPlanExecApi {
attributeSeq = originalInputAttributes)
.doTransform(args))
// Spark only accepts foldable offset. Converts it to LongType
literal.
- var offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int]
- if (wf.isInstanceOf[Lead]) {
- if (offset < 0) {
- // Velox always expects non-negative offset.
- throw new UnsupportedOperationException(
- s"${wf.nodeName} does not support negative offset: $offset")
- }
- } else {
- // For Lag
- // Spark would use `-inputOffset` as offset, so here we forbid
positive offset.
- // Which means the inputOffset is negative.
- if (offset > 0) {
- // Velox always expects non-negative offset.
- throw new UnsupportedOperationException(
- s"${wf.nodeName} does not support negative offset: $offset")
- }
- // Revert the Spark change and use the original input offset
- offset = -offset
- }
- val offsetNode = ExpressionBuilder.makeLiteral(offset.toLong,
LongType, false)
+ val offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int]
+ // Velox only allows non-negative offset.
WindowFunctionsBuilder#create converts
+ // lag/lead with negative offset to the function with positive
offset. So just
+ // make offsetNode store the non-negative (absolute) value.
+ val offsetNode =
ExpressionBuilder.makeLiteral(Math.abs(offset.toLong), LongType, false)
childrenNodeList.add(offsetNode)
// NullType means Null is the default value. Don't pass it to
native.
if (offsetWf.default.dataType != NullType) {
diff --git
a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala
b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala
index 2b81fbec9..e4053b7a5 100644
---
a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala
@@ -183,7 +183,7 @@ case class WindowExecTransformer(
val childCtx = child.asInstanceOf[TransformSupport].doTransform(context)
val operatorId = context.nextOperatorId(this.nodeName)
if (windowExpression == null || windowExpression.isEmpty) {
- // The computing for this project is not needed.
+ // The computing for this operator is not needed.
context.registerEmptyRelToOperator(operatorId)
return childCtx
}
diff --git
a/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala
b/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala
index 07ad3ae23..c610d9705 100644
---
a/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala
@@ -18,16 +18,27 @@ package io.glutenproject.expression
import io.glutenproject.exception.GlutenNotSupportException
import io.glutenproject.expression.ConverterUtils.FunctionConfig
+import io.glutenproject.expression.ExpressionNames.{LAG, LEAD}
import io.glutenproject.substrait.expression.ExpressionBuilder
-import org.apache.spark.sql.catalyst.expressions.{Expression,
WindowExpression, WindowFunction}
+import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, Lag,
Lead, WindowExpression, WindowFunction}
import scala.util.control.Breaks.{break, breakable}
object WindowFunctionsBuilder {
def create(args: java.lang.Object, windowFunc: WindowFunction): Long = {
val functionMap = args.asInstanceOf[java.util.HashMap[String,
java.lang.Long]]
- val substraitFunc =
ExpressionMappings.expressionsMap.get(windowFunc.getClass)
+ val substraitFunc = windowFunc match {
+ // Handle lag with negative inputOffset, e.g., converts lag(c1, -1) to
lead(c1, 1).
+ // Spark uses `-inputOffset` as `offset` for Lag function.
+ case lag: Lag if lag.offset.eval(EmptyRow).asInstanceOf[Int] > 0 =>
+ Some(LEAD)
+ // Handle lead with negative offset, e.g., converts lead(c1, -1) to
lag(c1, 1).
+ case lead: Lead if lead.offset.eval(EmptyRow).asInstanceOf[Int] < 0 =>
+ Some(LAG)
+ case _ =>
+ ExpressionMappings.expressionsMap.get(windowFunc.getClass)
+ }
if (substraitFunc.isEmpty) {
throw new GlutenNotSupportException(
s"not currently supported: ${windowFunc.getClass.getName}.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]