Repository: spark
Updated Branches:
  refs/heads/branch-1.3 6e82c46bf -> f8f9a64eb


[SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite

The test was incorrect: instead of counting the number of records, it counted
the number of partitions of the RDDs generated by the DStream, which was not
the intention. I will test this patch multiple times to understand its
flakiness.
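
For illustration only (this sketch is not part of the patch): the difference
between counting per-partition summaries and counting the records themselves,
shown on a plain RDD. The object and variable names below are made up, and it
assumes a local Spark installation on the classpath; the per-partition pairs
are analogous to the (partSize, rangeSize) pairs collected in the test.

import org.apache.spark.{SparkConf, SparkContext}

object CountingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("counting-sketch"))

    // 100 records spread over 4 partitions
    val records = sc.parallelize(1 to 100, numSlices = 4)

    // One (partitionIndex, partitionSize) pair per partition
    val perPartition = records.mapPartitionsWithIndex { (i, iter) =>
      Iterator((i, iter.size))
    }.collect()

    val partitionCount = perPartition.size        // 4   -- analogous to what the old assertion summed
    val recordCount = perPartition.map(_._2).sum  // 100 -- what it was meant to sum
    println(s"partitions = $partitionCount, records = $recordCount")

    sc.stop()
  }
}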

PS: This was caused by my refactoring in 
https://github.com/apache/spark/pull/4384/

koeninger, check it out.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #4597 from tdas/kafka-flaky-test and squashes the following commits:

d236235 [Tathagata Das] Unignored last test.
e9a1820 [Tathagata Das] fix test

(cherry picked from commit 3912d332464dcd124c60b734724c34d9742466a4)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8f9a64e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8f9a64e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8f9a64e

Branch: refs/heads/branch-1.3
Commit: f8f9a64ebdfe581d62773a6276f66c75d4ba43e1
Parents: 6e82c46
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Feb 17 22:44:16 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Feb 17 22:44:27 2015 -0800

----------------------------------------------------------------------
 .../kafka/DirectKafkaStreamSuite.scala          | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f8f9a64e/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 9260944..17ca9d1 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
 
 class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("basic stream receiving with multiple topics and smallest starting 
offset") {
+  test("basic stream receiving with multiple topics and smallest starting 
offset") {
     val topics = Set("basic1", "basic2", "basic3")
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       createTopic(t)
       sendMessages(t, data)
     }
+    val totalSent = data.values.sum * topics.size
     val kafkaParams = Map(
       "metadata.broker.list" -> s"$brokerAddress",
       "auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
         ssc, kafkaParams, topics)
     }
-    var total = 0L
+
+    val allReceived = new ArrayBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
     // Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       collected.foreach { case (partSize, rangeSize) =>
         assert(partSize === rangeSize, "offset ranges are wrong")
       }
-      total += collected.size  // Add up all the collected items
     }
+    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(total === data.values.sum * topics.size, "didn't get all messages")
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" + 
allReceived.mkString("\n"))
     }
     ssc.stop()
   }
 
-  ignore("receiving from largest starting offset") {
+  test("receiving from largest starting offset") {
     val topic = "largest"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("creating stream by offset") {
+  test("creating stream by offset") {
     val topic = "offset"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
   // Test to verify the offset ranges can be recovered from the checkpoints
-  ignore("offset recovery") {
+  test("offset recovery") {
     val topic = "recovery"
     createTopic(topic)
     testDir = Utils.createTempDir()

