Repository: spark
Updated Branches:
  refs/heads/master 1c4d10b10 -> 0417ce878


[SPARK-19514] Enhancing the test for Range interruption.

Improve the test for SPARK-19514, so that it's clear which stage is being 
cancelled.

Author: Ala Luszczak <a...@databricks.com>

Closes #16914 from ala/fix-range-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0417ce87
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0417ce87
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0417ce87

Branch: refs/heads/master
Commit: 0417ce8787245791342d5609446f0e2fc4c219b1
Parents: 1c4d10b
Author: Ala Luszczak <a...@databricks.com>
Authored: Mon Feb 13 20:07:39 2017 +0100
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Feb 13 20:07:39 2017 +0100

----------------------------------------------------------------------
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 21 ++++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0417ce87/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 03bf2d7..acf393a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -23,8 +23,8 @@ import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.SparkException
-import org.apache.spark.scheduler.{SparkListener, 
SparkListenerExecutorMetricsUpdate, SparkListenerTaskStart}
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -137,23 +137,23 @@ class DataFrameRangeSuite extends QueryTest with 
SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
         eventually(timeout(10.seconds)) {
-          assert(DataFrameRangeSuite.isTaskStarted)
+          assert(DataFrameRangeSuite.stageToKill > 0)
         }
-        sparkContext.cancelStage(taskStart.stageId)
+        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
codegen.toString()) {
-        DataFrameRangeSuite.isTaskStarted = false
+        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(100000L).mapPartitions { x =>
-            DataFrameRangeSuite.isTaskStarted = true
+          spark.range(1000000000L).map { x =>
+            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
             x
-          }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), 
sum("b")).collect()
+          }.toDF("id").agg(sum("id")).collect()
         }
         ex.getCause() match {
           case null =>
@@ -172,6 +172,5 @@ class DataFrameRangeSuite extends QueryTest with 
SharedSQLContext with Eventuall
 }
 
 object DataFrameRangeSuite {
-  @volatile var isTaskStarted = false
+  @volatile var stageToKill = -1
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to