Repository: spark
Updated Branches:
  refs/heads/branch-2.1 021952d58 -> 9a3c5bd70


[FLAKY-TEST] InputStreamsSuite.socket input stream

## What changes were proposed in this pull request?

https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream

## How was this patch tested?

Tested 2,000 times.

Author: Burak Yavuz <[email protected]>

Closes #16343 from brkyvz/sock.

(cherry picked from commit afe36516e4b4031196ee2e0a04980ac49208ea6b)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a3c5bd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a3c5bd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a3c5bd7

Branch: refs/heads/branch-2.1
Commit: 9a3c5bd7082474cfb01f021aef103e44d12e2ff1
Parents: 021952d
Author: Burak Yavuz <[email protected]>
Authored: Wed Dec 21 17:23:48 2016 -0800
Committer: Tathagata Das <[email protected]>
Committed: Wed Dec 21 17:23:58 2016 -0800

----------------------------------------------------------------------
 .../spark/streaming/InputStreamsSuite.scala     | 55 ++++++++------------
 1 file changed, 23 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9a3c5bd7/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 9ecfa48..6fb50a4 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
         val expectedOutput = input.map(_.toString)
         for (i <- input.indices) {
           testServer.send(input(i).toString + "\n")
-          Thread.sleep(500)
           clock.advance(batchDuration.milliseconds)
         }
-        // Make sure we finish all batches before "stop"
-        if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
-          fail("Timeout: cannot finish all batches in 30 seconds")
+
+        eventually(eventuallyTimeout) {
+          clock.advance(batchDuration.milliseconds)
+          // Verify whether data received was as expected
+          logInfo("--------------------------------")
+          logInfo("output.size = " + outputQueue.size)
+          logInfo("output")
+          outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + 
"]"))
+          logInfo("expected output.size = " + expectedOutput.size)
+          logInfo("expected output")
+          expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+          logInfo("--------------------------------")
+
+          // Verify whether all the elements received are as expected
+          // (whether the elements were received one in each interval is not 
verified)
+          val output: Array[String] = outputQueue.asScala.flatMap(x => 
x).toArray
+          assert(output.length === expectedOutput.size)
+          for (i <- output.indices) {
+            assert(output(i) === expectedOutput(i))
+          }
         }
 
-        // Ensure progress listener has been notified of all events
-        ssc.sparkContext.listenerBus.waitUntilEmpty(500)
-
-        // Verify all "InputInfo"s have been reported
-        assert(ssc.progressListener.numTotalReceivedRecords === input.size)
-        assert(ssc.progressListener.numTotalProcessedRecords === input.size)
-
-        logInfo("Stopping server")
-        testServer.stop()
-        logInfo("Stopping context")
-        ssc.stop()
-
-        // Verify whether data received was as expected
-        logInfo("--------------------------------")
-        logInfo("output.size = " + outputQueue.size)
-        logInfo("output")
-        outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-        logInfo("expected output.size = " + expectedOutput.size)
-        logInfo("expected output")
-        expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-        logInfo("--------------------------------")
-
-        // Verify whether all the elements received are as expected
-        // (whether the elements were received one in each interval is not 
verified)
-        val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-        assert(output.length === expectedOutput.size)
-        for (i <- output.indices) {
-          assert(output(i) === expectedOutput(i))
+        eventually(eventuallyTimeout) {
+          assert(ssc.progressListener.numTotalReceivedRecords === input.length)
+          assert(ssc.progressListener.numTotalProcessedRecords === 
input.length)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to