Repository: spark
Updated Branches:
refs/heads/master b293afc42 -> 85cf06368
[SPARK-5559] [Streaming] [Test] Remove oppotunity we met flakiness when running
FlumeStreamSuite
When we run FlumeStreamSuite on Jenkins, sometimes we get error like as follows.
sbt.ForkMain$ForkError: The code passed to eventually never returned
normally. Attempted 52 times over 10.094849836 seconds. Last failure message:
Error connecting to localhost/127.0.0.1:23456.
at
org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
at
org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
at
org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at
org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
at
org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at
org.apache.spark.streaming.flume.FlumeStreamSuite.writeAndVerify(FlumeStreamSuite.scala:116)
at
org.apache.spark.streaming.flume.FlumeStreamSuite.org$apache$spark$streaming$flume$FlumeStreamSuite$$testFlumeStream(FlumeStreamSuite.scala:74)
at
org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply$mcV$sp(FlumeStreamSuite.scala:66)
at
org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
at
org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
at
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
at
org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
at
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
This error is caused by check-then-act logic when it find free-port .
/** Find a free port */
private def findFreePort(): Int = {
Utils.startServiceOnPort(23456, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
}, conf)._2
}
Removing the check-then-act is not easy but we can reduce the chance of having
the error by choosing random value for initial port instead of 23456.
Author: Kousuke Saruta <[email protected]>
Closes #4337 from sarutak/SPARK-5559 and squashes the following commits:
16f109f [Kousuke Saruta] Added `require` to Utils#startServiceOnPort
c39d8b6 [Kousuke Saruta] Merge branch 'SPARK-5559' of github.com:sarutak/spark
into SPARK-5559
1610ba2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark
into SPARK-5559
33357e3 [Kousuke Saruta] Changed "findFreePort" method in MQTTStreamSuite and
FlumeStreamSuite so that it can choose valid random port
a9029fe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark
into SPARK-5559
9489ef9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark
into SPARK-5559
8212e42 [Kousuke Saruta] Modified default port used in FlumeStreamSuite from
23456 to random value
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85cf0636
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85cf0636
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85cf0636
Branch: refs/heads/master
Commit: 85cf0636825d1997d64d0bdc04618f29b7222da1
Parents: b293afc
Author: Kousuke Saruta <[email protected]>
Authored: Tue Mar 24 16:13:25 2015 +0000
Committer: Sean Owen <[email protected]>
Committed: Tue Mar 24 16:20:52 2015 +0000
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++++
.../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 5 +++--
.../scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 4 +++-
3 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/85cf0636/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d9a6716..0b5a914 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1876,6 +1876,10 @@ private[spark] object Utils extends Logging {
startService: Int => (T, Int),
conf: SparkConf,
serviceName: String = ""): (T, Int) = {
+
+ require(startPort == 0 || (1024 <= startPort && startPort < 65536),
+ "startPort should be between 1024 and 65535 (inclusive), or 0 for a
random free port.")
+
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
for (offset <- 0 to maxRetries) {
http://git-wip-us.apache.org/repos/asf/spark/blob/85cf0636/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 322de7b..51d273a 100644
---
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -28,6 +28,7 @@ import scala.language.postfixOps
import com.google.common.base.Charsets
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
import org.apache.flume.source.avro
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.jboss.netty.channel.ChannelPipeline
@@ -40,7 +41,6 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext,
TestOutputStream}
-import org.apache.spark.streaming.scheduler.{StreamingListener,
StreamingListenerReceiverStarted}
import org.apache.spark.util.Utils
class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with
Logging {
@@ -76,7 +76,8 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter
with Matchers with L
/** Find a free port */
private def findFreePort(): Int = {
- Utils.startServiceOnPort(23456, (trialPort: Int) => {
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
http://git-wip-us.apache.org/repos/asf/spark/blob/85cf0636/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git
a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 0f3298a..24d78ec 100644
---
a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++
b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -25,6 +25,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.commons.lang3.RandomUtils
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -113,7 +114,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with
BeforeAndAfter {
}
private def findFreePort(): Int = {
- Utils.startServiceOnPort(23456, (trialPort: Int) => {
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]