Repository: spark
Updated Branches:
  refs/heads/master 78bb7f807 -> 086b0c8f6


[SPARK-18617][SPARK-18560][TESTS] Fix flaky test: StreamingContextSuite. 
Receiver data should be deserialized properly

## What changes were proposed in this pull request?

Avoid to create multiple threads to stop StreamingContext. Otherwise, the latch 
added in #16091 can be passed too early.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16105 from zsxwing/SPARK-18617-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/086b0c8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/086b0c8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/086b0c8f

Branch: refs/heads/master
Commit: 086b0c8f6788b205bc630d5ccf078f77b9751af3
Parents: 78bb7f8
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 1 14:22:49 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 1 14:22:49 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/086b0c8f/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 35eeb9d..5645996 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with 
BeforeAndAfter with Timeo
     ssc = new StreamingContext(conf, Milliseconds(100))
     val input = ssc.receiverStream(new TestReceiver)
     val latch = new CountDownLatch(1)
+    @volatile var stopping = false
     input.count().foreachRDD { rdd =>
       // Make sure we can read from BlockRDD
-      if (rdd.collect().headOption.getOrElse(0L) > 0) {
+      if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) {
         // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+        stopping = true
         new Thread() {
           setDaemon(true)
           override def run(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to