Repository: spark
Updated Branches:
refs/heads/branch-2.3 be184d16e -> 9b562d6fe
[SPARK-24022][TEST] Make SparkContextSuite not flaky
## What changes were proposed in this pull request?
SparkContextSuite.test("Cancelling stages/jobs with custom reasons.") could
stay in an infinite loop because of the problem found and fixed in
[SPARK-23775](https://issues.apache.org/jira/browse/SPARK-23775).
This PR fixes that flakiness by removing the shared-variable handshake used
when a cancel happens inside the loop, synchronizing instead with `wait()` and
a `CountDownLatch` (a sketch of the pattern follows).
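To make the pattern concrete, here is a minimal, self-contained sketch using plain JVM threads rather than Spark itself; the names `LatchSyncSketch`, `lock`, and `task` are hypothetical stand-ins for the blocked Spark task, the cancelling listener, and the driver-side assertions:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical model of the fix: the "task" parks on wait() instead of
// polling shared @volatile flags, the "listener" interrupts it (which is
// what interruptOnCancel does to a real Spark task) and signals a latch,
// and the "driver" awaits the latch before asserting anything.
object LatchSyncSketch {
  def main(args: Array[String]): Unit = {
    val latch = new CountDownLatch(1) // fresh latch per use: no state leaks across loop iterations
    val lock = new Object

    val task = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          lock.synchronized { lock.wait() } // blocks without busy-waiting
        } catch {
          case _: InterruptedException => // the expected "cancelled" path
        }
      }
    })
    task.start()

    // "Listener" side: cancel the task, then record that the cancel happened.
    task.interrupt()
    latch.countDown()

    // "Driver" side: bounded wait for the cancel signal before asserting.
    assert(latch.await(20, TimeUnit.SECONDS))
    task.join()
  }
}
```

Even if the interrupt arrives before the task reaches `wait()`, the pending interrupt status makes `wait()` throw immediately, so the sketch (like the fixed test) cannot hang the way the shared-flag version could.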
## How was this patch tested?
Existing unit test.
Author: Gabor Somogyi <[email protected]>
Closes #21105 from gaborgsomogyi/SPARK-24022.
(cherry picked from commit e55953b0bf2a80b34127ba123417ee54955a6064)
Signed-off-by: Marcelo Vanzin <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b562d6f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b562d6f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b562d6f
Branch: refs/heads/branch-2.3
Commit: 9b562d6fef765cb8357dbc31390e60b5947a9069
Parents: be184d1
Author: Gabor Somogyi <[email protected]>
Authored: Thu Apr 19 15:06:27 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Thu Apr 19 15:06:38 2018 -0700
----------------------------------------------------------------------
.../org/apache/spark/SparkContextSuite.scala | 61 +++++++++-----------
1 file changed, 26 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9b562d6f/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index b30bd74..ce9f2be 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io.File
 import java.net.{MalformedURLException, URI}
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Semaphore, TimeUnit}
+import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
 
 import scala.concurrent.duration._
 
@@ -498,45 +498,36 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 
   test("Cancelling stages/jobs with custom reasons.") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
     val REASON = "You shall not pass"
-    val slices = 10
 
-    val listener = new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-        if (SparkContextSuite.cancelStage) {
-          eventually(timeout(10.seconds)) {
-            assert(SparkContextSuite.isTaskStarted)
+    for (cancelWhat <- Seq("stage", "job")) {
+      // This countdown latch used to make sure stage or job canceled in listener
+      val latch = new CountDownLatch(1)
+
+      val listener = cancelWhat match {
+        case "stage" =>
+          new SparkListener {
+            override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+              sc.cancelStage(taskStart.stageId, REASON)
+              latch.countDown()
+            }
           }
-          sc.cancelStage(taskStart.stageId, REASON)
-          SparkContextSuite.cancelStage = false
-          SparkContextSuite.semaphore.release(slices)
-        }
-      }
-
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        if (SparkContextSuite.cancelJob) {
-          eventually(timeout(10.seconds)) {
-            assert(SparkContextSuite.isTaskStarted)
+        case "job" =>
+          new SparkListener {
+            override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+              sc.cancelJob(jobStart.jobId, REASON)
+              latch.countDown()
+            }
           }
-          sc.cancelJob(jobStart.jobId, REASON)
-          SparkContextSuite.cancelJob = false
-          SparkContextSuite.semaphore.release(slices)
-        }
       }
-    }
-    sc.addSparkListener(listener)
-
-    for (cancelWhat <- Seq("stage", "job")) {
-      SparkContextSuite.semaphore.drainPermits()
-      SparkContextSuite.isTaskStarted = false
-      SparkContextSuite.cancelStage = (cancelWhat == "stage")
-      SparkContextSuite.cancelJob = (cancelWhat == "job")
+      sc.addSparkListener(listener)
 
       val ex = intercept[SparkException] {
-        sc.range(0, 10000L, numSlices = slices).mapPartitions { x =>
-          SparkContextSuite.isTaskStarted = true
-          // Block waiting for the listener to cancel the stage or job.
-          SparkContextSuite.semaphore.acquire()
+        sc.range(0, 10000L, numSlices = 10).mapPartitions { x =>
+          x.synchronized {
+            x.wait()
+          }
           x
         }.count()
       }
@@ -550,9 +541,11 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
         fail("Expected the cause to be SparkException, got " +
           cause.toString() + " instead.")
       }
 
+      latch.await(20, TimeUnit.SECONDS)
       eventually(timeout(20.seconds)) {
         assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
       }
+      sc.removeSparkListener(listener)
     }
   }
 
@@ -637,8 +630,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 }
 
 object SparkContextSuite {
-  @volatile var cancelJob = false
-  @volatile var cancelStage = false
   @volatile var isTaskStarted = false
   @volatile var taskKilled = false
   @volatile var taskSucceeded = false