Repository: spark
Updated Branches:
  refs/heads/branch-1.1 eba399b3c -> 44856654c


[HOTFIX][Streaming] Handle port collisions in flume polling test

This is failing my tests in #1777. @tdas

Author: Andrew Or <andrewo...@gmail.com>

Closes #1803 from andrewor14/fix-flaky-streaming-test and squashes the 
following commits:

ea11a03 [Andrew Or] Catch all exceptions caused by BindExceptions
54a0ca0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
fix-flaky-streaming-test
664095c [Andrew Or] Tone down bind exception message
af3ddc9 [Andrew Or] Handle port collisions in flume polling test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f91e9dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f91e9dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f91e9dc

Branch: refs/heads/branch-1.1
Commit: 3f91e9dc2563f3c5c473c781bd3078cc620ff880
Parents: eba399b
Author: Andrew Or <andrewo...@gmail.com>
Authored: Wed Aug 6 16:34:53 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Aug 20 18:33:16 2014 -0700

----------------------------------------------------------------------
 .../flume/FlumePollingStreamSuite.scala         | 32 +++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3f91e9dc/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 2e4ac7c..e3a5bdc 100644
--- 
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, 
StreamingContext}
 import org.apache.spark.streaming.flume.sink._
+import org.apache.spark.util.Utils
 
 class FlumePollingStreamSuite extends TestSuiteBase {
 
@@ -45,8 +46,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
   val channelCapacity = 5000
+  val maxAttempts = 5
 
   test("flume polling test") {
+    testMultipleTimes(testFlumePolling)
+  }
+
+  test("flume polling test multiple hosts") {
+    testMultipleTimes(testFlumePollingMultipleHost)
+  }
+
+  /**
+   * Run the given test until no more java.net.BindException's are thrown.
+   * Do this only up to a certain attempt limit.
+   */
+  private def testMultipleTimes(test: () => Unit): Unit = {
+    var testPassed = false
+    var attempt = 0
+    while (!testPassed && attempt < maxAttempts) {
+      try {
+        test()
+        testPassed = true
+      } catch {
+        case e: Exception if Utils.isBindCollision(e) =>
+          logWarning("Exception when running flume polling test: " + e)
+          attempt += 1
+      }
+    }
+    assert(testPassed, s"Test failed after $attempt attempts!")
+  }
+
+  private def testFlumePolling(): Unit = {
     val testPort = getTestPort
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
@@ -80,7 +110,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
     channel.stop()
   }
 
-  test("flume polling test multiple hosts") {
+  private def testFlumePollingMultipleHost(): Unit = {
     val testPort = getTestPort
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to