Repository: spark
Updated Branches:
  refs/heads/master 1c4d10b10 -> 0417ce878

[SPARK-19514] Enhancing the test for Range interruption.

Improve the test for SPARK-19514, so that it's clear which stage is being cancelled.

Author: Ala Luszczak <a...@databricks.com>

Closes #16914 from ala/fix-range-test.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0417ce87
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0417ce87
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0417ce87

Branch: refs/heads/master
Commit: 0417ce8787245791342d5609446f0e2fc4c219b1
Parents: 1c4d10b
Author: Ala Luszczak <a...@databricks.com>
Authored: Mon Feb 13 20:07:39 2017 +0100
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Feb 13 20:07:39 2017 +0100

----------------------------------------------------------------------
 .../apache/spark/sql/DataFrameRangeSuite.scala | 21 ++++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/0417ce87/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 03bf2d7..acf393a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -23,8 +23,8 @@ import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.SparkException
-import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskStart}
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -137,23 +137,23 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
         eventually(timeout(10.seconds)) {
-          assert(DataFrameRangeSuite.isTaskStarted)
+          assert(DataFrameRangeSuite.stageToKill > 0)
         }
-        sparkContext.cancelStage(taskStart.stageId)
+        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.isTaskStarted = false
+        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(100000L).mapPartitions { x =>
-            DataFrameRangeSuite.isTaskStarted = true
+          spark.range(1000000000L).map { x =>
+            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
             x
-          }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), sum("b")).collect()
+          }.toDF("id").agg(sum("id")).collect()
         }
         ex.getCause() match {
           case null =>
@@ -172,6 +172,5 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 }
 
 object DataFrameRangeSuite {
-  @volatile var isTaskStarted = false
+  @volatile var stageToKill = -1
 }
-

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org