Repository: spark
Updated Branches:
  refs/heads/master 32471ba0a -> d81f29eca


[SPARK-23881][CORE][TEST] Fix flaky test JobCancellationSuite."interruptible iterator of shuffle reader"

## What changes were proposed in this pull request?

The test case JobCancellationSuite."interruptible iterator of shuffle reader" 
has been flaky because the `KillTask` event is handled asynchronously, so it 
can happen that the semaphore is released while the task is still running.
Actually, we only need to check that the total number of processed elements 
is less than the number of input elements, which tells us the task got cancelled.
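
To make the race concrete, below is a minimal, self-contained sketch of the same coordination pattern using plain JDK threads and semaphores (not Spark's scheduler; the object name `AsyncCancellationSketch` and all local names are illustrative only). It shows why asserting an exact small count such as `<= 10` is racy, while `< numElements` only assumes the asynchronous kill lands before the whole input is drained:

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

object AsyncCancellationSketch {
  def main(args: Array[String]): Unit = {
    val numElements = 10000
    val cancelSemaphore = new Semaphore(0)
    val processed = new AtomicInteger(0)

    // The "task": every element blocks until cancellation has been signalled,
    // then keeps processing until the (asynchronous) kill actually lands.
    val task = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          (1 to numElements).foreach { _ =>
            cancelSemaphore.acquire()
            processed.getAndIncrement()
          }
        } catch {
          case _: InterruptedException => // the kill finally landed
        }
      }
    })
    task.start()

    // "Cancel the job": release all permits first (like the SparkListener releasing
    // taskCancelledSemaphore), then deliver the kill asynchronously.
    cancelSemaphore.release(numElements)
    task.interrupt()
    task.join()

    // Between the release and the interrupt the thread may have processed an
    // unpredictable number of elements, so asserting `<= 10` would be flaky;
    // asserting `< numElements` only assumes the kill arrives before the whole
    // input is drained.
    assert(processed.get() < numElements)
    println(s"processed ${processed.get()} of $numElements elements before the kill landed")
  }
}
```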

## How was this patch tested?

The new test case still fails without the proposed patch, and succeeds with 
the patch applied to current master.
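
(For local verification, a single ScalaTest case like this can usually be run with something along the lines of `./build/sbt 'core/testOnly *JobCancellationSuite -- -z "interruptible iterator"'`; the exact invocation is an assumption about the standard Spark sbt setup rather than part of this patch.)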

Author: Xingbo Jiang <xingbo.ji...@databricks.com>

Closes #20993 from jiangxb1987/JobCancellationSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d81f29ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d81f29ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d81f29ec

Branch: refs/heads/master
Commit: d81f29ecafe8fc9816e36087e3b8acdc93d6cc1b
Parents: 32471ba
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Mon Apr 9 10:19:22 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Mon Apr 9 10:19:22 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/JobCancellationSuite.scala | 31 ++++++++++++--------
 1 file changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d81f29ec/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 3b793bb..61da413 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -332,13 +332,15 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     import JobCancellationSuite._
     sc = new SparkContext("local[2]", "test interruptible iterator")
 
+    // Increase the number of elements to be processed to avoid this test being flaky.
+    val numElements = 10000
     val taskCompletedSem = new Semaphore(0)
 
     sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        // release taskCancelledSemaphore when cancelTasks event has been posted
         if (stageCompleted.stageInfo.stageId == 1) {
-          taskCancelledSemaphore.release(1000)
+          taskCancelledSemaphore.release(numElements)
         }
       }
 
@@ -349,28 +351,31 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       }
     })
 
-    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
+    // Explicitly disable interrupting the task thread when cancelling tasks, so the task
+    // thread can only be interrupted by `InterruptibleIterator`.
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+
+    val f = sc.parallelize(1 to numElements).map { i => (i, i) }
       .repartitionAndSortWithinPartitions(new HashPartitioner(1))
       .mapPartitions { iter =>
         taskStartedSemaphore.release()
         iter
       }.foreachAsync { x =>
-        if (x._1 >= 10) {
-          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
-          // next iteration will be cancelled if the source iterator is interruptible. Then in this
-          // case, the maximum num of increment would be 10(|1...10|)
-          taskCancelledSemaphore.acquire()
-        }
+        // Block this code from being executed until the job gets cancelled. In this case, if the
+        // source iterator is interruptible, the max number of increments should be under
+        // `numElements`.
+        taskCancelledSemaphore.acquire()
         executionOfInterruptibleCounter.getAndIncrement()
     }
 
     taskStartedSemaphore.acquire()
     // Job is cancelled when:
     // 1. task in reduce stage has been started, guaranteed by previous line.
-    // 2. task in reduce stage is blocked after processing at most 10 records as
-    //    taskCancelledSemaphore is not released until cancelTasks event is posted
-    // After job being cancelled, task in reduce stage will be cancelled and no more iteration are
-    // executed.
+    // 2. task in reduce stage is blocked as taskCancelledSemaphore is not released until
+    //    JobCancelled event is posted.
+    // After the job is cancelled, the task in the reduce stage will be cancelled asynchronously,
+    // thus part of the input should not get processed (it's very unlikely that Spark can process
+    // 10000 elements between the time JobCancelled is posted and the task is actually killed).
     f.cancel()
 
     val e = intercept[SparkException](f.get()).getCause
@@ -378,7 +383,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
 
     // Make sure tasks are indeed completed.
     taskCompletedSem.acquire()
-    assert(executionOfInterruptibleCounter.get() <= 10)
+    assert(executionOfInterruptibleCounter.get() < numElements)
  }
 
   def testCount() {

