Repository: spark
Updated Branches:
  refs/heads/branch-1.4 5e7973df0 -> 5aedfa2ce


[SPARK-8404] [STREAMING] [TESTS] Use thread-safe collections to make the tests 
more reliable

KafkaStreamSuite, DirectKafkaStreamSuite, JavaKafkaStreamSuite and 
JavaDirectKafkaStreamSuite use non-thread-safe collections to collect data in 
one thread and check it in another thread. This may cause the tests to fail 
intermittently.

This PR changes them to thread-safe collections.

Note: I cannot reproduce the test failures in my environment, but this PR 
should at least make the tests more reliable.

Author: zsxwing <[email protected]>

Closes #6852 from zsxwing/fix-KafkaStreamSuite and squashes the following 
commits:

d464211 [zsxwing] Use thread-safe collections to make the tests more reliable

(cherry picked from commit a06d9c8e76bb904d48764802aa3affff93b00baa)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5aedfa2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5aedfa2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5aedfa2c

Branch: refs/heads/branch-1.4
Commit: 5aedfa2ceb5f9a9d22994a5709f663ee6d9a607e
Parents: 5e7973d
Author: zsxwing <[email protected]>
Authored: Wed Jun 17 15:00:03 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Wed Jun 17 15:00:17 2015 -0700

----------------------------------------------------------------------
 .../streaming/kafka/JavaDirectKafkaStreamSuite.java   |  6 ++----
 .../spark/streaming/kafka/JavaKafkaStreamSuite.java   |  6 ++----
 .../streaming/kafka/DirectKafkaStreamSuite.scala      | 14 ++++++++------
 .../spark/streaming/kafka/KafkaStreamSuite.scala      |  7 ++-----
 4 files changed, 14 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5aedfa2c/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 4c1d6a0..c0669fb 100644
--- 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -18,9 +18,7 @@
 package org.apache.spark.streaming.kafka;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Arrays;
+import java.util.*;
 
 import scala.Tuple2;
 
@@ -116,7 +114,7 @@ public class JavaDirectKafkaStreamSuite implements 
Serializable {
     );
     JavaDStream<String> unifiedStream = stream1.union(stream2);
 
-    final HashSet<String> result = new HashSet<String>();
+    final Set<String> result = Collections.synchronizedSet(new 
HashSet<String>());
     unifiedStream.foreachRDD(
         new Function<JavaRDD<String>, Void>() {
           @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/5aedfa2c/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 540f4ce..e4c6592 100644
--- 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,9 +18,7 @@
 package org.apache.spark.streaming.kafka;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
 
 import scala.Tuple2;
 
@@ -94,7 +92,7 @@ public class JavaKafkaStreamSuite implements Serializable {
       topics,
       StorageLevel.MEMORY_ONLY_SER());
 
-    final HashMap<String, Long> result = new HashMap<String, Long>();
+    final Map<String, Long> result = Collections.synchronizedMap(new 
HashMap<String, Long>());
 
     JavaDStream<String> words = stream.map(
       new Function<Tuple2<String, String>, String>() {

http://git-wip-us.apache.org/repos/asf/spark/blob/5aedfa2c/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 47bbfb6..212eb35 100644
--- 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -99,7 +99,8 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, topics)
     }
 
-    val allReceived = new ArrayBuffer[(String, String)]
+    val allReceived =
+      new ArrayBuffer[(String, String)] with 
mutable.SynchronizedBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
     // Get the offset ranges in the RDD
@@ -162,7 +163,7 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]()
+    val collectedData = new mutable.ArrayBuffer[String]() with 
mutable.SynchronizedBuffer[String]
     stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
     ssc.start()
     val newData = Map("b" -> 10)
@@ -208,7 +209,7 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]()
+    val collectedData = new mutable.ArrayBuffer[String]() with 
mutable.SynchronizedBuffer[String]
     stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
     ssc.start()
     val newData = Map("b" -> 10)
@@ -324,7 +325,8 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, Set(topic))
     }
 
-    val allReceived = new ArrayBuffer[(String, String)]
+    val allReceived =
+      new ArrayBuffer[(String, String)] with 
mutable.SynchronizedBuffer[(String, String)]
 
     stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
@@ -350,8 +352,8 @@ class DirectKafkaStreamSuite
 }
 
 object DirectKafkaStreamSuite {
-  val collectedData = new mutable.ArrayBuffer[String]()
-  var total = -1L
+  val collectedData = new mutable.ArrayBuffer[String]() with 
mutable.SynchronizedBuffer[String]
+  @volatile var total = -1L
 
   class InputInfoCollector extends StreamingListener {
     val numRecordsSubmitted = new AtomicLong(0L)

http://git-wip-us.apache.org/repos/asf/spark/blob/5aedfa2c/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 8ee2cc6..797b07f 100644
--- 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -65,7 +65,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually 
with BeforeAndAfter
 
     val stream = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](
       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
+    val result = new mutable.HashMap[String, Long]() with 
mutable.SynchronizedMap[String, Long]
     stream.map(_._2).countByValue().foreachRDD { r =>
       val ret = r.collect()
       ret.toMap.foreach { kv =>
@@ -77,10 +77,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually 
with BeforeAndAfter
     ssc.start()
 
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(sent.size === result.size)
-      sent.keys.foreach { k =>
-        assert(sent(k) === result(k).toInt)
-      }
+      assert(sent === result)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to