[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717418#comment-16717418
 ] 

ASF GitHub Bot commented on SPARK-24561:
----------------------------------------

icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240667535
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##########
 @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute any
+ * window aggregation values. Instead, it computes the lower and upper bound for each window
+ * (i.e. window bounds) and passes the data and bound indices to the Python worker to do the
+ * actual window aggregation.
+ *
+ * It currently materializes all data associated with the same partition key and passes them to
+ * the Python worker. This is not strictly necessary for sliding windows and can be improved
+ * (e.g., by slicing the data into overlapping chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that window expressions
+ * with the same window boundaries can share the same window bounds. The window bounds are
+ * prepended to the data passed to the Python worker.
+ *
+ * For example, if we have:
+ *     avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ *     avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
+ *     avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ *     max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The Python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5),
+ *       w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
+ *       w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it is an unbounded
+ * window, so its bound indices are always the same.
+ *
+ * Unbounded windows also have a different eval type, because:
+ * (1) they don't have bound indices as input, and
+ * (2) the UDF only needs to be evaluated once in the Python worker (because the UDF is
+ *     deterministic and the window bounds are the same for all windows).
 
 Review comment:
   Oh, this relates to the statement "The UDF only needs to be evaluated once". If the UDF is
 not deterministic, then even if the window bounds are the same, we still need to evaluate it
 once per window.
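As a rough illustration of the bound-index layout described in the Scaladoc above: the Python worker receives, for each bounded window, a pair of per-row slice indices alongside the data column, and applies the aggregation to each slice. This is a hypothetical sketch (the function and variable names are not Spark's actual worker code), using plain lists instead of the Arrow/pandas structures Spark actually passes:

```python
def bounded_window_agg(lower_bound, upper_bound, v, func):
    # For each output row i, aggregate the slice v[lower_bound[i]:upper_bound[i]].
    # lower_bound/upper_bound play the role of the prepended window-bound
    # columns (e.g. lower_bound_w1, upper_bound_w1) described above.
    return [func(v[lo:hi]) for lo, hi in zip(lower_bound, upper_bound)]

# Example: avg(v) over rows between 1 preceding and 1 following, n = 4 rows.
v = [1.0, 2.0, 3.0, 4.0]
lower = [0, 0, 1, 2]   # max(0, i - 1)
upper = [2, 3, 4, 4]   # min(n, i + 2); upper bound is exclusive
result = bounded_window_agg(lower, upper, v, lambda xs: sum(xs) / len(xs))
# result -> [1.5, 2.0, 3.0, 3.5]
```

An unbounded window needs no such index columns: every row's slice is the whole partition, which is why (for a deterministic UDF) a single evaluation suffices.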

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> User-defined window functions with pandas udf (bounded window)
> --------------------------------------------------------------
>
>                 Key: SPARK-24561
>                 URL: https://issues.apache.org/jira/browse/SPARK-24561
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Li Jin
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
