Added EC2 live test for KafkaCluster
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/317363ab Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/317363ab Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/317363ab Branch: refs/heads/0.5.0 Commit: 317363ab745e7f660d27d29abf4808650483fcd3 Parents: 64486e4 Author: Andrew Kennedy <[email protected]> Authored: Wed Apr 3 12:37:09 2013 +0100 Committer: Andrew Kennedy <[email protected]> Committed: Fri Apr 19 10:36:07 2013 +0100 ---------------------------------------------------------------------- .../entity/messaging/kafka/KafkaLiveTest.java | 37 ++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/317363ab/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java index 6229b4e..8f61400 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java @@ -15,17 +15,50 @@ */ package brooklyn.entity.messaging.kafka; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.concurrent.Callable; + import brooklyn.entity.AbstractEc2LiveTest; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.proxying.EntitySpecs; +import brooklyn.entity.trait.Startable; import brooklyn.location.Location; +import brooklyn.test.Asserts; +import brooklyn.util.MutableMap; + +import com.google.common.collect.ImmutableList; public class KafkaLiveTest extends AbstractEc2LiveTest { /** - * Test Kafka cluster operation. + * Test that can install, start and use a Kafka cluster with two brokers. */ @Override protected void doTest(Location loc) throws Exception { - throw new UnsupportedOperationException(); + final KafkaCluster cluster = app.createAndManageChild(EntitySpecs.spec(KafkaCluster.class) + .configure("startTimeout", 300) // 5 minutes + .configure("initialSize", 2)); + app.start(ImmutableList.of(loc)); + + Asserts.succeedsEventually(MutableMap.of("timeout", 300000l), new Callable<Void>() { + @Override + public Void call() { + assertTrue(cluster.getAttribute(Startable.SERVICE_UP)); + assertTrue(cluster.getZookeeper().getAttribute(Startable.SERVICE_UP)); + assertEquals(cluster.getCurrentSize().intValue(), 2); + return null; + } + }); + + Entities.dumpInfo(cluster); + + KafkaSupport support = new KafkaSupport(cluster); + + support.sendMessage("brooklyn", "TEST_MESSAGE"); + String message = support.getMessage("brooklyn"); + assertEquals(message, "TEST_MESSAGE"); } }
