[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717492#comment-16717492
 ] 

ASF GitHub Bot commented on SPARK-24561:
----------------------------------------

icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240686671
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
 ##########
 @@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+    windowExpression: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+    val references = expressions.zipWithIndex.map { case (e, i) =>
+      // Results of window expressions will be on the right side of child's 
output
+      BoundReference(child.output.size + i, e.dataType, e.nullable)
+    }
+    val unboundToRefMap = expressions.zip(references).toMap
+    val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+    UnsafeProjection.create(
+      child.output ++ patchedWindowExpression,
+      child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound 
ordering object is
+   * used to determine which input row lies within the frame boundaries of an 
output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+      frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
+    (frame, bound) match {
+      case (RowFrame, CurrentRow) =>
+        RowBoundOrdering(0)
+
+      case (RowFrame, IntegerLiteral(offset)) =>
+        RowBoundOrdering(offset)
+
+      case (RangeFrame, CurrentRow) =>
+        val ordering = newOrdering(orderSpec, child.output)
+        RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
+
+      case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+        // Use only the first order expression when the offset is non-null.
+        val sortExpr = orderSpec.head
+        val expr = sortExpr.child
+
+        // Create the projection which returns the current 'value'.
+        val current = newMutableProjection(expr :: Nil, child.output)
+
+        // Flip the sign of the offset when processing the order is descending
+        val boundOffset = sortExpr.direction match {
+          case Descending => UnaryMinus(offset)
+          case Ascending => offset
+        }
+
+        // Create the projection which returns the current 'value' modified by 
adding the offset.
+        val boundExpr = (expr.dataType, boundOffset.dataType) match {
+          case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+          case (TimestampType, CalendarIntervalType) =>
+            TimeAdd(expr, boundOffset, Some(timeZone))
+          case (a, b) if a== b => Add(expr, boundOffset)
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> User-defined window functions with pandas udf (bounded window)
> --------------------------------------------------------------
>
>                 Key: SPARK-24561
>                 URL: https://issues.apache.org/jira/browse/SPARK-24561
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Li Jin
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to