CAMEL-8085 Do not use fixed ports in the Kafka unit tests

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/97d9198e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/97d9198e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/97d9198e

Branch: refs/heads/master
Commit: 97d9198e1635f151a8b0077d2a7f377cc16ca8ad
Parents: 37f0b22
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Tue Dec 2 11:40:02 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Tue Dec 2 11:40:02 2014 +0800

----------------------------------------------------------------------
 .../component/kafka/BaseEmbeddedKafkaTest.java  | 47 ++++++++++++++++++--
 .../kafka/KafkaConsumerBatchSizeTest.java       |  4 +-
 .../component/kafka/KafkaConsumerFullTest.java  |  4 +-
 .../component/kafka/KafkaProducerFullTest.java  | 12 ++---
 4 files changed, 54 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/97d9198e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index 9b5b002..4c6f18f 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -21,8 +21,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.component.kafka.embedded.EmbeddedKafkaCluster;
 import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
+import org.apache.camel.component.properties.PropertiesComponent;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -31,13 +35,22 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport 
{
 
     static EmbeddedZookeeper embeddedZookeeper;
     static EmbeddedKafkaCluster embeddedKafkaCluster;
-
+    
+    private static volatile int zookeeperPort;
+    
+    private static volatile int karfkaPort;
+   
     @BeforeClass
     public static void beforeClass() {
-        embeddedZookeeper = new EmbeddedZookeeper(2181);
+        // start from somewhere in the 23xxx range
+        zookeeperPort = AvailablePortFinder.getNextAvailable(23000);
+        // find another ports for proxy route test
+        karfkaPort = AvailablePortFinder.getNextAvailable(24000);
+        
+        embeddedZookeeper = new EmbeddedZookeeper(zookeeperPort);
         List<Integer> kafkaPorts = new ArrayList<Integer>();
         // -1 for any available port
-        kafkaPorts.add(9092);
+        kafkaPorts.add(karfkaPort);
         embeddedKafkaCluster = new 
EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), 
kafkaPorts);
         try {
             embeddedZookeeper.startup();
@@ -54,5 +67,33 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         embeddedKafkaCluster.shutdown();
         embeddedZookeeper.shutdown();
     }
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        Properties prop = new Properties();
+        prop.setProperty("zookeeperPort", "" + getZookeeperPort());
+        prop.setProperty("karfkaPort", "" + getKarfkaPort());
+        jndi.bind("prop", prop);
+        return jndi;
+    }
+    
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.addComponent("properties", new 
PropertiesComponent("ref:prop"));
+        return context;
+    }
+    
+
+    protected static int getZookeeperPort() {
+        return zookeeperPort;
+    }
+    
+    protected static int getKarfkaPort() {
+        return karfkaPort;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/97d9198e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index acac628..198994d 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -33,7 +33,7 @@ public class KafkaConsumerBatchSizeTest extends 
BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + 
"&zookeeperHost=localhost&zookeeperPort=2181&"
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + 
"&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}&"
         + "groupId=group1&autoOffsetReset=smallest&"
         + "autoCommitEnable=false&batchSize=3&consumerStreams=1")
     private Endpoint from;
@@ -46,7 +46,7 @@ public class KafkaConsumerBatchSizeTest extends 
BaseEmbeddedKafkaTest {
     @Before
     public void before() {
         Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:9092");
+        props.put("metadata.broker.list", "localhost:" + getKarfkaPort());
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         props.put("partitioner.class", 
"org.apache.camel.component.kafka.SimplePartitioner");
         props.put("request.required.acks", "1");

http://git-wip-us.apache.org/repos/asf/camel/blob/97d9198e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 8cd17e9..c49444b 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -35,7 +35,7 @@ public class KafkaConsumerFullTest extends 
BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + 
"&zookeeperHost=localhost&zookeeperPort=2181"
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + 
"&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}"
         + "&groupId=group1&autoOffsetReset=smallest")
     private Endpoint from;
 
@@ -47,7 +47,7 @@ public class KafkaConsumerFullTest extends 
BaseEmbeddedKafkaTest {
     @Before
     public void before() {
         Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:9092");
+        props.put("metadata.broker.list", "localhost:" + getKarfkaPort());
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         props.put("partitioner.class", 
"org.apache.camel.component.kafka.SimplePartitioner");
         props.put("request.required.acks", "1");

http://git-wip-us.apache.org/repos/asf/camel/blob/97d9198e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 95a4d8d..56e0eb2 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -30,7 +30,6 @@ import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Produce;
@@ -39,7 +38,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +48,7 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaProducerFullTest.class);
 
-    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC 
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC 
         + 
"&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
         + "&requestRequiredAcks=-1")
     private Endpoint to;
@@ -63,13 +61,15 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     @Before
     public void before() {
         Properties props = new Properties();
-        props.put("zookeeper.connect", "localhost:2181");
+       
+        props.put("zookeeper.connect", "localhost:" + getZookeeperPort());
         props.put("group.id", KafkaConstants.DEFAULT_GROUP);
-        props.put("zookeeper.session.timeout.ms", "400");
+        props.put("zookeeper.session.timeout.ms", "6000");
+        props.put("zookeeper.connectiontimeout.ms", "12000");
         props.put("zookeeper.sync.time.ms", "200");
         props.put("auto.commit.interval.ms", "1000");
         props.put("auto.offset.reset", "smallest");
-
+       
         kafkaConsumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
     }
 

Reply via email to