Repository: spark
Updated Branches:
  refs/heads/branch-2.1 4c673c656 -> 4746674ad
[SPARK-18617][SPARK-18560][TESTS] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly

## What changes were proposed in this pull request?

Avoid creating multiple threads to stop StreamingContext. Otherwise, the latch added in #16091 can be passed too early.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16105 from zsxwing/SPARK-18617-2.

(cherry picked from commit 086b0c8f6788b205bc630d5ccf078f77b9751af3)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4746674a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4746674a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4746674a

Branch: refs/heads/branch-2.1
Commit: 4746674ad3acfc38bbd3e2708d75280c19ef0202
Parents: 4c673c6
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 1 14:22:49 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 1 14:22:56 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4746674a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 35eeb9d..5645996 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     ssc = new StreamingContext(conf, Milliseconds(100))
     val input = ssc.receiverStream(new TestReceiver)
     val latch = new CountDownLatch(1)
+    @volatile var stopping = false
     input.count().foreachRDD { rdd =>
       // Make sure we can read from BlockRDD
-      if (rdd.collect().headOption.getOrElse(0L) > 0) {
+      if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) {
         // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+        stopping = true
         new Thread() {
           setDaemon(true)
           override def run(): Unit = {
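
The guard added by this patch can be illustrated outside Spark. The following is only a sketch under stated assumptions, not the test itself: `StopOnceSketch` and `runBatches` are hypothetical names, the sequence of batch counts stands in for the values that `rdd.collect()` would return on successive batches, and a counter stands in for the call that stops the StreamingContext. It assumes, as the patch appears to, that the per-batch callback is invoked from one thread at a time, so a plain check-then-set on a `@volatile` flag is enough.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object StopOnceSketch {
  // Hypothetical stand-in for the per-batch foreachRDD callback; not Spark API.
  // Returns how many "stop" threads actually ran.
  def runBatches(batchCounts: Seq[Long]): Int = {
    val stopThreadsRun = new AtomicInteger(0)
    val latch = new CountDownLatch(1)
    @volatile var stopping = false

    batchCounts.foreach { count =>
      // Without the `!stopping` check, every non-empty batch would start
      // another thread, and any of them could count down the latch.
      if (count > 0 && !stopping) {
        stopping = true
        new Thread() {
          setDaemon(true)
          override def run(): Unit = {
            // The real test stops the StreamingContext at this point.
            stopThreadsRun.incrementAndGet()
            latch.countDown()
          }
        }.start()
      }
    }

    // Wait for the single stop thread only if one was started.
    if (stopping) latch.await(5, TimeUnit.SECONDS)
    stopThreadsRun.get()
  }

  def main(args: Array[String]): Unit = {
    // Three non-empty batches arrive, but only one stop thread is started.
    println(runBatches(Seq(0L, 3L, 5L, 2L))) // prints 1
  }
}
```

If the callback could run concurrently, an `AtomicBoolean` with `compareAndSet(false, true)` would be the safer variant of the same guard.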