http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java deleted file mode 100644 index e6b6db5..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.activemq; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.apache.brooklyn.test.entity.TestApplication; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.basic.ApplicationBuilder; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.java.UsesJmx; -import brooklyn.entity.java.UsesJmx.JmxAgentModes; -import brooklyn.entity.trait.Startable; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -/** - * Test the operation of the {@link ActiveMQBroker} class. - */ -public class ActiveMQIntegrationTest { - private static final Logger log = LoggerFactory.getLogger(ActiveMQIntegrationTest.class); - - private TestApplication app; - private Location testLocation; - private ActiveMQBroker activeMQ; - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - app = ApplicationBuilder.newManagedApp(TestApplication.class); - testLocation = app.newLocalhostProvisioningLocation(); - } - - @AfterMethod(alwaysRun = true) - public void shutdown() throws Exception { - if (app != null) Entities.destroyAll(app.getManagementContext()); - } - - /** - * Test that the broker starts up and sets SERVICE_UP correctly. 
- */ - @Test(groups = "Integration") - public void canStartupAndShutdown() throws Exception { - activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)); - - activeMQ.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true); - log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL)); - activeMQ.stop(); - assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP)); - } - - /** - * Test that the broker starts up and sets SERVICE_UP correctly, - * when a jmx port is supplied - */ - @Test(groups = "Integration") - public void canStartupAndShutdownWithCustomJmx() throws Exception { - activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class) - .configure("jmxPort", "11099+")); - - activeMQ.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true); - log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL)); - activeMQ.stop(); - assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP)); - } - - @Test(groups = "Integration") - public void canStartupAndShutdownWithCustomBrokerName() throws Exception { - activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class) - .configure("jmxPort", "11099+") - .configure("brokerName", "bridge")); - - activeMQ.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true); - log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL)); - activeMQ.stop(); - assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP)); - } - - - @Test(groups = "Integration") - public void canStartTwo() throws Exception { - ActiveMQBroker activeMQ1 = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)); - ActiveMQBroker activeMQ2 = 
app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)); - - activeMQ1.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ1, Startable.SERVICE_UP, true); - log.info("JMX URL is "+activeMQ1.getAttribute(UsesJmx.JMX_URL)); - - activeMQ2.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ2, Startable.SERVICE_UP, true); - log.info("JMX URL is "+activeMQ2.getAttribute(UsesJmx.JMX_URL)); - } - - /** - * Test that setting the 'queue' property causes a named queue to be created. - */ - @Test(groups = "Integration") - public void testCreatingQueuesDefault() throws Exception { - String url = testCreatingQueuesInternal(null); - // localhost default is jmxmp - Assert.assertTrue(url.contains("jmxmp"), "url="+url); - } - - @Test(groups = "Integration") - public void testCreatingQueuesRmi() throws Exception { - String url = testCreatingQueuesInternal(JmxAgentModes.JMX_RMI_CUSTOM_AGENT); - Assert.assertTrue(url.contains("rmi://"), "url="+url); - Assert.assertFalse(url.contains("rmi:///jndi"), "url="+url); - Assert.assertFalse(url.contains("jmxmp"), "url="+url); - } - - @Test(groups = "Integration") - public void testCreatingQueuesJmxmp() throws Exception { - String url = testCreatingQueuesInternal(JmxAgentModes.JMXMP); - // JMXMP agent mode is requested explicitly, so the URL must use jmxmp and not rmi - Assert.assertTrue(url.contains("jmxmp"), "url="+url); - Assert.assertFalse(url.contains("rmi"), "url="+url); - } - - @Test(groups = "Integration") - public void testCreatingQueuesNoAgent() throws Exception { - String url = testCreatingQueuesInternal(JmxAgentModes.NONE); - // agent mode NONE: the URL is expected to be a plain service:jmx:rmi one, not jmxmp - Assert.assertTrue(url.contains("service:jmx:rmi"), "url="+url); - Assert.assertFalse(url.contains("jmxmp"), "url="+url); - } - - public String testCreatingQueuesInternal(JmxAgentModes mode) throws Exception { - String queueName = "testQueue"; - int number = 
20; - String content = "01234567890123456789012345678901"; - - // Start broker with a configured queue - // NOTE(review): an earlier FIXME said app.createAndManageChild could not be used here because activeMQ.queueNames - // is not on the interface; the call below does use it, so that note looks stale (confirm) - activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class) - .configure("queue", queueName) - .configure(UsesJmx.JMX_AGENT_MODE, mode)); - - activeMQ.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true); - - String jmxUrl = activeMQ.getAttribute(UsesJmx.JMX_URL); - log.info("JMX URL ("+mode+") is "+jmxUrl); - - try { - // Check queue created - assertFalse(activeMQ.getQueueNames().isEmpty()); - assertEquals(activeMQ.getQueueNames().size(), 1); - assertTrue(activeMQ.getQueueNames().contains(queueName)); - assertEquals(activeMQ.getChildren().size(), 1); - assertFalse(activeMQ.getQueues().isEmpty()); - assertEquals(activeMQ.getQueues().size(), 1); - - // Get the named queue entity - ActiveMQQueue queue = activeMQ.getQueues().get(queueName); - assertNotNull(queue); - assertEquals(queue.getName(), queueName); - - // Connect to broker using JMS and send messages - Connection connection = getActiveMQConnection(activeMQ); - clearQueue(connection, queueName); - EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0); - sendMessages(connection, number, queueName, content); - // Check messages arrived - EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number); - - // Clear the messages - assertEquals(clearQueue(connection, queueName), number); - - // Check messages cleared - EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0); - - connection.close(); - - // JMS connection is closed above (it is skipped if an earlier assertion throws) - } finally { - // Stop broker - activeMQ.stop(); - } - - return jmxUrl; - } - - private Connection 
getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception { - int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT); - String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://"+address+":"+port); - Connection connection = factory.createConnection("admin", "activemq"); - connection.start(); - return connection; - } - - private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(queueName); - MessageProducer messageProducer = session.createProducer(destination); - - for (int i = 0; i < count; i++) { - TextMessage message = session.createTextMessage(content); - messageProducer.send(message); - } - - session.close(); - } - - private int clearQueue(Connection connection, String queueName) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(queueName); - MessageConsumer messageConsumer = session.createConsumer(destination); - - int received = 0; - while (messageConsumer.receive(500) != null) received++; - - session.close(); - - return received; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java deleted file mode 100644 index 39d3e59..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.kafka; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.LocationSpec; -import org.apache.brooklyn.test.EntityTestUtils; -import org.apache.brooklyn.test.entity.TestApplication; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.basic.ApplicationBuilder; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.messaging.activemq.ActiveMQBroker; -import brooklyn.entity.trait.Startable; - -import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation; - -import brooklyn.test.Asserts; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.time.Duration; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -/** - * Test the operation of the {@link KafkaZooKeeper}, {@link KafkaBroker} and {@link KafkaCluster} classes. - * - * TODO test that sensors update. - */ -public class KafkaIntegrationTest { - - private TestApplication app; - private Location testLocation; - - @BeforeMethod(alwaysRun = true) - public void setup() { - app = ApplicationBuilder.newManagedApp(TestApplication.class); - LocationSpec<LocalhostMachineProvisioningLocation> locationSpec = LocationSpec.create(LocalhostMachineProvisioningLocation.class); - testLocation = app.getManagementContext().getLocationManager().createLocation(locationSpec); - } - - @AfterMethod(alwaysRun = true) - public void shutdown() { - if (app != null) Entities.destroyAll(app.getManagementContext()); - } - - /** - * Test that we can start a zookeeper. 
- */ - @Test(groups = "Integration") - public void testZookeeper() { - final KafkaZooKeeper zookeeper = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class)); - - zookeeper.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), zookeeper, Startable.SERVICE_UP, true); - - zookeeper.stop(); - assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP)); - } - - /** - * Test that we can start a broker and zookeeper together. - */ - @Test(groups = "Integration") - public void testBrokerPlusZookeeper() { - final KafkaZooKeeper zookeeper = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class)); - final KafkaBroker broker = app.createAndManageChild(EntitySpec.create(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper)); - - zookeeper.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), zookeeper, Startable.SERVICE_UP, true); - - broker.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), broker, Startable.SERVICE_UP, true); - - zookeeper.stop(); - assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP)); - - broker.stop(); - assertFalse(broker.getAttribute(Startable.SERVICE_UP)); - } - - /** - * Test that we can start a cluster with zookeeper and two brokers. - * - * Uses {@link KafkaSupport} to test sending and receiving messages on a topic. 
- */ - @Test(groups = "Integration") - public void testTwoBrokerCluster() throws InterruptedException { - final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class) - .configure(KafkaCluster.INITIAL_SIZE, 2)); - - cluster.start(ImmutableList.of(testLocation)); - Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Callable<Void>() { - @Override - public Void call() { - assertTrue(cluster.getAttribute(Startable.SERVICE_UP)); - assertTrue(cluster.getZooKeeper().getAttribute(Startable.SERVICE_UP)); - assertEquals(cluster.getCurrentSize().intValue(), 2); - return null; - } - }); - - Entities.dumpInfo(cluster); - - final KafkaSupport support = new KafkaSupport(cluster); - - support.sendMessage("brooklyn", "TEST_MESSAGE"); - - Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_SECONDS), new Runnable() { - @Override - public void run() { - String message = support.getMessage("brooklyn"); - assertEquals(message, "TEST_MESSAGE"); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java deleted file mode 100644 index 7d122c6..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.kafka; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; - -import brooklyn.entity.AbstractEc2LiveTest; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.trait.Startable; -import brooklyn.test.Asserts; -import brooklyn.util.collections.MutableMap; - -import com.google.common.collect.ImmutableList; - -public class KafkaLiveTest extends AbstractEc2LiveTest { - - /** - * Test that we can install, start and use a Kafka cluster with two brokers. 
- */ - @Override - protected void doTest(Location loc) throws Exception { - final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class) - .configure("startTimeout", 300) // 5 minutes - .configure("initialSize", 2)); - app.start(ImmutableList.of(loc)); - - Asserts.succeedsEventually(MutableMap.of("timeout", 300000L), new Callable<Void>() { - @Override - public Void call() { - assertTrue(cluster.getAttribute(Startable.SERVICE_UP)); - assertTrue(cluster.getZooKeeper().getAttribute(Startable.SERVICE_UP)); - assertEquals(cluster.getCurrentSize().intValue(), 2); - return null; - } - }); - - Entities.dumpInfo(cluster); - - KafkaSupport support = new KafkaSupport(cluster); - - support.sendMessage("brooklyn", "TEST_MESSAGE"); - String message = support.getMessage("brooklyn"); - assertEquals(message, "TEST_MESSAGE"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java deleted file mode 100644 index 179f76e..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.kafka; - -import brooklyn.entity.basic.EntityPredicates; -import brooklyn.entity.zookeeper.ZooKeeperNode; - -import com.google.common.base.Optional; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.security.InvalidParameterException; -import java.util.Properties; - -import static java.lang.String.format; - -/** - * Kafka test framework for integration and live tests, using the Kafka Java API. - */ -public class KafkaSupport { - - private final KafkaCluster cluster; - - public KafkaSupport(KafkaCluster cluster) { - this.cluster = cluster; - } - - /** - * Send a message on the given topic via any {@link KafkaBroker} in the {@link KafkaCluster} that is SERVICE_UP; throws InvalidParameterException if none is found. 
- */ - public void sendMessage(String topic, String message) { - Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and( - Predicates.instanceOf(KafkaBroker.class), - EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true))); - if (anyBrokerNodeInCluster.isPresent()) { - KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get(); - - Properties props = new Properties(); - - props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort())); - props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort())); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - Producer<String, String> producer = new KafkaProducer<>(props); - ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic); - - ProducerRecord<String, String> data = new ProducerRecord<>(topic, message); - producer.send(data); - producer.close(); - } else { - throw new InvalidParameterException("No kafka broker node found"); - } - } - - /** - * Subscribe to the given topic on the {@link KafkaCluster}; polling is not implemented yet (see FIXME), so a fixed "TEST_MESSAGE" is returned. 
- */ - public String getMessage(String topic) { - ZooKeeperNode zookeeper = cluster.getZooKeeper(); - Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and( - Predicates.instanceOf(KafkaBroker.class), - EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true))); - if (anyBrokerNodeInCluster.isPresent()) { - KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get(); - - Properties props = new Properties(); - - props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort())); - props.put("zookeeper.connect", format("%s:%d", zookeeper.getHostname(), zookeeper.getZookeeperPort())); - props.put("group.id", "brooklyn"); - props.put("partition.assignment.strategy", "RoundRobin"); - props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - - KafkaConsumer consumer = new KafkaConsumer(props); - - consumer.subscribe(topic); - // FIXME unimplemented KafkaConsumer.poll -// Object consumerRecords = consumer.poll(Duration.seconds(3).toMilliseconds()).get(topic); - return "TEST_MESSAGE"; - } else { - throw new InvalidParameterException("No kafka broker node found"); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java deleted file mode 100644 index 06af482..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.qpid; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.testng.annotations.Test; - -import brooklyn.entity.AbstractEc2LiveTest; - -import com.google.common.collect.ImmutableList; - -public class QpidEc2LiveTest extends AbstractEc2LiveTest { - - // TODO Also check that a client can connect (e.g. 
to send/receive messages) - - @Override - protected void doTest(Location loc) throws Exception { - QpidBroker qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class) - .configure("jmxPort", "9909+") - .configure("rmiRegistryPort", "9910+")); - - qpid.start(ImmutableList.of(loc)); - EntityTestUtils.assertAttributeEqualsEventually(qpid, QpidBroker.SERVICE_UP, true); - } - - @Test(enabled=false) - public void testDummy() {} // Convince testng IDE integration that this really does have test methods -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java deleted file mode 100644 index 7346297..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.qpid; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.apache.brooklyn.test.HttpTestUtils; -import org.apache.brooklyn.test.entity.TestApplication; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.configuration.ClientProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.basic.ApplicationBuilder; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.trait.Startable; -import brooklyn.test.Asserts; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; - -import com.google.common.collect.ImmutableList; - -/** - * Test the operation of the {@link QpidBroker} class. 
- */ -public class QpidIntegrationTest { - private static final Logger log = LoggerFactory.getLogger(QpidIntegrationTest.class); - - private TestApplication app; - private Location testLocation; - private QpidBroker qpid; - - /** Logs the working directory, then creates a fresh managed application and a localhost provisioning location. */ - @BeforeMethod(groups = "Integration") - public void setup() { - String workingDir = System.getProperty("user.dir"); - log.info("Qpid working dir: {}", workingDir); - app = ApplicationBuilder.newManagedApp(TestApplication.class); - testLocation = app.newLocalhostProvisioningLocation(); - } - - /** Destroys everything in the application's management context; skipped only if setup never created the app. */ - @AfterMethod(alwaysRun=true) - public void shutdown() { - if (app != null) Entities.destroyAll(app.getManagementContext()); - } - - /** - * Test that the broker starts up with JMX and RMI ports configured, and sets SERVICE_UP correctly. - */ - @Test(groups = "Integration") - public void canStartupAndShutdown() { - qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class) - .configure("jmxPort", "9909+") - .configure("rmiRegistryPort", "9910+")); - qpid.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true); - qpid.stop(); - assertFalse(qpid.getAttribute(Startable.SERVICE_UP)); - } - - /** - * Test that the broker starts up with HTTP management enabled, and we can connect to the URL. 
- */ - @Test(groups = "Integration") - public void canStartupAndShutdownWithHttpManagement() { - qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class) - .configure("httpManagementPort", "8888+")); - qpid.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true); - String httpUrl = "http://"+qpid.getAttribute(QpidBroker.HOSTNAME)+":"+qpid.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT)+"/management"; - HttpTestUtils.assertHttpStatusCodeEventuallyEquals(httpUrl, 200); - // TODO check actual REST output - qpid.stop(); - assertFalse(qpid.getAttribute(Startable.SERVICE_UP)); - } - - /** - * Test that the broker starts up and sets SERVICE_UP correctly when plugins are configured. - * - * FIXME the custom plugin was written against qpid 0.14, so that's the version we need to run - * this test against. However, v0.14 is no longer available from the download site. - * We should update this plugin so it works with the latest qpid. - */ - @Test(enabled = false, groups = "Integration") - public void canStartupAndShutdownWithPlugin() { - Map<String,String> qpidRuntimeFiles = MutableMap.<String,String>builder() - .put("classpath://qpid-test-config.xml", "etc/config.xml") - .put("http://developers.cloudsoftcorp.com/brooklyn/repository-test/0.7.0/QpidBroker/qpid-test-plugin.jar", "lib/plugins/sample-plugin.jar") - .build(); - qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class) - .configure(SoftwareProcess.RUNTIME_FILES, qpidRuntimeFiles) - .configure(QpidBroker.SUGGESTED_VERSION, "0.14")); - qpid.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true); - qpid.stop(); - assertFalse(qpid.getAttribute(Startable.SERVICE_UP)); - } - - /** - * Test that setting the 'queue' property causes a named queue to be created. - * - * This test is disabled, pending further investigation. Issue with AMQP 0-10 queue names. 
- * - * FIXME disabled because it was failing in jenkins CI (in QpidIntegrationTest.getQpidConnection()). - * url=amqp://admin:********@brooklyn/localhost?brokerlist='tcp://localhost:5672' - * Was previously enabled, despite the comment above about "test is disabled". - */ - @Test(enabled = false, groups = { "Integration", "WIP" }) - public void testCreatingQueues() { - final String queueName = "testQueue"; - final int number = 20; - final String content = "01234567890123456789012345678901"; - - // Start broker with a configured queue - // FIXME Can't use app.createAndManageChild, because of QpidDestination reffing impl directly - qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class) - .configure("queue", queueName)); - qpid.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true); - - try { - // Check queue created - assertFalse(qpid.getQueueNames().isEmpty()); - assertEquals(qpid.getQueueNames().size(), 1); - assertTrue(qpid.getQueueNames().contains(queueName)); - assertEquals(qpid.getChildren().size(), 1); - assertFalse(qpid.getQueues().isEmpty()); - assertEquals(qpid.getQueues().size(), 1); - - // Get the named queue entity - final QpidQueue queue = qpid.getQueues().get(queueName); - assertNotNull(queue); - - // Connect to broker using JMS and send messages - Connection connection = getQpidConnection(qpid); - clearQueue(connection, queue.getQueueName()); - Asserts.succeedsEventually(new Runnable() { - @Override - public void run() { - assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(0)); - } - }); - sendMessages(connection, number, queue.getQueueName(), content); - - // Check messages arrived - Asserts.succeedsEventually(new Runnable() { - @Override - public void run() { - assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(number)); - assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), Integer.valueOf(number * 
content.length())); - } - }); - - //TODO clearing the queue currently returns 0 -// // Clear the messages -- should get 20 -// assertEquals clearQueue(connection, queue.queueName), 20 -// -// // Check messages cleared -// executeUntilSucceeds { -// assertEquals queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), 0 -// assertEquals queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), 0 -// } - - // Close the JMS connection - connection.close(); - } catch (JMSException jmse) { - log.warn("JMS exception caught", jmse); - throw Exceptions.propagate(jmse); - } finally { - // Stop broker. 'app' is deliberately left set so that shutdown() can still - // destroy the management context; nulling it here would skip that cleanup. - qpid.stop(); - qpid = null; - } - } - - /** Opens and starts a JMS connection to the broker's AMQP port on localhost, using AMQP 0-10 with ADDR destination syntax. */ - private Connection getQpidConnection(QpidBroker qpid) { - int port = qpid.getAttribute(Attributes.AMQP_PORT); - System.setProperty(ClientProperties.AMQP_VERSION, "0-10"); - System.setProperty(ClientProperties.DEST_SYNTAX, "ADDR"); - String connectionUrl = String.format("amqp://admin:admin@brooklyn/localhost?brokerlist='tcp://localhost:%d'", port); - try { - AMQConnectionFactory factory = new AMQConnectionFactory(connectionUrl); - Connection connection = factory.createConnection(); - connection.start(); - return connection; - } catch (Exception e) { - log.error(String.format("Error connecting to qpid: %s", connectionUrl), e); - throw Exceptions.propagate(e); - } - } - - /** Sends {@code count} text messages with the given content to the named queue, using a short-lived session. 
*/ - private void sendMessages(Connection connection, int count, String queueName, String content) throws JMSException { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(queueName); - MessageProducer messageProducer = session.createProducer(destination); - - for (int i = 0; i < count; i++) { - TextMessage message = session.createTextMessage(content); - messageProducer.send(message); - } - - session.close(); - } - - /** Drains the named queue, returning how many messages were received before a 500ms receive timed out. */ - private int clearQueue(Connection connection, String queueName) throws JMSException { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = 
session.createQueue(queueName); - MessageConsumer messageConsumer = session.createConsumer(destination); - - int received = 0; - while (messageConsumer.receive(500) != null) received++; - - session.close(); - - return received; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java deleted file mode 100644 index 8eaa991..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.rabbit; - -import static org.testng.Assert.assertEquals; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.SkipException; -import org.testng.annotations.Test; - -import brooklyn.entity.AbstractEc2LiveTest; -import brooklyn.entity.messaging.MessageBroker; -import brooklyn.entity.messaging.amqp.AmqpExchange; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; - -/** - * Live test that starts a {@link RabbitBroker} in the given location and round-trips one message through it. - */ -public class RabbitEc2LiveTest extends AbstractEc2LiveTest { - - private static final Logger LOG = LoggerFactory.getLogger(RabbitEc2LiveTest.class); - - /** - * Starts the broker, waits for SERVICE_UP, then publishes one message to a durable queue bound to the - * direct exchange and asserts that a second channel receives the same bytes. 
- */ - @Override - protected void doTest(Location loc) throws Exception { - RabbitBroker rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class)); - rabbit.start(ImmutableList.of(loc)); - EntityTestUtils.assertAttributeEqualsEventually(rabbit, RabbitBroker.SERVICE_UP, true); - - byte[] content = "MessageBody".getBytes(Charsets.UTF_8); - String queue = "queueName"; - Channel producer = null; - Channel consumer = null; - try { - producer = getAmqpChannel(rabbit); - consumer = getAmqpChannel(rabbit); - - producer.queueDeclare(queue, true, false, false, Maps.<String,Object>newHashMap()); - producer.queueBind(queue, AmqpExchange.DIRECT, queue); - producer.basicPublish(AmqpExchange.DIRECT, queue, null, content); - - QueueingConsumer queueConsumer = new QueueingConsumer(consumer); - consumer.basicConsume(queue, true, queueConsumer); - - // Bounded wait (one minute, as in RabbitIntegrationTest) so a lost message fails the test instead of hanging it - QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery(60 * 1000L); - if (delivery == null) throw new AssertionError("No message received from queue '" + queue + "' within one minute"); - assertEquals(delivery.getBody(), content); - } finally { - if (producer != null) 
producer.close(); - if (consumer != null) consumer.close(); - } - } - - /** Opens a new connection to the broker's advertised BROKER_URL and returns a channel on it. */ - private Channel getAmqpChannel(RabbitBroker rabbit) throws Exception { - String uri = rabbit.getAttribute(MessageBroker.BROKER_URL); - LOG.warn("connecting to rabbit {}", uri); - ConnectionFactory factory = new ConnectionFactory(); - factory.setUri(uri); - Connection conn = factory.newConnection(); - Channel channel = conn.createChannel(); - return channel; - } - - @Override - public void test_CentOS_5() throws SkipException { - // Not supported. The EPEL repository described here at [1] does not contain erlang, and the - // Erlang repository at [1] requires old versions of rpmlib. Additionally, [2] suggests that - // Centos 5 is not supported - // [1]:http://www.rabbitmq.com/install-rpm.html - // [2]: https://www.erlang-solutions.com/downloads/download-erlang-otp - throw new SkipException("Centos 5 is not supported"); - } - - @Test(enabled=false) - public void testDummy() {} // Convince testng IDE integration that this really does have test methods -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java deleted file mode 100644 index c5f6f22..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.rabbit; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; - -import java.io.IOException; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.apache.brooklyn.test.entity.TestApplication; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.basic.ApplicationBuilder; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.messaging.MessageBroker; -import brooklyn.entity.messaging.amqp.AmqpExchange; -import brooklyn.entity.trait.Startable; - -import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; - -/** - * Test the operation of the {@link RabbitBroker} class. 
- * - * TODO If you're having problems running this test successfully, here are a few tips: - * - * - Is `erl` on your path for a non-interactive ssh session? - * Look in rabbit's $RUN_DIR/console-err.log (e.g. /tmp/brooklyn-aled/apps/someappid/entities/RabbitBroker_2.8.7_JROYTcSL/console-err.log) - * I worked around that by adding to my ~/.brooklyn/brooklyn.properties: - * brooklyn.ssh.config.scriptHeader=#!/bin/bash -e\nif [ -f ~/.bashrc ] ; then . ~/.bashrc ; fi\nif [ -f ~/.profile ] ; then . ~/.profile ; fi\necho $PATH > /tmp/mypath.txt - * - * - Is the hostname resolving properly? - * Look in $RUN_DIR/console-out.log; is there a message like: - * ERROR: epmd error for host "Aleds-MacBook-Pro": timeout (timed out establishing tcp connection) - * I got around that by disabling my wifi and running when not connected to the internet. - */ -public class RabbitIntegrationTest { - private static final Logger log = LoggerFactory.getLogger(RabbitIntegrationTest.class); - - private TestApplication app; - private Location testLocation; - private RabbitBroker rabbit; - - @BeforeMethod(groups = "Integration") - public void setup() { - app = ApplicationBuilder.newManagedApp(TestApplication.class); - testLocation = new LocalhostMachineProvisioningLocation(); - } - - @AfterMethod(alwaysRun = true) - public void shutdown() { - if (app != null) Entities.destroyAll(app.getManagementContext()); - } - - /** - * Test that the broker starts up and sets SERVICE_UP correctly. - */ - @Test(groups = {"Integration", "WIP"}) - public void canStartupAndShutdown() throws Exception { - rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class)); - rabbit.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(rabbit, Startable.SERVICE_UP, true); - rabbit.stop(); - assertFalse(rabbit.getAttribute(Startable.SERVICE_UP)); - } - - /** - * Test that an AMQP client can connect to and use the broker. 
- */ - @Test(groups = {"Integration", "WIP"}) - public void testClientConnection() throws Exception { - rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class)); - rabbit.start(ImmutableList.of(testLocation)); - EntityTestUtils.assertAttributeEqualsEventually(rabbit, Startable.SERVICE_UP, true); - - byte[] content = "MessageBody".getBytes(Charsets.UTF_8); - String queue = "queueName"; - Channel producer = null; - Channel consumer = null; - try { - producer = getAmqpChannel(rabbit); - consumer = getAmqpChannel(rabbit); - - producer.queueDeclare(queue, true, false, false, ImmutableMap.<String,Object>of()); - producer.queueBind(queue, AmqpExchange.DIRECT, queue); - producer.basicPublish(AmqpExchange.DIRECT, queue, null, content); - - QueueingConsumer queueConsumer = new QueueingConsumer(consumer); - consumer.basicConsume(queue, true, queueConsumer); - - QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery(60 * 1000L); // one minute timeout - // Fail with a clear message, rather than a NullPointerException, if nothing arrived before the timeout - if (delivery == null) throw new AssertionError("No message received from queue '" + queue + "' within one minute"); - assertEquals(delivery.getBody(), content); - } finally { - closeSafely(producer, 10*1000); - closeSafely(consumer, 10*1000); - } - } - - /** - * Closes the channel, guaranteeing the call won't hang this thread forever! 
- * - * Saw this during jenkins testing: - * "main" prio=10 tid=0x00007f69c8008000 nid=0x5d70 in Object.wait() [0x00007f69d1318000] - * java.lang.Thread.State: WAITING (on object monitor) - * at java.lang.Object.wait(Native Method) - * - waiting on <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException) - * at java.lang.Object.wait(Object.java:502) - * at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:50) - * - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException) - * at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:65) - * - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException) - * at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111) - * - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException) - * at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37) - * at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:349) - * at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:543) - * at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:480) - * at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:473) - * at com.rabbitmq.client.Channel$close.call(Unknown Source) - * at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:42) - * at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108) - * at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:112) - * at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callSafe(AbstractCallSite.java:75) - * at brooklyn.entity.messaging.rabbit.RabbitIntegrationTest.testClientConnection(RabbitIntegrationTest.groovy:107) - */ - private void closeSafely(final Channel channel, int timeoutMs) throws InterruptedException { - if (channel == null) return; - Thread t = new Thread(new Runnable() { - @Override public void run() { 
- try { - channel.close(); - } catch (IOException e) { - log.error("Error closing RabbitMQ Channel; continuing", e); - } - }}); - try { - t.start(); - t.join(timeoutMs); - - if (t.isAlive()) { - log.error("Timeout when closing RabbitMQ Channel "+channel+"; aborting close and continuing"); - } - } finally { - t.interrupt(); - t.join(1*1000); - if (t.isAlive()) t.stop(); - } - } - - private Channel getAmqpChannel(RabbitBroker rabbit) throws Exception { - String uri = rabbit.getAttribute(MessageBroker.BROKER_URL); - log.warn("connecting to rabbit {}", uri); - ConnectionFactory factory = new ConnectionFactory(); - factory.setUri(uri); - Connection conn = factory.newConnection(); - Channel channel = conn.createChannel(); - return channel; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/storm/LocalhostLiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/LocalhostLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/storm/LocalhostLiveTest.java deleted file mode 100644 index 8254dea..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/LocalhostLiveTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.storm; - -import org.testng.annotations.Test; - -/** - * Runs the Storm live test against the location named "localhost". - */ -@Test(groups="Live") -public class LocalhostLiveTest extends StormAbstractCloudLiveTest { - - private static final String NAMED_LOCATION = "localhost"; - - /** Returns the location name that the parent test resolves before deploying. */ - @Override - public String getLocation() { - return NAMED_LOCATION; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java deleted file mode 100644 index a6c1a3c..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.storm; - -import org.testng.annotations.Test; - -/** - * Runs the Storm live test against the location named "softlayer". - */ -@Test(groups="Live") -public class SoftLayerLiveTest extends StormAbstractCloudLiveTest { - - /** Returns the location name that the parent test resolves before deploying. */ - @Override - public String getLocation() { - return "softlayer"; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java deleted file mode 100644 index d79b2da..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.storm; - -import static brooklyn.entity.messaging.storm.Storm.NIMBUS_HOSTNAME; -import static brooklyn.entity.messaging.storm.Storm.ZOOKEEPER_ENSEMBLE; -import static brooklyn.entity.messaging.storm.Storm.Role.NIMBUS; -import static brooklyn.entity.messaging.storm.Storm.Role.SUPERVISOR; -import static brooklyn.entity.messaging.storm.Storm.Role.UI; -import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady; - -import java.io.File; -import java.util.Map; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.core.management.internal.LocalManagementContext; -import org.apache.brooklyn.core.util.ResourceUtils; -import org.apache.brooklyn.core.util.file.ArchiveBuilder; -import org.apache.brooklyn.test.EntityTestUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import backtype.storm.Config; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.StormTopology; -import backtype.storm.testing.TestWordSpout; -import backtype.storm.topology.TopologyBuilder; -import brooklyn.entity.BrooklynAppLiveTestSupport; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import 
brooklyn.entity.messaging.storm.topologies.ExclamationBolt; -import brooklyn.entity.trait.Startable; -import brooklyn.entity.zookeeper.ZooKeeperEnsemble; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.os.Os; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; - -/** - * Base class for live tests that deploy a ZooKeeper ensemble plus Storm nimbus, supervisor and UI entities - * to the location named by {@link #getLocation()}, and then submit a sample topology. - */ -public abstract class StormAbstractCloudLiveTest extends BrooklynAppLiveTestSupport { - - protected static final Logger log = LoggerFactory - .getLogger(StormAbstractCloudLiveTest.class); - private Location location; - private ZooKeeperEnsemble zooKeeperEnsemble; - private Storm nimbus; - private Storm supervisor; - private Storm ui; - - @BeforeClass(alwaysRun = true) - public void beforeClass() throws Exception { - mgmt = new LocalManagementContext(); - location = mgmt.getLocationRegistry() - .resolve(getLocation(), getFlags()); - super.setUp(); - } - - @AfterClass(alwaysRun = true) - public void afterClass() throws Exception { - // Entities.destroyAll(mgmt); - } - - /** Returns the name of the location to resolve from the location registry. */ - public abstract String getLocation(); - - /** Returns extra flags passed when resolving the location; empty by default. 
*/ - public Map<String, ?> getFlags() { - return MutableMap.of(); - } - - @Test(groups = {"Live","WIP"}) // needs repair to avoid hard dependency on Andrea's environment - public void deployStorm() throws Exception { - try { - zooKeeperEnsemble = app.createAndManageChild(EntitySpec.create( - ZooKeeperEnsemble.class).configure( - ZooKeeperEnsemble.INITIAL_SIZE, 3)); - nimbus = app.createAndManageChild(EntitySpec - .create(Storm.class) - .configure(Storm.ROLE, NIMBUS) - .configure(NIMBUS_HOSTNAME, "localhost") - .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble) - ); - supervisor = app.createAndManageChild(EntitySpec - .create(Storm.class) - .configure(Storm.ROLE, SUPERVISOR) - .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble) - .configure(NIMBUS_HOSTNAME, - attributeWhenReady(nimbus, Attributes.HOSTNAME))); - ui = app.createAndManageChild(EntitySpec - .create(Storm.class) - 
.configure(Storm.ROLE, UI) - .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble) - .configure(NIMBUS_HOSTNAME, - attributeWhenReady(nimbus, Attributes.HOSTNAME))); - // Logged before app.start(), so this announces the start rather than its completion - log.info("Starting Storm deployment on '" + getLocation() + "'"); - app.start(ImmutableList.of(location)); - Entities.dumpInfo(app); - EntityTestUtils.assertAttributeEqualsEventually(app, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(zooKeeperEnsemble, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(nimbus, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(supervisor, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(ui, Startable.SERVICE_UP, true); - - StormTopology stormTopology = createTopology(); - submitTopology(stormTopology, "myExclamation", 3, true, 60000); - } catch (Exception e) { - log.error("Failed to deploy Storm", e); - // Rethrow the original exception so the test report carries the real cause; - // a bare Assert.fail() here would mask it and make the rethrow unreachable. - throw e; - } - } - - /** Builds the sample topology: a word spout feeding two chained exclamation bolts. */ - private StormTopology createTopology() - throws AlreadyAliveException, InvalidTopologyException { - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("word", new TestWordSpout(), 10); - builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); - builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); - - return builder.createTopology(); - } - - /** - * Submits the topology, retrying once a second on "connection refused" until {@code timeoutMs} has - * elapsed ({@code -1} means no deadline). Returns true on success; fails the test if every attempt - * within the deadline was refused; rethrows any other exception immediately. 
- */ - public boolean submitTopology(StormTopology stormTopology, String topologyName, int numOfWorkers, boolean debug, long timeoutMs) { - if (log.isDebugEnabled()) log.debug("Connecting to NimbusClient: {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME)); - Config conf = new Config(); - conf.setDebug(debug); - conf.setNumWorkers(numOfWorkers); - - // TODO - confirm this creates the JAR correctly - String jar = createJar( - new File(Os.mergePaths(ResourceUtils.create(this).getClassLoaderDir(), "brooklyn/entity/messaging/storm/topologies")), - "brooklyn/entity/messaging/storm/"); - System.setProperty("storm.jar", jar); - long 
startMs = System.currentTimeMillis(); - long endMs = (timeoutMs == -1) ? Long.MAX_VALUE : (startMs + timeoutMs); - long currentTime = startMs; - Throwable lastError = null; - int attempt = 0; - while (currentTime <= endMs) { - currentTime = System.currentTimeMillis(); - if (attempt != 0) Time.sleep(Duration.ONE_SECOND); - if (log.isTraceEnabled()) log.trace("trying connection to {} at time {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME), currentTime); - - try { - StormSubmitter.submitTopology(topologyName, conf, stormTopology); - return true; - } catch (Exception e) { - if (shouldRetryOn(e)) { - if (log.isDebugEnabled()) log.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, nimbus.getConfig(Storm.NIMBUS_HOSTNAME), e.getMessage()}); - lastError = e; - } else { - throw Throwables.propagate(e); - } - } - attempt++; - } - log.warn("unable to connect to Nimbus client: ", lastError); - // Attach the last connection error so the failure report shows why the submission never succeeded - Assert.fail("unable to connect to Nimbus client", lastError); - return false; - } - - /** Returns true only for the exact "connection refused" transport error, which is treated as retryable. */ - private boolean shouldRetryOn(Exception e) { - // Constant-first equals: a null message means "do not retry" rather than a NullPointerException - return "org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused".equals(e.getMessage()); - } - - /** Packs a directory into a temporary topologies.jar and returns its path; a non-directory path is returned unchanged. */ - private String createJar(File dir, String parentDirInJar) { - if (dir.isDirectory()) { - File jarFile = ArchiveBuilder.jar().addAt(dir, parentDirInJar).create(Os.newTempDir(getClass())+"/topologies.jar"); - return jarFile.getAbsolutePath(); - } else { - return dir.getAbsolutePath(); // An existing Jar archive? 
- } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormEc2LiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormEc2LiveTest.java deleted file mode 100644 index d949450..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormEc2LiveTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.storm; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.testng.annotations.Test; - -import brooklyn.entity.AbstractEc2LiveTest; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.trait.Startable; -import brooklyn.entity.zookeeper.ZooKeeperNode; - -import com.google.common.collect.ImmutableList; - -public class StormEc2LiveTest extends AbstractEc2LiveTest { - - /** - * Creates one ZooKeeper node and three Storm entities (nimbus, supervisor and UI roles), starts the - * application in the given location, and waits for each of the four entities to report SERVICE_UP. - */ - @Override - protected void doTest(Location loc) throws Exception { - ZooKeeperNode zkNode = app.createAndManageChild(EntitySpec.create(ZooKeeperNode.class)); - - EntitySpec<Storm> nimbusSpec = EntitySpec.create(Storm.class).configure("storm.role", Storm.Role.NIMBUS); - Storm nimbusNode = app.createAndManageChild(nimbusSpec); - - EntitySpec<Storm> supervisorSpec = EntitySpec.create(Storm.class).configure("storm.role", Storm.Role.SUPERVISOR); - Storm supervisorNode = app.createAndManageChild(supervisorSpec); - - EntitySpec<Storm> uiSpec = EntitySpec.create(Storm.class).configure("storm.role", Storm.Role.UI); - Storm uiNode = app.createAndManageChild(uiSpec); - - app.start(ImmutableList.of(loc)); - Entities.dumpInfo(app); - - // Same wait order as creation order: zookeeper, nimbus, supervisor, ui - EntityTestUtils.assertAttributeEqualsEventually(zkNode, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(nimbusNode, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(supervisorNode, Startable.SERVICE_UP, true); - EntityTestUtils.assertAttributeEqualsEventually(uiNode, Startable.SERVICE_UP, true); - } - - @Test(enabled=false) - public void testDummy() {} // Convince testng IDE integration that this really does have test methods -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormGceLiveTest.java ---------------------------------------------------------------------- diff --git 
a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormGceLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormGceLiveTest.java deleted file mode 100644 index 1504a56..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/StormGceLiveTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.storm; - -import java.util.Map; - -import org.testng.annotations.Test; - -import brooklyn.util.collections.MutableMap; - -@Test(groups="Live") -public class StormGceLiveTest extends StormAbstractCloudLiveTest { - - private static final String NAMED_LOCATION = "gce-europe-west1"; - private static final String LOCATION_ID = "gce-europe-west1-a"; - private static final String URI = "https://www.googleapis.com/compute/v1beta15/projects/google/global/images/centos-6-v20130325"; - private static final String IMAGE_ID = "centos-6-v20130325"; - - @Override - public String getLocation() { - return NAMED_LOCATION; - } - - @Override - public Map<String, ?> getFlags() { - return MutableMap.of( - "locationId", LOCATION_ID, - "imageId", IMAGE_ID, - "uri", URI + IMAGE_ID, - "groupId", "storm-test", - "stopIptables", "true" - ); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java b/software/messaging/src/test/java/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java deleted file mode 100644 index d7f1d9e..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.storm.topologies; - -import java.util.Map; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -public class ExclamationBolt extends BaseRichBolt { - OutputCollector _collector; - - @Override - public void prepare(Map conf, TopologyContext context, - OutputCollector collector) { - _collector = collector; - } - - @Override - public void execute(Tuple tuple) { - _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); - _collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java deleted file mode 100644 index a6c7d2b..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java +++ /dev/null @@ -1,48 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.zookeeper; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.testng.annotations.Test; - -import brooklyn.entity.AbstractEc2LiveTest; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.trait.Startable; -import brooklyn.entity.zookeeper.ZooKeeperNode; - -import com.google.common.collect.ImmutableList; - -public class ZooKeeperEc2LiveTest extends AbstractEc2LiveTest { - - /** - * Test that can install, start and use a Zookeeper instance. 
- */ - @Override - protected void doTest(Location loc) throws Exception { - ZooKeeperNode zookeeper = app.createAndManageChild(EntitySpec.create(ZooKeeperNode.class).configure("jmxPort", "31001+")); - app.start(ImmutableList.of(loc)); - Entities.dumpInfo(zookeeper); - EntityTestUtils.assertAttributeEqualsEventually(zookeeper, Startable.SERVICE_UP, true); - } - - @Test(enabled=false) - public void testDummy() {} // Convince testng IDE integration that this really does have test methods -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java deleted file mode 100644 index cb21c18..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.zookeeper; - -import brooklyn.entity.basic.ApplicationBuilder; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.trait.Startable; -import brooklyn.entity.zookeeper.ZooKeeperEnsemble; -import brooklyn.entity.zookeeper.ZooKeeperNode; - -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.Uninterruptibles; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.apache.brooklyn.test.entity.TestApplication; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.net.Socket; -import java.util.concurrent.TimeUnit; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -/** - * A live test of the {@link brooklyn.entity.zookeeper.ZooKeeperEnsemble} entity. - * - * Tests that a 3 node cluster can be started on Amazon EC2 and data written on one {@link brooklyn.entity.zookeeper.ZooKeeperEnsemble} - * can be read from another, using the Astyanax API. 
- */ -public class ZooKeeperEnsembleLiveTest { - - private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleLiveTest.class); - - private String provider = - "gce-europe-west1"; -// "aws-ec2:eu-west-1"; -// "named:hpcloud-compute-at"; -// "localhost"; - - protected TestApplication app; - protected Location testLocation; - protected ZooKeeperEnsemble cluster; - - @BeforeMethod(alwaysRun = true) - public void setup() { - app = ApplicationBuilder.newManagedApp(TestApplication.class); - testLocation = app.getManagementContext().getLocationRegistry().resolve(provider); - } - - @AfterMethod(alwaysRun = true) - public void shutdown() { - Entities.destroyAll(app.getManagementContext()); - } - - /** - * Test that a two node cluster starts up and allows access through both nodes. - */ - @Test(groups = "Live") - public void testStartUpConnectAndResize() throws Exception { - try { - cluster = app.createAndManageChild(EntitySpec.create(ZooKeeperEnsemble.class) - .configure("initialSize", 3) - .configure("clusterName", "ZooKeeperEnsembleLiveTest")); - assertEquals(cluster.getCurrentSize().intValue(), 0); - - app.start(ImmutableList.of(testLocation)); - - EntityTestUtils.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE, 3); - Entities.dumpInfo(app); - - EntityTestUtils.assertAttributeEqualsEventually(cluster, Startable.SERVICE_UP, true); - for(Entity zkNode : cluster.getMembers()) { - assertTrue(isSocketOpen((ZooKeeperNode) zkNode)); - } - cluster.resize(1); - EntityTestUtils.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE, 1); - Entities.dumpInfo(app); - EntityTestUtils.assertAttributeEqualsEventually(cluster, Startable.SERVICE_UP, true); - for (Entity zkNode : cluster.getMembers()) { - assertTrue(isSocketOpen((ZooKeeperNode) zkNode)); - } - } catch (Throwable e) { - throw Throwables.propagate(e); - } - } - - protected static boolean isSocketOpen(ZooKeeperNode node) { - int attempt = 0, maxAttempts = 20; - while(attempt 
< maxAttempts) { - try { - Socket s = new Socket(node.getAttribute(Attributes.HOSTNAME), node.getZookeeperPort()); - s.close(); - return true; - } catch (Exception e) { - attempt++; - } - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - } - return false; - } - -}
