Added test using Kafka API and example application
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/c1437063 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/c1437063 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/c1437063 Branch: refs/heads/0.5.0 Commit: c14370630975d0b83741704f0df266cf7033a6a8 Parents: 37e890c Author: Andrew Kennedy <[email protected]> Authored: Wed Mar 20 19:49:50 2013 +0000 Committer: Andrew Kennedy <[email protected]> Committed: Fri Apr 19 10:36:06 2013 +0100 ---------------------------------------------------------------------- examples/simple-messaging-pubsub/README.txt | 7 +- .../java/brooklyn/demo/KafkaClusterExample.java | 40 +++++++++++ .../brooklyn/demo/StandaloneBrokerExample.java | 54 --------------- .../demo/StandaloneQpidBrokerExample.java | 54 +++++++++++++++ software/messaging/pom.xml | 73 +++++++++++++------- .../entity/messaging/kafka/KafkaBroker.java | 4 +- .../entity/messaging/kafka/KafkaBrokerImpl.java | 7 +- .../entity/messaging/kafka/KafkaZookeeper.java | 2 +- .../messaging/kafka/KafkaIntegrationTest.groovy | 21 ++++-- .../entity/messaging/kafka/KafkaSupport.java | 72 +++++++++++++++++++ 10 files changed, 240 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/README.txt ---------------------------------------------------------------------- diff --git a/examples/simple-messaging-pubsub/README.txt b/examples/simple-messaging-pubsub/README.txt index 6048867..e27992e 100644 --- a/examples/simple-messaging-pubsub/README.txt +++ b/examples/simple-messaging-pubsub/README.txt @@ -7,7 +7,7 @@ The commands below assume that the `brooklyn` script is already on your $PATH, a export BROOKLYN_CLASSPATH=$(pwd)/target/classes # Launches a qpid broker on localhost - brooklyn -v launch --app brooklyn.demo.StandaloneBrokerExample --location localhost + brooklyn -v launch --app brooklyn.demo.StandaloneQpidBrokerExample --location localhost # You can get the broker's URL from the brooklyn web-console at http://localhost:8081 # by looking at the broker entity's sensors or from the verbose output from the application startup @@ -19,6 +19,11 @@ The commands below assume that the `brooklyn` script is already on your $PATH, a # Test publishing a message to the broker java -cp "./resources/lib/*:./target/classes" brooklyn.demo.Publish ${URL} +To test a Kafka distributed messaging cluster example, use the following command: + + # Launches a Kafka cluster on AWS EC2 with two brokers + brooklyn -v launch --app brooklyn.demo.KafkaClusterExample --location aws-ec2:eu-west-1 + --- For more information, please visit: http://brooklyncentral.github.com/use/examples/messaging/ http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java ---------------------------------------------------------------------- diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java new file mode 100644 index 0000000..fae6bb6 --- /dev/null +++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java @@ -0,0 +1,40 @@ +package brooklyn.demo; + +import java.util.List; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.messaging.kafka.KafkaCluster; +import brooklyn.entity.proxying.BasicEntitySpec; +import brooklyn.launcher.BrooklynLauncher; +import brooklyn.util.CommandLineUtil; + +import com.google.common.collect.Lists; + +/** Kafka Cluster Application */ +public class KafkaClusterExample extends ApplicationBuilder { + + public static final String DEFAULT_LOCATION = "localhost"; + + /** Configure the application. */ + protected void doBuild() { + createChild(BasicEntitySpec.newInstance(KafkaCluster.class) + .configure("initialSize", 2)); + + appDisplayName("Kafka cluster application"); + } + + public static void main(String[] argv) { + List<String> args = Lists.newArrayList(argv); + String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+"); + String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION); + + BrooklynLauncher launcher = BrooklynLauncher.newInstance() + .application(new KafkaClusterExample()) + .webconsolePort(port) + .location(location) + .start(); + + Entities.dumpInfo(launcher.getApplications()); + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java ---------------------------------------------------------------------- diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java deleted file mode 100644 index 83cdc31..0000000 --- a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java +++ /dev/null @@ -1,54 +0,0 @@ -package brooklyn.demo; - -import java.util.List; - -import brooklyn.entity.basic.AbstractApplication; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.messaging.amqp.AmqpServer; -import brooklyn.entity.messaging.qpid.QpidBroker; -import brooklyn.entity.proxying.EntitySpecs; -import brooklyn.launcher.BrooklynLauncher; -import brooklyn.util.CommandLineUtil; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -/** Qpid Broker Application */ -public class StandaloneBrokerExample extends AbstractApplication { - - public static final String CUSTOM_CONFIG_PATH = "classpath://custom-config.xml"; - public static final String PASSWD_PATH = "classpath://passwd"; - public static final String QPID_BDBSTORE_JAR_PATH = "classpath://qpid-bdbstore-0.14.jar"; - public static final String BDBSTORE_JAR_PATH = "classpath://je-5.0.34.jar"; - - public static final String DEFAULT_LOCATION = "localhost"; - - @Override - public void init() { - // Configure the Qpid broker entity - QpidBroker broker = addChild(EntitySpecs.spec(QpidBroker.class) - .configure("amqpPort", 5672) - .configure("amqpVersion", AmqpServer.AMQP_0_10) - .configure("runtimeFiles", ImmutableMap.builder() - .put(QpidBroker.CONFIG_XML, CUSTOM_CONFIG_PATH) - .put(QpidBroker.PASSWD, PASSWD_PATH) - .put("lib/opt/qpid-bdbstore-0.14.jar", QPID_BDBSTORE_JAR_PATH) - .put("lib/opt/je-5.0.34.jar", BDBSTORE_JAR_PATH) - .build()) - .configure("queue", "testQueue")); - } - - public static void main(String[] argv) { - List<String> args = Lists.newArrayList(argv); - String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+"); - String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION); - - BrooklynLauncher launcher = BrooklynLauncher.newInstance() - .application(EntitySpecs.appSpec(StandaloneBrokerExample.class).displayName("Qpid app")) - .webconsolePort(port) - .location(location) - .start(); - - Entities.dumpInfo(launcher.getApplications()); - } -} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java ---------------------------------------------------------------------- diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java new file mode 100644 index 0000000..19b6c2e --- /dev/null +++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java @@ -0,0 +1,54 @@ +package brooklyn.demo; + +import java.util.List; + +import brooklyn.entity.basic.AbstractApplication; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.messaging.amqp.AmqpServer; +import brooklyn.entity.messaging.qpid.QpidBroker; +import brooklyn.entity.proxying.EntitySpecs; +import brooklyn.launcher.BrooklynLauncher; +import brooklyn.util.CommandLineUtil; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +/** Qpid Broker Application */ +public class StandaloneQpidBrokerExample extends AbstractApplication { + + public static final String CUSTOM_CONFIG_PATH = "classpath://custom-config.xml"; + public static final String PASSWD_PATH = "classpath://passwd"; + public static final String QPID_BDBSTORE_JAR_PATH = "classpath://qpid-bdbstore-0.14.jar"; + public static final String BDBSTORE_JAR_PATH = "classpath://je-5.0.34.jar"; + + public static final String DEFAULT_LOCATION = "localhost"; + + @Override + public void init() { + // Configure the Qpid broker entity + QpidBroker broker = addChild(EntitySpecs.spec(QpidBroker.class) + .configure("amqpPort", 5672) + .configure("amqpVersion", AmqpServer.AMQP_0_10) + .configure("runtimeFiles", ImmutableMap.builder() + .put(QpidBroker.CONFIG_XML, CUSTOM_CONFIG_PATH) + .put(QpidBroker.PASSWD, PASSWD_PATH) + .put("lib/opt/qpid-bdbstore-0.14.jar", QPID_BDBSTORE_JAR_PATH) + .put("lib/opt/je-5.0.34.jar", BDBSTORE_JAR_PATH) + .build()) + .configure("queue", "testQueue")); + } + + public static void main(String[] argv) { + List<String> args = Lists.newArrayList(argv); + String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+"); + String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION); + + BrooklynLauncher launcher = BrooklynLauncher.newInstance() + .application(EntitySpecs.appSpec(StandaloneQpidBrokerExample.class).displayName("Qpid app")) + .webconsolePort(port) + .location(location) + .start(); + + Entities.dumpInfo(launcher.getApplications()); + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/pom.xml ---------------------------------------------------------------------- diff --git a/software/messaging/pom.xml b/software/messaging/pom.xml index 4b0f372..7a81336 100644 --- a/software/messaging/pom.xml +++ b/software/messaging/pom.xml @@ -14,37 +14,21 @@ <relativePath>../../pom.xml</relativePath> </parent> - <dependencies> - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-jms_1.1_spec</artifactId> - <version>1.1.1</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-client</artifactId> - <version>0.20</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-core</artifactId> - <version>5.7.0</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.rabbitmq</groupId> - <artifactId>amqp-client</artifactId> - <version>2.8.7</version> - </dependency> + <repositories> + <repository> + <id>clojars.org</id> + <url>http://clojars.org/repo</url> + </repository> + </repositories> + <dependencies> <dependency> <groupId>io.brooklyn</groupId> <artifactId>brooklyn-software-base</artifactId> <version>${project.version}</version> </dependency> - + + <!-- test dependencies --> <dependency> <groupId>${project.groupId}</groupId> <artifactId>brooklyn-test-support</artifactId> @@ -64,13 +48,50 @@ <classifier>tests</classifier> <scope>test</scope> </dependency> - <!-- bring in jclouds for testing --> <dependency> <groupId>io.brooklyn</groupId> <artifactId>brooklyn-locations-jclouds</artifactId> <version>${brooklyn.version}</version> <scope>test</scope> </dependency> + + <!-- for qpid --> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + <version>1.1.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-client</artifactId> + <version>0.20</version> + <scope>test</scope> + </dependency> + + <!-- for activemq --> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-core</artifactId> + <version>5.7.0</version> + <scope>test</scope> + </dependency> + + <!-- for rabbit --> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>2.8.7</version> + <scope>test</scope> + </dependency> + + <!-- for kafka --> + <dependency> + <groupId>storm</groupId> + <artifactId>kafka</artifactId> + <version>0.7.0-incubating</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java index 13b8d0d..01fa424 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java @@ -45,7 +45,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka @SetFromFlag("zookeeper") BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "Kafka zookeeper entity"); - AttributeSensor<Long> BROKER_ID = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.id", "Kafka unique broker ID"); + AttributeSensor<Integer> BROKER_ID = new BasicAttributeSensor<Integer>(Integer.class, "kafka.broker.id", "Kafka unique broker ID"); BasicAttributeSensor<Long> FETCH_REQUEST_COUNT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.total", "Fetch request count"); BasicAttributeSensor<Long> TOTAL_FETCH_TIME = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.time.total", "Total fetch request processing time (millis)"); @@ -60,7 +60,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka Integer getKafkaPort(); - Long getBrokerId(); + Integer getBrokerId(); KafkaZookeeper getZookeeper(); http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java index d76072e..ae21118 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java @@ -29,6 +29,7 @@ import brooklyn.entity.Entity; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.entity.messaging.MessageBroker; +import brooklyn.event.basic.BasicAttributeSensor; import brooklyn.event.feed.function.FunctionFeed; import brooklyn.event.feed.function.FunctionPollConfig; import brooklyn.event.feed.jmx.JmxAttributePollConfig; @@ -45,8 +46,6 @@ import com.google.common.collect.Sets; public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker { private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class); - private static final AtomicLong brokers = new AtomicLong(0l); - public KafkaBrokerImpl() { super(); } @@ -62,14 +61,14 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke @Override public void postConstruct() { - setAttribute(BROKER_ID, brokers.incrementAndGet()); + setAttribute(BROKER_ID, hashCode()); } @Override public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); } @Override - public Long getBrokerId() { return getAttribute(BROKER_ID); } + public Integer getBrokerId() { return getAttribute(BROKER_ID); } @Override public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java index 8e1b5da..a1001f3 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java @@ -35,7 +35,7 @@ public interface KafkaZookeeper extends SoftwareProcess, UsesJmx, Kafka { @SetFromFlag("zookeeperPort") PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+"); - /** Location of the configuration file template to be copied to the server.*/ + /** Location of the configuration file template to be copied to the server. */ @SetFromFlag("zookeeperConfig") BasicConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>( String.class, "kafka.config.zookeeper", "Zookeeper configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties"); http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy index 0943dcd..cbfb410 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy @@ -46,6 +46,8 @@ import brooklyn.util.internal.TimeExtras /** * Test the operation of the {@link ActiveMQBroker} class. + * + * TODO test that sensors update. */ public class KafkaIntegrationTest { private static final Logger log = LoggerFactory.getLogger(KafkaIntegrationTest.class) @@ -101,20 +103,27 @@ public class KafkaIntegrationTest { /** * Test that we can start a cluster with zookeeper and one broker. + * + * Connects to the zookeeper controller and tests sending and receiving messages on a topic. */ @Test(groups = "Integration") public void testSingleBrokerCluster() { KafkaCluster cluster = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE, 1)); cluster.start([ testLocation ]) - executeUntilSucceedsWithShutdown(cluster, timeout:600*TimeUnit.SECONDS) { + executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) { assertTrue cluster.getAttribute(Startable.SERVICE_UP) - Entities.dumpInfo(cluster) } - assertFalse cluster.getAttribute(Startable.SERVICE_UP) + + Entities.dumpInfo(cluster); + + Thread.sleep(5000l); + + KafkaSupport support = new KafkaSupport(cluster.getZookeeper()); + support.sendMessage("brooklyn", "TEST_MESSAGE") + List<String> messages = support.getMessage("brooklyn"); + assertEquals(messages.size(), 1); + assertEquals(messages.get(0), "TEST_MESSAGE"); } - // TODO test with API sending messages - // TODO test that sensors update - // TODO add demo application } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java new file mode 100644 index 0000000..d026f06 --- /dev/null +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java @@ -0,0 +1,72 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package brooklyn.entity.messaging.kafka; + +import static org.testng.Assert.*; + +import java.util.List; +import java.util.Properties; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.KafkaMessageStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.javaapi.producer.Producer; +import kafka.javaapi.producer.ProducerData; +import kafka.message.Message; +import kafka.producer.ProducerConfig; +import brooklyn.entity.basic.Attributes; + +import com.beust.jcommander.internal.Lists; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +public class KafkaSupport { + + private final KafkaZookeeper zookeeper; + + public KafkaSupport(KafkaZookeeper zookeeper) { + this.zookeeper = zookeeper; + } + + public void sendMessage(String topic, String message) { + Properties props = new Properties(); + props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort())); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + ProducerConfig config = new ProducerConfig(props); + Producer<String, String> producer = new Producer<String, String>(config); + ProducerData<String, String> data = new ProducerData<String, String>(topic, message); + producer.send(data); + producer.close(); + } + + public List<String> getMessage(String topic) { + Properties props = new Properties(); + props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort())); + props.put("zk.connectiontimeout.ms", "1000000"); + props.put("groupid", "test_group"); + ConsumerConfig consumerConfig = new ConsumerConfig(props); + ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); + List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic); + List<String> messages = Lists.newArrayList(); + for (Message msg : Iterables.getOnlyElement(streams)) { + assertTrue(msg.isValid()); + String payload = new String(msg.payload().array()); + messages.add(payload); + } + return messages; + } +}
