This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d6dcee4  [SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame
d6dcee4 is described below

commit d6dcee487d4d353eea78ee567c256025b6bb0eff
Author: wangguangxin.cn <wangguangxin...@gmail.com>
AuthorDate: Mon Feb 17 18:15:54 2020 +0100

    [SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame

    ### What changes were proposed in this pull request?
    We only need to do aggregate evaluation once per group in `UnboundedWindowFunctionFrame`.

    ### Why are the changes needed?
    Currently, `UnboundedWindowFunctionFrame.write` re-evaluates the processor for each row in a group, which is in fact unnecessary. This hurts performance when the evaluation is time-consuming (for example, `Percentile`'s eval needs to sort its buffer and do some calculation). In our production environment, a SQL query using percentile with a window operation costs more than 10 hours in Spark SQL but only 10 minutes in Hive.

    In fact, as its comments state, `UnboundedWindowFunctionFrame` can be treated as a `SlidingWindowFunctionFrame` with `lbound = UnboundedPreceding` and `ubound = UnboundedFollowing`. In that case, `SlidingWindowFunctionFrame` also only does evaluation once per group.

    The performance issue can be reproduced by running the following script in a local spark-shell:
    ```
    spark.range(100*100).map(i => (i, "India")).toDF("uv", "country").createOrReplaceTempView("test")
    sql("select uv, country, percentile(uv, 0.95) over (partition by country) as ptc95 from test").collect.foreach(println)
    ```
    Before this patch, the query costs **128048 ms**. With this patch, it costs **3485 ms**. If we increase the data size to 1000*1000, Spark cannot even produce a result without this patch (I've waited for several hours).

    ### Does this PR introduce any user-facing change?
    NO

    ### How was this patch tested?
    Existing UT

    Closes #27558 from WangGuangxin/windows.
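The idea behind the patch can be illustrated with a small self-contained sketch (these are toy stand-ins, not Spark's actual `WindowFunctionFrame` classes): because an unbounded frame spans the whole partition, every row sees the same aggregate result, so the expensive evaluation can run once in `prepare` and `write` can simply reuse the cached value.

```scala
object EvaluateOncePerGroup {

  // Stand-in for an expensive aggregate such as Percentile: each
  // evaluation sorts its buffer before producing a result.
  final class PercentileLike(p: Double) {
    private val buffer = scala.collection.mutable.ArrayBuffer.empty[Long]
    var evalCount = 0 // tracks how often the costly path runs

    def update(v: Long): Unit = buffer += v

    def evaluate(): Long = {
      evalCount += 1
      val sorted = buffer.sorted // the costly step
      sorted(((sorted.length - 1) * p).toInt)
    }
  }

  // Mimics the patched frame: evaluate once in prepare(); write()
  // returns the cached result instead of re-evaluating per row.
  final class UnboundedFrame(processor: PercentileLike) {
    private var cached: Long = _

    def prepare(rows: Iterator[Long]): Unit = {
      rows.foreach(processor.update)
      cached = processor.evaluate() // once per group, not once per row
    }

    // Same result for every row in the partition; nothing to recompute.
    def write(index: Int): Long = cached
  }

  def main(args: Array[String]): Unit = {
    val rows = 0L until 1000L
    val proc = new PercentileLike(0.95)
    val frame = new UnboundedFrame(proc)
    frame.prepare(rows.iterator)
    val results = rows.indices.map(frame.write)
    println(proc.evalCount)        // 1: evaluated once for the group
    println(results.distinct.size) // 1: all rows share the result
  }
}
```

Before the patch, the equivalent of `evalCount` would have been the number of rows in the partition, which is why a sort-heavy aggregate like percentile dominated the runtime.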
Authored-by: wangguangxin.cn <wangguangxin...@gmail.com>
Signed-off-by: herman <her...@databricks.com>
---
 .../apache/spark/sql/execution/window/WindowFunctionFrame.scala  | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index d5f2ffa..181ff42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -277,6 +277,8 @@ final class UnboundedWindowFunctionFrame(
       while (iterator.hasNext) {
         processor.update(iterator.next())
       }
+
+      processor.evaluate(target)
     }
 
     upperBound = rows.length
@@ -284,11 +286,8 @@ final class UnboundedWindowFunctionFrame(
 
   /** Write the frame columns for the current row to the given target row. */
   override def write(index: Int, current: InternalRow): Unit = {
-    // Unfortunately we cannot assume that evaluation is deterministic. So we need to re-evaluate
-    // for each row.
-    if (processor != null) {
-      processor.evaluate(target)
-    }
+    // The results are the same for each row in the partition, and have been evaluated in prepare.
+    // Don't need to recalculate here.
   }
 
   override def currentLowerBound(): Int = lowerBound

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org