Repository: spark
Updated Branches:
  refs/heads/master bb96012b7 -> 74fb2ecf7


[SPARK-3615][Streaming]Fix Kafka unit test hard coded Zookeeper port issue

Details can be seen in 
[SPARK-3615](https://issues.apache.org/jira/browse/SPARK-3615).

Author: jerryshao <[email protected]>

Closes #2483 from jerryshao/SPARK_3615 and squashes the following commits:

8555563 [jerryshao] Fix Kafka unit test hard coded Zookeeper port issue


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74fb2ecf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74fb2ecf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74fb2ecf

Branch: refs/heads/master
Commit: 74fb2ecf7afc2d314f6477f8f2e6134614387453
Parents: bb96012
Author: jerryshao <[email protected]>
Authored: Wed Sep 24 17:18:55 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Wed Sep 24 17:18:55 2014 -0700

----------------------------------------------------------------------
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  2 +-
 .../streaming/kafka/KafkaStreamSuite.scala      | 46 ++++++++++++++------
 2 files changed, 34 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74fb2ecf/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 0571454..efb0099 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -81,7 +81,7 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
         Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
+    kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
     kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
     kafkaParams.put("auto.offset.reset", "smallest");
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74fb2ecf/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index c0b55e9..6943326 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -24,7 +24,7 @@ import java.util.{Properties, Random}
 import scala.collection.mutable
 
 import kafka.admin.CreateTopicCommand
-import kafka.common.TopicAndPartition
+import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import kafka.utils.ZKStringSerializer
 import kafka.serializer.{StringDecoder, StringEncoder}
@@ -42,14 +42,13 @@ import org.apache.spark.util.Utils
 class KafkaStreamSuite extends TestSuiteBase {
   import KafkaTestUtils._
 
-  val zkConnect = "localhost:2181"
+  val zkHost = "localhost"
+  var zkPort: Int = 0
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
 
-  val brokerPort = 9092
-  val brokerProps = getBrokerConfig(brokerPort, zkConnect)
-  val brokerConf = new KafkaConfig(brokerProps)
-
+  protected var brokerPort = 9092
+  protected var brokerConf: KafkaConfig = _
   protected var zookeeper: EmbeddedZookeeper = _
   protected var zkClient: ZkClient = _
   protected var server: KafkaServer = _
@@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase {
 
   override def beforeFunction() {
     // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(zkConnect)
+    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    // Get the actual zookeeper binding port
+    zkPort = zookeeper.actualPort
     logInfo("==================== 0 ====================")
-    zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+
+    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+      ZKStringSerializer)
     logInfo("==================== 1 ====================")
 
     // Kafka broker startup
-    server = new KafkaServer(brokerConf)
-    logInfo("==================== 2 ====================")
-    server.startup()
-    logInfo("==================== 3 ====================")
+    var bindSuccess: Boolean = false
+    while(!bindSuccess) {
+      try {
+        val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
+        brokerConf = new KafkaConfig(brokerProps)
+        server = new KafkaServer(brokerConf)
+        logInfo("==================== 2 ====================")
+        server.startup()
+        logInfo("==================== 3 ====================")
+        bindSuccess = true
+      } catch {
+        case e: KafkaException =>
+          if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
+            brokerPort += 1
+          }
+        case e: Exception => throw new Exception("Kafka server create failed", e)
+      }
+    }
+
     Thread.sleep(2000)
     logInfo("==================== 4 ====================")
     super.beforeFunction()
@@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase {
     createTopic(topic)
     produceAndSendMessage(topic, sent)
 
-    val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
       "group.id" -> s"test-consumer-${random.nextInt(10000)}",
       "auto.offset.reset" -> "smallest")
 
@@ -200,6 +218,8 @@ object KafkaTestUtils {
     factory.configure(new InetSocketAddress(ip, port), 16)
     factory.startup(zookeeper)
 
+    val actualPort = factory.getLocalPort
+
     def shutdown() {
       factory.shutdown()
       Utils.deleteRecursively(snapshotDir)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to