This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new edae5b804 [VL] Support date type in window range frame (#6653)
edae5b804 is described below
commit edae5b8046aa79ea16cf3cb3b011d6eac1fe7acd
Author: Mingliang Zhu <[email protected]>
AuthorDate: Thu Aug 1 08:46:25 2024 +0800
[VL] Support date type in window range frame (#6653)
---
.../org/apache/gluten/execution/TestOperator.scala | 7 +++
.../columnar/rewrite/PullOutPreProject.scala | 22 +++-----
.../apache/gluten/utils/PullOutProjectHelper.scala | 58 +++++++++++++++++++---
3 files changed, 65 insertions(+), 22 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 0fb5fb549..86a62a447 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -354,6 +354,13 @@ class TestOperator extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPla
Seq("sort", "streaming").foreach {
windowType =>
withSQLConf("spark.gluten.sql.columnar.backend.velox.window.type" ->
windowType) {
+ runQueryAndCompare(
+ "select max(l_partkey) over" +
+ " (partition by l_suppkey order by l_commitdate" +
+ " RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) from lineitem ") {
+ checkSparkOperatorMatch[WindowExecTransformer]
+ }
+
runQueryAndCompare(
"select max(l_partkey) over" +
" (partition by l_suppkey order by l_orderkey" +
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
index 73b8ab260..51cdb76a1 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
@@ -76,17 +76,7 @@ object PullOutPreProject extends RewriteSingleNode with
PullOutProjectHelper {
}
case _ => false
}.isDefined) ||
- window.windowExpression.exists(_.find {
- case we: WindowExpression =>
- we.windowSpec.frameSpecification match {
- case swf: SpecifiedWindowFrame
- if needPreComputeRangeFrame(swf) &&
supportPreComputeRangeFrame(
- we.windowSpec.orderSpec) =>
- true
- case _ => false
- }
- case _ => false
- }.isDefined)
+ windowNeedPreComputeRangeFrame(window)
case plan if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan)
=>
val window = SparkShimLoader.getSparkShims
.getWindowGroupLimitExecShim(plan)
@@ -176,14 +166,16 @@ object PullOutPreProject extends RewriteSingleNode with
PullOutProjectHelper {
case window: WindowExec if needsPreProject(window) =>
val expressionMap = new mutable.HashMap[Expression, NamedExpression]()
- // Handle orderSpec.
- val newOrderSpec = getNewSortOrder(window.orderSpec, expressionMap)
-
- // Handle partitionSpec.
+ // Handle foldable orderSpec and foldable partitionSpec. Spark analyzer
rule
+ // ExtractWindowExpressions will extract expressions from non-foldable
orderSpec and
+ // partitionSpec.
+ var newOrderSpec = getNewSortOrder(window.orderSpec, expressionMap)
val newPartitionSpec =
window.partitionSpec.map(replaceExpressionWithAttribute(_,
expressionMap))
// Handle windowExpressions.
+ newOrderSpec = rewriteOrderSpecs(window, newOrderSpec, expressionMap)
+
val newWindowExpressions = window.windowExpression.toIndexedSeq.map {
_.transform {
case we: WindowExpression => rewriteWindowExpression(we,
newOrderSpec, expressionMap)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/utils/PullOutProjectHelper.scala
b/gluten-core/src/main/scala/org/apache/gluten/utils/PullOutProjectHelper.scala
index 12055f9e9..85be57493 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/utils/PullOutProjectHelper.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/utils/PullOutProjectHelper.scala
@@ -22,8 +22,10 @@ import org.apache.gluten.exception.{GlutenException,
GlutenNotSupportException}
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
AggregateFunction}
import org.apache.spark.sql.execution.aggregate._
+import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.types.{ByteType, DateType, IntegerType, LongType,
ShortType}
+import java.sql.Date
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
@@ -161,14 +163,32 @@ trait PullOutProjectHelper {
case _: PreComputeRangeFrameBound => bound
case _ if !bound.foldable => bound
case _ if bound.foldable =>
+ val orderExpr = if (expressionMap.contains(orderSpec.child)) {
+ expressionMap(orderSpec.child).asInstanceOf[Alias].child
+ } else {
+ orderSpec.child
+ }
val a = expressionMap
.getOrElseUpdate(
bound.canonicalized,
- Alias(Add(orderSpec.child, bound), generatePreAliasName)())
+ Alias(Add(orderExpr, bound), generatePreAliasName)())
PreComputeRangeFrameBound(a.asInstanceOf[Alias], bound)
}
}
+ protected def windowNeedPreComputeRangeFrame(w: WindowExec): Boolean =
+ w.windowExpression.exists(_.find {
+ case we: WindowExpression =>
+ we.windowSpec.frameSpecification match {
+ case swf: SpecifiedWindowFrame
+ if needPreComputeRangeFrame(swf) && supportPreComputeRangeFrame(
+ we.windowSpec.orderSpec) =>
+ true
+ case _ => false
+ }
+ case _ => false
+ }.isDefined)
+
protected def needPreComputeRangeFrame(swf: SpecifiedWindowFrame): Boolean =
{
BackendsApiManager.getSettings.needPreComputeRangeFrameBoundary &&
swf.frameType == RangeFrame &&
@@ -185,6 +205,36 @@ trait PullOutProjectHelper {
}
}
+ /**
+ * Convert DateType to IntegerType for orderSpec if needPreComputeRangeFrame,
because Spark's frame
+ * type does not support DateType. This does not affect the correctness of
the sort.
+ */
+ protected def rewriteOrderSpecs(
+ window: WindowExec,
+ orderSpecs: Seq[SortOrder],
+ expressionMap: mutable.HashMap[Expression, NamedExpression]):
Seq[SortOrder] = {
+ if (windowNeedPreComputeRangeFrame(window)) {
+ // This is guaranteed by Spark, but we still check it here
+ if (orderSpecs.size != 1) {
+ throw new GlutenException(
+ s"A range window frame with value boundaries expects one and only
one " +
+ s"order by expression: ${orderSpecs.mkString(",")}")
+ }
+ val orderSpec = orderSpecs.head
+ orderSpec.child.dataType match {
+ case DateType =>
+ val alias = Alias(
+ DateDiff(orderSpec.child, Literal(Date.valueOf("1970-01-01"))),
+ generatePreAliasName)()
+ expressionMap.getOrElseUpdate(alias.toAttribute, alias)
+ Seq(orderSpec.copy(child = alias.toAttribute))
+ case _ => orderSpecs
+ }
+ } else {
+ orderSpecs
+ }
+ }
+
protected def rewriteWindowExpression(
we: WindowExpression,
orderSpecs: Seq[SortOrder],
@@ -202,12 +252,6 @@ trait PullOutProjectHelper {
val newWindowSpec = we.windowSpec.frameSpecification match {
case swf: SpecifiedWindowFrame if needPreComputeRangeFrame(swf) =>
- // This is guaranteed by Spark, but we still check it here
- if (orderSpecs.size != 1) {
- throw new GlutenException(
- s"A range window frame with value boundaries expects one and only
one " +
- s"order by expression: ${orderSpecs.mkString(",")}")
- }
val orderSpec = orderSpecs.head
val lowerFrameCol = preComputeRangeFrameBoundary(swf.lower, orderSpec,
expressionMap)
val upperFrameCol = preComputeRangeFrameBoundary(swf.upper, orderSpec,
expressionMap)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]