CAMEL-8085 Not using the fix port of unit test of karfka
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/97d9198e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/97d9198e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/97d9198e Branch: refs/heads/master Commit: 97d9198e1635f151a8b0077d2a7f377cc16ca8ad Parents: 37f0b22 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue Dec 2 11:40:02 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue Dec 2 11:40:02 2014 +0800 ---------------------------------------------------------------------- .../component/kafka/BaseEmbeddedKafkaTest.java | 47 ++++++++++++++++++-- .../kafka/KafkaConsumerBatchSizeTest.java | 4 +- .../component/kafka/KafkaConsumerFullTest.java | 4 +- .../component/kafka/KafkaProducerFullTest.java | 12 ++--- 4 files changed, 54 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/97d9198e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java index 9b5b002..4c6f18f 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java @@ -21,8 +21,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import org.apache.camel.CamelContext; import org.apache.camel.component.kafka.embedded.EmbeddedKafkaCluster; import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper; +import org.apache.camel.component.properties.PropertiesComponent; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -31,13 +35,22 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport { static EmbeddedZookeeper embeddedZookeeper; static EmbeddedKafkaCluster embeddedKafkaCluster; - + + private static volatile int zookeeperPort; + + private static volatile int karfkaPort; + @BeforeClass public static void beforeClass() { - embeddedZookeeper = new EmbeddedZookeeper(2181); + // start from somewhere in the 23xxx range + zookeeperPort = AvailablePortFinder.getNextAvailable(23000); + // find another ports for proxy route test + karfkaPort = AvailablePortFinder.getNextAvailable(24000); + + embeddedZookeeper = new EmbeddedZookeeper(zookeeperPort); List<Integer> kafkaPorts = new ArrayList<Integer>(); // -1 for any available port - kafkaPorts.add(9092); + kafkaPorts.add(karfkaPort); embeddedKafkaCluster = new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), kafkaPorts); try { embeddedZookeeper.startup(); @@ -54,5 +67,33 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport { embeddedKafkaCluster.shutdown(); embeddedZookeeper.shutdown(); } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + + Properties prop = new Properties(); + prop.setProperty("zookeeperPort", "" + getZookeeperPort()); + prop.setProperty("karfkaPort", "" + getKarfkaPort()); + jndi.bind("prop", prop); + return jndi; + } + + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.addComponent("properties", new PropertiesComponent("ref:prop")); + return context; + } + + + protected static int getZookeeperPort() { + return zookeeperPort; + } + + protected static int getKarfkaPort() { + return karfkaPort; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/97d9198e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java index acac628..198994d 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java @@ -33,7 +33,7 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest { public static final String TOPIC = "test"; - @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181&" + @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}&" + "groupId=group1&autoOffsetReset=smallest&" + "autoCommitEnable=false&batchSize=3&consumerStreams=1") private Endpoint from; @@ -46,7 +46,7 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest { @Before public void before() { Properties props = new Properties(); - props.put("metadata.broker.list", "localhost:9092"); + props.put("metadata.broker.list", "localhost:" + getKarfkaPort()); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner"); props.put("request.required.acks", "1"); http://git-wip-us.apache.org/repos/asf/camel/blob/97d9198e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java index 8cd17e9..c49444b 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java @@ -35,7 +35,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { public static final String TOPIC = "test"; - @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181" + @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}" + "&groupId=group1&autoOffsetReset=smallest") private Endpoint from; @@ -47,7 +47,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { @Before public void before() { Properties props = new Properties(); - props.put("metadata.broker.list", "localhost:9092"); + props.put("metadata.broker.list", "localhost:" + getKarfkaPort()); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner"); props.put("request.required.acks", "1"); http://git-wip-us.apache.org/repos/asf/camel/blob/97d9198e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index 95a4d8d..56e0eb2 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -30,7 +30,6 @@ import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; - import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; import org.apache.camel.Produce; @@ -39,7 +38,6 @@ import org.apache.camel.builder.RouteBuilder; import org.junit.After; import org.junit.Before; import org.junit.Test; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +48,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class); - @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder" + "&requestRequiredAcks=-1") private Endpoint to; @@ -63,13 +61,15 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { @Before public void before() { Properties props = new Properties(); - props.put("zookeeper.connect", "localhost:2181"); + + props.put("zookeeper.connect", "localhost:" + getZookeeperPort()); props.put("group.id", KafkaConstants.DEFAULT_GROUP); - props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.session.timeout.ms", "6000"); + props.put("zookeeper.connectiontimeout.ms", "12000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); - + kafkaConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); }