Repository: spark Updated Branches: refs/heads/master ba02f6cb5 -> f57e6c9ef
[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in StreamingListenerSuite In StreamingListenerSuite."don't call ssc.stop in listener", after the main thread calls `ssc.stop()`, `StreamingContextStoppingCollector` may call `ssc.stop()` in the listener bus thread, which is a dead-lock. This PR updated `StreamingContextStoppingCollector` to only call `ssc.stop()` in the first batch to avoid the dead-lock. Author: Shixiong Zhu <shixi...@databricks.com> Closes #10011 from zsxwing/fix-test-deadlock. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f57e6c9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f57e6c9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f57e6c9e Branch: refs/heads/master Commit: f57e6c9effdb9e282fc8ae66dc30fe053fed5272 Parents: ba02f6c Author: Shixiong Zhu <shixi...@databricks.com> Authored: Fri Nov 27 11:50:18 2015 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Fri Nov 27 11:50:18 2015 -0800 ---------------------------------------------------------------------- .../streaming/StreamingListenerSuite.scala | 25 +++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f57e6c9e/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index df4575a..04cd5bd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { val batchCounter = new BatchCounter(_ssc) _ssc.start() // Make sure running at least one batch - batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000) + if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)) { + fail("The first batch cannot complete in 10 seconds") + } + // When reaching here, we can make sure `StreamingContextStoppingCollector` won't call + // `ssc.stop()`, so it's safe to call `_ssc.stop()` now. _ssc.stop() assert(contextStoppingCollector.sparkExSeen) } @@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener { */ class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { @volatile var sparkExSeen = false + + private var isFirstBatch = true + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { - try { - ssc.stop() - } catch { - case se: SparkException => - sparkExSeen = true + if (isFirstBatch) { + // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main + // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling + // `ssc.stop()` in the listener thread, which becomes a dead-lock. + isFirstBatch = false + try { + ssc.stop() + } catch { + case se: SparkException => + sparkExSeen = true + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org