Repository: spark
Updated Branches:
  refs/heads/master 603c4f8eb -> 9533f5390


[SPARK-6005][TESTS] Fix flaky test: 
o.a.s.streaming.kafka.DirectKafkaStreamSuite.offset recovery

## What changes were proposed in this pull request?

Because this test extracts data from `DStream.generatedRDDs` before stopping, 
it may get data before checkpointing. Then after recovering from the 
checkpoint, `recoveredOffsetRanges` may contain something not in 
`offsetRangesBeforeStop`, which will fail the test. Adding `Thread.sleep(1000)` 
before `ssc.stop()` will reproduce this failure.

This PR just moves the logic of `offsetRangesBeforeStop` (also renamed to 
`offsetRangesAfterStop`) after `ssc.stop()` to fix the flaky test.

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <[email protected]>

Closes #12903 from zsxwing/SPARK-6005.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9533f539
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9533f539
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9533f539

Branch: refs/heads/master
Commit: 9533f5390a3ad7ab96a7bea01cdb6aed89503a51
Parents: 603c4f8
Author: Shixiong Zhu <[email protected]>
Authored: Tue May 10 13:26:53 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Tue May 10 13:26:53 2016 -0700

----------------------------------------------------------------------
 .../kafka/DirectKafkaStreamSuite.scala          | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9533f539/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index f14ff67..cb782d2 100644
--- 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -281,14 +281,20 @@ class DirectKafkaStreamSuite
       sendDataAndWaitForReceive(i)
     }
 
+    ssc.stop()
+
     // Verify that offset ranges were generated
-    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
-    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+    // Since "offsetRangesAfterStop" will be used to compare with 
"recoveredOffsetRanges", we should
+    // collect offset ranges after stopping. Otherwise, because new RDDs keep 
being generated before
+    // stopping, we may not be able to get the latest RDDs, then 
"recoveredOffsetRanges" will
+    // contain something not in "offsetRangesAfterStop".
+    val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+    assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
     assert(
-      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+      offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
       "starting offset not zero"
     )
-    ssc.stop()
+
     logInfo("====== RESTARTING ========")
 
     // Recover context from checkpoints
@@ -298,12 +304,14 @@ class DirectKafkaStreamSuite
     // Verify offset ranges have been recovered
     val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, 
x._2.toSet) }
+    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, 
x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
         earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
       },
-      "Recovered ranges are not the same as the ones generated"
+      "Recovered ranges are not the same as the ones generated\n" +
+        s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
     )
     // Restart context, give more data and verify the total at the end
     // If the total is write that means each records has been received only 
once


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to