Repository: spark Updated Branches: refs/heads/master 302556ff9 -> a06d9c8e7
[SPARK-8404] [STREAMING] [TESTS] Use thread-safe collections to make the tests more reliable KafkaStreamSuite, DirectKafkaStreamSuite, JavaKafkaStreamSuite and JavaDirectKafkaStreamSuite use non-thread-safe collections to collect data in one thread and check it in another thread. It may fail the tests. This PR changes them to thread-safe collections. Note: I cannot reproduce the test failures in my environment. But at least, this PR should make the tests more reliable. Author: zsxwing <[email protected]> Closes #6852 from zsxwing/fix-KafkaStreamSuite and squashes the following commits: d464211 [zsxwing] Use thread-safe collections to make the tests more reliable Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a06d9c8e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a06d9c8e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a06d9c8e Branch: refs/heads/master Commit: a06d9c8e76bb904d48764802aa3affff93b00baa Parents: 302556f Author: zsxwing <[email protected]> Authored: Wed Jun 17 15:00:03 2015 -0700 Committer: Tathagata Das <[email protected]> Committed: Wed Jun 17 15:00:03 2015 -0700 ---------------------------------------------------------------------- .../streaming/kafka/JavaDirectKafkaStreamSuite.java | 6 ++---- .../spark/streaming/kafka/JavaKafkaStreamSuite.java | 6 ++---- .../streaming/kafka/DirectKafkaStreamSuite.scala | 14 ++++++++------ .../spark/streaming/kafka/KafkaStreamSuite.scala | 7 ++----- 4 files changed, 14 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a06d9c8e/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 4c1d6a0..c0669fb 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -18,9 +18,7 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Arrays; +import java.util.*; import scala.Tuple2; @@ -116,7 +114,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { ); JavaDStream<String> unifiedStream = stream1.union(stream2); - final HashSet<String> result = new HashSet<String>(); + final Set<String> result = Collections.synchronizedSet(new HashSet<String>()); unifiedStream.foreachRDD( new Function<JavaRDD<String>, Void>() { @Override http://git-wip-us.apache.org/repos/asf/spark/blob/a06d9c8e/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 540f4ce..e4c6592 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -18,9 +18,7 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; -import java.util.HashMap; -import java.util.List; -import java.util.Random; +import java.util.*; import scala.Tuple2; @@ -94,7 +92,7 @@ public class JavaKafkaStreamSuite implements Serializable { topics, StorageLevel.MEMORY_ONLY_SER()); - final HashMap<String, Long> result = new HashMap<String, Long>(); + final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>()); JavaDStream<String> words = stream.map( new Function<Tuple2<String, String>, String>() { http://git-wip-us.apache.org/repos/asf/spark/blob/a06d9c8e/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 47bbfb6..212eb35 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -99,7 +99,8 @@ class DirectKafkaStreamSuite ssc, kafkaParams, topics) } - val allReceived = new ArrayBuffer[(String, String)] + val allReceived = + new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] stream.foreachRDD { rdd => // Get the offset ranges in the RDD @@ -162,7 +163,7 @@ class DirectKafkaStreamSuite "Start offset not from latest" ) - val collectedData = new mutable.ArrayBuffer[String]() + val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) @@ -208,7 +209,7 @@ class DirectKafkaStreamSuite "Start offset not from latest" ) - val collectedData = new mutable.ArrayBuffer[String]() + val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] stream.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) @@ -324,7 +325,8 @@ class DirectKafkaStreamSuite ssc, kafkaParams, Set(topic)) } - val allReceived = new ArrayBuffer[(String, String)] + val allReceived = + new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] stream.foreachRDD { rdd => allReceived ++= rdd.collect() } ssc.start() @@ -350,8 +352,8 @@ class DirectKafkaStreamSuite } object DirectKafkaStreamSuite { - val collectedData = new mutable.ArrayBuffer[String]() - var total = -1L + val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] + @volatile var total = -1L class InputInfoCollector extends StreamingListener { val numRecordsSubmitted = new AtomicLong(0L) http://git-wip-us.apache.org/repos/asf/spark/blob/a06d9c8e/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 8ee2cc6..797b07f 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -65,7 +65,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) - val result = new mutable.HashMap[String, Long]() + val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long] stream.map(_._2).countByValue().foreachRDD { r => val ret = r.collect() ret.toMap.foreach { kv => @@ -77,10 +77,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter ssc.start() eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - assert(sent.size === result.size) - sent.keys.foreach { k => - assert(sent(k) === result(k).toInt) - } + assert(sent === result) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
