Repository: spark Updated Branches: refs/heads/branch-1.1 eba399b3c -> 44856654c
[HOTFIX][Streaming] Handle port collisions in flume polling test This is failing my tests in #1777. @tdas Author: Andrew Or <andrewo...@gmail.com> Closes #1803 from andrewor14/fix-flaky-streaming-test and squashes the following commits: ea11a03 [Andrew Or] Catch all exceptions caused by BindExceptions 54a0ca0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-flaky-streaming-test 664095c [Andrew Or] Tone down bind exception message af3ddc9 [Andrew Or] Handle port collisions in flume polling test Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f91e9dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f91e9dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f91e9dc Branch: refs/heads/branch-1.1 Commit: 3f91e9dc2563f3c5c473c781bd3078cc620ff880 Parents: eba399b Author: Andrew Or <andrewo...@gmail.com> Authored: Wed Aug 6 16:34:53 2014 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Aug 20 18:33:16 2014 -0700 ---------------------------------------------------------------------- .../flume/FlumePollingStreamSuite.scala | 32 +++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3f91e9dc/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 2e4ac7c..e3a5bdc 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.util.ManualClock import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext} import org.apache.spark.streaming.flume.sink._ +import org.apache.spark.util.Utils class FlumePollingStreamSuite extends TestSuiteBase { @@ -45,8 +46,37 @@ class FlumePollingStreamSuite extends TestSuiteBase { val eventsPerBatch = 100 val totalEventsPerChannel = batchCount * eventsPerBatch val channelCapacity = 5000 + val maxAttempts = 5 test("flume polling test") { + testMultipleTimes(testFlumePolling) + } + + test("flume polling test multiple hosts") { + testMultipleTimes(testFlumePollingMultipleHost) + } + + /** + * Run the given test until no more java.net.BindException's are thrown. + * Do this only up to a certain attempt limit. + */ + private def testMultipleTimes(test: () => Unit): Unit = { + var testPassed = false + var attempt = 0 + while (!testPassed && attempt < maxAttempts) { + try { + test() + testPassed = true + } catch { + case e: Exception if Utils.isBindCollision(e) => + logWarning("Exception when running flume polling test: " + e) + attempt += 1 + } + } + assert(testPassed, s"Test failed after $attempt attempts!") + } + + private def testFlumePolling(): Unit = { val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) @@ -80,7 +110,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { channel.stop() } - test("flume polling test multiple hosts") { + private def testFlumePollingMultipleHost(): Unit = { val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org