Repository: spark
Updated Branches:
  refs/heads/branch-2.2 df061fd5f -> 5a0a76f16


[SPARK-21414] Refine SlidingWindowFunctionFrame to avoid OOM.

## What changes were proposed in this pull request?

In `SlidingWindowFunctionFrame`, it is now adding all rows to the buffer for 
which the input row value is equal to or less than the output row upper bound, 
then drop all rows from the buffer for which the input row value is smaller 
than the output row lower bound.
This could result in the buffer is very big though the window is small.
For example:
```
select a, b, sum(a)
over (partition by b order by a range between 1000000 following and 1000001 
following)
from table
```
We can refine the logic and just add the qualified rows into buffer.

## How was this patch tested?
Manual test:
Run sql
`select shop, shopInfo, district, sum(revenue) over(partition by district order 
by revenue range between 100 following and 200 following) from revenueList 
limit 10`
against a table with 4  columns(shop: String, shopInfo: String, district: 
String, revenue: Int). The biggest partition is around 2G bytes, containing 
200k lines.
Configure the executor with 2G bytes memory.
With the change in this pr, it works find. Without this change, below exception 
will be thrown.
```
MemoryError: Java heap space
        at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
        at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:62)
        at 
org.apache.spark.sql.execution.window.SlidingWindowFunctionFrame.write(WindowFunctionFrame.scala:201)
        at 
org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:365)
        at 
org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:289)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```

Author: jinxing <[email protected]>

Closes #18634 from jinxing64/SPARK-21414.

(cherry picked from commit 4eb081cc870a9d1c42aae90418535f7d782553e9)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a0a76f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a0a76f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a0a76f1

Branch: refs/heads/branch-2.2
Commit: 5a0a76f1648729dfa7ed0522dd2cb41ba805a2cd
Parents: df061fd
Author: jinxing <[email protected]>
Authored: Wed Jul 19 21:35:26 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jul 19 21:35:59 2017 +0800

----------------------------------------------------------------------
 .../execution/window/WindowFunctionFrame.scala  | 22 ++++++-----
 .../sql/execution/SQLWindowFunctionSuite.scala  | 40 ++++++++++++++++++++
 2 files changed, 53 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a0a76f1/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index af2b4fb..156002e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -195,15 +195,6 @@ private[window] final class SlidingWindowFunctionFrame(
   override def write(index: Int, current: InternalRow): Unit = {
     var bufferUpdated = index == 0
 
-    // Add all rows to the buffer for which the input row value is equal to or 
less than
-    // the output row upper bound.
-    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, 
index) <= 0) {
-      buffer.add(nextRow.copy())
-      nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
-      inputHighIndex += 1
-      bufferUpdated = true
-    }
-
     // Drop all rows from the buffer for which the input row value is smaller 
than
     // the output row lower bound.
     while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, 
current, index) < 0) {
@@ -212,6 +203,19 @@ private[window] final class SlidingWindowFunctionFrame(
       bufferUpdated = true
     }
 
+    // Add all rows to the buffer for which the input row value is equal to or 
less than
+    // the output row upper bound.
+    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, 
index) <= 0) {
+      if (lbound.compare(nextRow, inputLowIndex, current, index) < 0) {
+        inputLowIndex += 1
+      } else {
+        buffer.add(nextRow.copy())
+        bufferUpdated = true
+      }
+      nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+      inputHighIndex += 1
+    }
+
     // Only recalculate and update when the buffer changes.
     if (bufferUpdated) {
       processor.initialize(input.length)

http://git-wip-us.apache.org/repos/asf/spark/blob/5a0a76f1/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index 52e4f04..a9f3fb3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -356,6 +356,46 @@ class SQLWindowFunctionSuite extends QueryTest with 
SharedSQLContext {
     spark.catalog.dropTempView("nums")
   }
 
+  test("window function: mutiple window expressions specified by range in a 
single expression") {
+    val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 
2)).toDF("x", "y")
+    nums.createOrReplaceTempView("nums")
+    withTempView("nums") {
+      val expected =
+        Row(1, 1, 1, 4, null, 8, 25) ::
+          Row(1, 3, 4, 9, 1, 12, 24) ::
+          Row(1, 5, 9, 15, 4, 16, 21) ::
+          Row(1, 7, 16, 21, 8, 9, 16) ::
+          Row(1, 9, 25, 16, 12, null, 9) ::
+          Row(0, 2, 2, 6, null, 10, 30) ::
+          Row(0, 4, 6, 12, 2, 14, 28) ::
+          Row(0, 6, 12, 18, 6, 18, 24) ::
+          Row(0, 8, 20, 24, 10, 10, 18) ::
+          Row(0, 10, 30, 18, 14, null, 10) ::
+          Nil
+
+      val actual = sql(
+        """
+          |SELECT
+          |  y,
+          |  x,
+          |  sum(x) over w1 as history_sum,
+          |  sum(x) over w2 as period_sum1,
+          |  sum(x) over w3 as period_sum2,
+          |  sum(x) over w4 as period_sum3,
+          |  sum(x) over w5 as future_sum
+          |FROM nums
+          |WINDOW
+          |  w1 AS (PARTITION BY y ORDER BY x RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW),
+          |  w2 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 PRECEDING AND 2 
FOLLOWING),
+          |  w3 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 4 PRECEDING AND 2 
PRECEDING ),
+          |  w4 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 FOLLOWING AND 4 
FOLLOWING),
+          |  w5 AS (PARTITION BY y ORDER BY x RANGE BETWEEN CURRENT ROW AND 
UNBOUNDED FOLLOWING)
+        """.stripMargin
+      )
+      checkAnswer(actual, expected)
+    }
+  }
+
   test("SPARK-7595: Window will cause resolve failed with self join") {
     checkAnswer(sql(
       """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to