Repository: spark
Updated Branches:
  refs/heads/master ea0a5eef2 -> ee3af15fe


[SPARK-22363][SQL][TEST] Add unit test for Window spilling

## What changes were proposed in this pull request?

There is already test using window spilling, but the test coverage is not ideal.

In this PR the already existing test was fixed and additional cases added.

## How was this patch tested?

Automated: Pass the Jenkins.

Author: Gabor Somogyi <gabor.g.somo...@gmail.com>

Closes #20022 from gaborgsomogyi/SPARK-22363.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee3af15f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee3af15f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee3af15f

Branch: refs/heads/master
Commit: ee3af15fea18356a9223d61cfe6aaa98ab4dc733
Parents: ea0a5ee
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
Authored: Sun Dec 31 14:47:23 2017 +0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sun Dec 31 14:47:23 2017 +0800

----------------------------------------------------------------------
 .../sql/DataFrameWindowFunctionsSuite.scala     | 44 +++++++++++++++++++-
 1 file changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee3af15f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index ea725af..01c988e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.sql.{Date, Timestamp}
 
+import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -518,9 +519,46 @@ class DataFrameWindowFunctionsSuite extends QueryTest with 
SharedSQLContext {
       Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0)))
   }
 
+  test("Window spill with less than the inMemoryThreshold") {
+    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+    val window = Window.partitionBy($"key").orderBy($"value")
+
+    withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "2",
+      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") {
+      assertNotSpilled(sparkContext, "select") {
+        df.select($"key", sum("value").over(window)).collect()
+      }
+    }
+  }
+
+  test("Window spill with more than the inMemoryThreshold but less than the 
spillThreshold") {
+    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+    val window = Window.partitionBy($"key").orderBy($"value")
+
+    withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
+      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") {
+      assertNotSpilled(sparkContext, "select") {
+        df.select($"key", sum("value").over(window)).collect()
+      }
+    }
+  }
+
+  test("Window spill with more than the inMemoryThreshold and spillThreshold") 
{
+    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
+    val window = Window.partitionBy($"key").orderBy($"value")
+
+    withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
+      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") {
+      assertSpilled(sparkContext, "select") {
+        df.select($"key", sum("value").over(window)).collect()
+      }
+    }
+  }
+
   test("SPARK-21258: complex object in combination with spilling") {
     // Make sure we trigger the spilling path.
-    withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
+    withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
+      SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
       val sampleSchema = new StructType().
         add("f0", StringType).
         add("f1", LongType).
@@ -558,7 +596,9 @@ class DataFrameWindowFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
       import testImplicits._
 
-      spark.read.schema(sampleSchema).json(input.toDS()).select(c0, 
c1).foreach { _ => () }
+      assertSpilled(sparkContext, "select") {
+        spark.read.schema(sampleSchema).json(input.toDS()).select(c0, 
c1).foreach { _ => () }
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to