Repository: spark
Updated Branches:
  refs/heads/branch-1.6 dfc98fac9 -> 996635793


[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in 
StreamingListenerSuite

In StreamingListenerSuite."don't call ssc.stop in listener", after the main 
thread calls `ssc.stop()`, `StreamingContextStoppingCollector` may also call 
`ssc.stop()` in the listener bus thread, which causes a deadlock. This PR updates 
`StreamingContextStoppingCollector` to call `ssc.stop()` only in the first 
batch, which avoids the deadlock.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10011 from zsxwing/fix-test-deadlock.

(cherry picked from commit f57e6c9effdb9e282fc8ae66dc30fe053fed5272)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99663579
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99663579
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99663579

Branch: refs/heads/branch-1.6
Commit: 9966357932a50aa22f94f39201559beb8c0c6efb
Parents: dfc98fa
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Nov 27 11:50:18 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Nov 27 11:50:28 2015 -0800

----------------------------------------------------------------------
 .../streaming/StreamingListenerSuite.scala      | 25 +++++++++++++++-----
 1 file changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/99663579/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575a..04cd5bd 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with 
Matchers {
     val batchCounter = new BatchCounter(_ssc)
     _ssc.start()
     // Make sure running at least one batch
-    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, 
timeout = 10000)
+    if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 
1, timeout = 10000)) {
+      fail("The first batch cannot complete in 10 seconds")
+    }
+    // When reaching here, we can make sure 
`StreamingContextStoppingCollector` won't call
+    // `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
     _ssc.stop()
     assert(contextStoppingCollector.sparkExSeen)
   }
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
  */
 class StreamingContextStoppingCollector(val ssc: StreamingContext) extends 
StreamingListener {
   @volatile var sparkExSeen = false
+
+  private var isFirstBatch = true
+
   override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted) {
-    try {
-      ssc.stop()
-    } catch {
-      case se: SparkException =>
-        sparkExSeen = true
+    if (isFirstBatch) {
+      // We should only call `ssc.stop()` in the first batch. Otherwise, it's 
possible that the main
+      // thread is calling `ssc.stop()`, while 
StreamingContextStoppingCollector is also calling
+      // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+      isFirstBatch = false
+      try {
+        ssc.stop()
+      } catch {
+        case se: SparkException =>
+          sparkExSeen = true
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to