Repository: spark
Updated Branches:
refs/heads/master a9066478f -> 0c94e48bc
[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
## What changes were proposed in this pull request?
DataFrameRangeSuite.test("Cancelling stage in a query with Range.") stays
sometimes in an infinite loop and times out the build.
There were multiple issues with the test:
1. The first valid stageId is zero when the test started alone and not in a
suite and the following code waits until timeout:
```
eventually(timeout(10.seconds), interval(1.millis)) {
assert(DataFrameRangeSuite.stageToKill > 0)
}
```
2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread
after the reset, which resulted in canceling the same stage twice. This caused
the infinite wait.
This PR solves the mentioned flakiness by removing the shared
`DataFrameRangeSuite.stageToKill` and using `wait` and `CountDownLatch` for
synchronization.
## How was this patch tested?
Existing unit test.
Author: Gabor Somogyi <[email protected]>
Closes #20888 from gaborgsomogyi/SPARK-23775.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c94e48b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c94e48b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c94e48b
Branch: refs/heads/master
Commit: 0c94e48bc50717e1627c0d2acd5382d9adc73c97
Parents: a906647
Author: Gabor Somogyi <[email protected]>
Authored: Wed Apr 18 16:37:41 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Wed Apr 18 16:37:41 2018 -0700
----------------------------------------------------------------------
.../apache/spark/sql/DataFrameRangeSuite.scala | 78 +++++++++++---------
1 file changed, 45 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0c94e48b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 57a930d..a0fd740 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -17,14 +17,16 @@
package org.apache.spark.sql
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
import scala.concurrent.duration._
import scala.math.abs
import scala.util.Random
import org.scalatest.concurrent.Eventually
-import org.apache.spark.{SparkException, TaskContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -152,39 +154,53 @@ class DataFrameRangeSuite extends QueryTest with
SharedSQLContext with Eventuall
}
test("Cancelling stage in a query with Range.") {
- val listener = new SparkListener {
- override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
- eventually(timeout(10.seconds), interval(1.millis)) {
- assert(DataFrameRangeSuite.stageToKill > 0)
+ // Save and restore the value because SparkContext is shared
+ val savedInterruptOnCancel = sparkContext
+ .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+
+ try {
+
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
"true")
+
+ for (codegen <- Seq(true, false)) {
+ // This countdown latch used to make sure with all the stages
cancelStage called in listener
+ val latch = new CountDownLatch(2)
+
+ val listener = new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ sparkContext.cancelStage(taskStart.stageId)
+ latch.countDown()
+ }
}
- sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
- }
- }
- sparkContext.addSparkListener(listener)
- for (codegen <- Seq(true, false)) {
- withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key ->
codegen.toString()) {
- DataFrameRangeSuite.stageToKill = -1
- val ex = intercept[SparkException] {
- spark.range(0, 100000000000L, 1, 1).map { x =>
- DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
- x
- }.toDF("id").agg(sum("id")).collect()
+ sparkContext.addSparkListener(listener)
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key ->
codegen.toString()) {
+ val ex = intercept[SparkException] {
+ sparkContext.range(0, 10000L, numSlices = 10).mapPartitions { x =>
+ x.synchronized {
+ x.wait()
+ }
+ x
+ }.toDF("id").agg(sum("id")).collect()
+ }
+ ex.getCause() match {
+ case null =>
+ assert(ex.getMessage().contains("cancelled"))
+ case cause: SparkException =>
+ assert(cause.getMessage().contains("cancelled"))
+ case cause: Throwable =>
+ fail("Expected the cause to be SparkException, got " +
cause.toString() + " instead.")
+ }
}
- ex.getCause() match {
- case null =>
- assert(ex.getMessage().contains("cancelled"))
- case cause: SparkException =>
- assert(cause.getMessage().contains("cancelled"))
- case cause: Throwable =>
- fail("Expected the cause to be SparkException, got " +
cause.toString() + " instead.")
+ latch.await(20, TimeUnit.SECONDS)
+ eventually(timeout(20.seconds)) {
+
assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum
== 0)
}
+ sparkContext.removeSparkListener(listener)
}
- eventually(timeout(20.seconds)) {
-
assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum
== 0)
- }
+ } finally {
+ sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
+ savedInterruptOnCancel)
}
- sparkContext.removeSparkListener(listener)
}
test("SPARK-20430 Initialize Range parameters in a driver side") {
@@ -204,7 +220,3 @@ class DataFrameRangeSuite extends QueryTest with
SharedSQLContext with Eventuall
}
}
}
-
-object DataFrameRangeSuite {
- @volatile var stageToKill = -1
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]