http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBroker.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBroker.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBroker.java
new file mode 100644
index 0000000..3e4a92a
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBroker.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJmx;
+import org.apache.brooklyn.entity.messaging.MessageBroker;
+import org.apache.brooklyn.entity.messaging.amqp.AmqpServer;
+import org.apache.brooklyn.entity.messaging.jms.JMSBroker;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single 
Qpid broker instance, using AMQP 0-10.
+ */
+@Catalog(name="Qpid Broker", description="Apache Qpid is an open-source 
messaging system, implementing the Advanced Message Queuing Protocol (AMQP)", 
iconUrl="classpath:///qpid-logo.jpeg")
+@ImplementedBy(QpidBrokerImpl.class)
+public interface QpidBroker extends SoftwareProcess, MessageBroker, UsesJmx, 
AmqpServer, JMSBroker<QpidQueue, QpidTopic> {
+
+    /* Qpid runtime file locations for convenience. */
+
+    public static final String CONFIG_XML = "etc/config.xml";
+    public static final String VIRTUALHOSTS_XML = "etc/virtualhosts.xml";
+    public static final String PASSWD = "etc/passwd";
+
+    @SetFromFlag("version")
+    public static final ConfigKey<String> SUGGESTED_VERSION = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.20");
+    
+    @SetFromFlag("downloadUrl")
+    public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL 
= new BasicAttributeSensorAndConfigKey<String>(
+            Attributes.DOWNLOAD_URL, 
"http://download.nextag.com/apache/qpid/${version}/qpid-java-broker-${version}.tar.gz";);
+
+    @SetFromFlag("amqpPort")
+    public static final PortAttributeSensorAndConfigKey AMQP_PORT = 
AmqpServer.AMQP_PORT;
+
+    @SetFromFlag("virtualHost")
+    public static final BasicAttributeSensorAndConfigKey<String> 
VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME;
+
+    @SetFromFlag("amqpVersion")
+    public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION 
= new BasicAttributeSensorAndConfigKey<String>(
+            AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_10);
+    
+    @SetFromFlag("httpManagementPort")
+    public static final PortAttributeSensorAndConfigKey HTTP_MANAGEMENT_PORT = 
new PortAttributeSensorAndConfigKey("qpid.http-management.port", "Qpid HTTP 
management plugin port");
+
+    @SetFromFlag("jmxUser")
+    public static final BasicAttributeSensorAndConfigKey<String> JMX_USER = 
new BasicAttributeSensorAndConfigKey<String>(
+            UsesJmx.JMX_USER, "admin");
+    
+    @SetFromFlag("jmxPassword")
+    public static final BasicAttributeSensorAndConfigKey<String> JMX_PASSWORD 
= new BasicAttributeSensorAndConfigKey<String>(
+            UsesJmx.JMX_PASSWORD, "admin");
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
new file mode 100644
index 0000000..66ff73d
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.java.JmxSupport;
+import org.apache.brooklyn.entity.messaging.jms.JMSBrokerImpl;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.event.feed.jmx.JmxHelper;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Objects.ToStringHelper;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single 
Qpid broker instance, using AMQP 0-10.
+ */
+public class QpidBrokerImpl extends JMSBrokerImpl<QpidQueue, QpidTopic> 
implements QpidBroker {
+    private static final Logger log = 
LoggerFactory.getLogger(QpidBrokerImpl.class);
+
+    private volatile JmxFeed jmxFeed;
+
+    public QpidBrokerImpl() {
+        super();
+    }
+
+    public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME); }
+    public String getAmqpVersion() { return getAttribute(AMQP_VERSION); }
+    public Integer getAmqpPort() { return getAttribute(AMQP_PORT); }
+
+    public void setBrokerUrl() {
+        String urlFormat = "amqp://guest:guest@/%s?brokerlist='tcp://%s:%d'";
+        setAttribute(BROKER_URL, format(urlFormat, 
getAttribute(VIRTUAL_HOST_NAME), getAttribute(HOSTNAME), 
getAttribute(AMQP_PORT)));
+    }
+    
+    @Override
+    public void init() {
+        super.init();
+        new JmxSupport(this, null).recommendJmxRmiCustomAgent();
+    }
+
+    public void waitForServiceUp(long duration, TimeUnit units) {
+        super.waitForServiceUp(duration, units);
+
+        // Also wait for the MBean to exist (as used when creating queue/topic)
+        JmxHelper helper = new JmxHelper(this);
+        try {
+            String virtualHost = getConfig(QpidBroker.VIRTUAL_HOST_NAME);
+            ObjectName virtualHostManager = new 
ObjectName(format("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"%s\"",
 virtualHost));
+            helper.connect();
+            helper.assertMBeanExistsEventually(virtualHostManager, 
units.toMillis(duration));
+        } catch (MalformedObjectNameException e) {
+            throw Exceptions.propagate(e);
+        } catch (IOException e) {
+            throw Exceptions.propagate(e);
+        } finally {
+            if (helper != null) helper.terminate();
+        }
+    }
+    
+    public QpidQueue createQueue(Map properties) {
+        QpidQueue result = 
addChild(EntitySpec.create(QpidQueue.class).configure(properties));
+        Entities.manage(result);
+        result.create();
+        return result;
+    }
+
+    public QpidTopic createTopic(Map properties) {
+        QpidTopic result = 
addChild(EntitySpec.create(QpidTopic.class).configure(properties));
+        Entities.manage(result);
+        result.create();
+        return result;
+    }
+
+    @Override
+    public Class getDriverInterface() {
+        return QpidDriver.class;
+    }
+
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+        String serverInfoMBeanName = 
"org.apache.qpid:type=ServerInformation,name=ServerInformation";
+
+        jmxFeed = JmxFeed.builder()
+                .entity(this)
+                .period(500, TimeUnit.MILLISECONDS)
+                .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP)
+                        .objectName(serverInfoMBeanName)
+                        .attributeName("ProductVersion")
+                        .onSuccess(new Function<Object,Boolean>() {
+                                private boolean hasWarnedOfVersionMismatch;
+                                @Override public Boolean apply(Object input) {
+                                    if (input == null) return false;
+                                    if (!hasWarnedOfVersionMismatch && 
!getConfig(QpidBroker.SUGGESTED_VERSION).equals(input)) {
+                                        log.warn("Qpid version mismatch: 
ProductVersion is {}, requested version is {}", input, 
getConfig(QpidBroker.SUGGESTED_VERSION));
+                                        hasWarnedOfVersionMismatch = true;
+                                    }
+                                    return true;
+                                }})
+                        .onException(Functions.constant(false))
+                        .suppressDuplicates(true))
+                .build();
+    }
+
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        if (jmxFeed != null) jmxFeed.stop();
+    }
+
+    @Override
+    protected ToStringHelper toStringHelper() {
+        return super.toStringHelper().add("amqpPort", getAmqpPort());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestination.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestination.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestination.java
new file mode 100644
index 0000000..a73d079
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestination.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
+import org.apache.brooklyn.entity.messaging.jms.JMSDestination;
+
+public interface QpidDestination extends JMSDestination, AmqpExchange {
+    
+    public void create();
+    
+    /**
+     * Return the AMQP name for the queue.
+     */
+    public String getQueueName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java
new file mode 100644
index 0000000..718ce88
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import static java.lang.String.format;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.java.UsesJmx;
+import org.apache.brooklyn.entity.messaging.amqp.AmqpServer;
+import org.apache.brooklyn.entity.messaging.jms.JMSDestinationImpl;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.event.feed.jmx.JmxHelper;
+import brooklyn.util.exceptions.Exceptions;
+
+public abstract class QpidDestinationImpl extends JMSDestinationImpl 
implements QpidDestination {
+    public static final Logger log = 
LoggerFactory.getLogger(QpidDestination.class);
+    
+    @SetFromFlag
+    String virtualHost;
+
+    protected ObjectName virtualHostManager;
+    protected ObjectName exchange;
+    protected transient JmxHelper jmxHelper;
+    protected volatile JmxFeed jmxFeed;
+
+    public QpidDestinationImpl() {
+    }
+
+    @Override
+    public QpidBroker getParent() {
+        return (QpidBroker) super.getParent();
+    }
+    
+    @Override
+    public void onManagementStarting() {
+        super.onManagementStarting();
+        
+        // TODO Would be nice to share the JmxHelper for all destinations, so 
just one connection.
+        // But tricky for if brooklyn were distributed
+        try {
+            if (virtualHost == null) virtualHost = 
getConfig(QpidBroker.VIRTUAL_HOST_NAME);
+            setAttribute(QpidBroker.VIRTUAL_HOST_NAME, virtualHost);
+            virtualHostManager = new 
ObjectName(format("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"%s\"",
 virtualHost));
+            jmxHelper = new JmxHelper((EntityLocal)getParent());
+        } catch (MalformedObjectNameException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    @Override
+    protected void disconnectSensors() {
+        if (jmxFeed != null) jmxFeed.stop();
+    }
+
+    @Override
+    public void create() {
+        jmxHelper.operation(virtualHostManager, "createNewQueue", getName(), 
getParent().getAttribute(UsesJmx.JMX_USER), true);
+        jmxHelper.operation(exchange, "createNewBinding", getName(), 
getName());
+        connectSensors();
+    }
+    
+    @Override
+    public void delete() {
+        jmxHelper.operation(exchange, "removeBinding", getName(), getName());
+        jmxHelper.operation(virtualHostManager, "deleteQueue", getName());
+        disconnectSensors();
+    }
+
+    @Override
+    public String getQueueName() {
+
+        if (AmqpServer.AMQP_0_10.equals(getParent().getAmqpVersion())) {
+            return String.format("'%s'/'%s'; { assert: never }", 
getExchangeName(), getName());
+        } else {
+            return getName();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDriver.java
new file mode 100644
index 0000000..46aeb09
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDriver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+
+public interface QpidDriver extends JavaSoftwareProcessDriver {
+
+    Integer getAmqpPort();
+
+    String getAmqpVersion();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueue.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueue.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueue.java
new file mode 100644
index 0000000..b2710eb
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueue.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+
+import org.apache.brooklyn.entity.messaging.Queue;
+
+@ImplementedBy(QpidQueueImpl.class)
+public interface QpidQueue extends QpidDestination, Queue {
+    @Override
+    public String getExchangeName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueueImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueueImpl.java
new file mode 100644
index 0000000..cec93a9
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueueImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import static java.lang.String.format;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.util.exceptions.Exceptions;
+
+public class QpidQueueImpl extends QpidDestinationImpl implements QpidQueue {
+    public QpidQueueImpl() {
+    }
+
+    @Override
+    public void onManagementStarting() {
+        super.onManagementStarting();
+        setAttribute(QUEUE_NAME, getName());
+        try {
+            exchange = new 
ObjectName(format("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"%s\",name=\"%s\",ExchangeType=direct",
 virtualHost, getExchangeName()));
+        } catch (MalformedObjectNameException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    @Override
+    protected void connectSensors() {
+        String queue = 
format("org.apache.qpid:type=VirtualHost.Queue,VirtualHost=\"%s\",name=\"%s\"", 
virtualHost, getName());
+        
+        jmxFeed = JmxFeed.builder()
+                .entity(this)
+                .helper(jmxHelper)
+                .pollAttribute(new 
JmxAttributePollConfig<Integer>(QUEUE_DEPTH_BYTES)
+                        .objectName(queue)
+                        .attributeName("QueueDepth"))
+                .pollAttribute(new 
JmxAttributePollConfig<Integer>(QUEUE_DEPTH_MESSAGES)
+                        .objectName(queue)
+                        .attributeName("MessageCount"))
+                .build();
+    }
+
+    @Override
+    public String getExchangeName() {
+        return AmqpExchange.DIRECT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidSshDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidSshDriver.java
new file mode 100644
index 0000000..3c6daf5
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidSshDriver.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import static java.lang.String.format;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+
+import com.google.common.collect.ImmutableMap;
+
+public class QpidSshDriver extends JavaSoftwareProcessSshDriver implements 
QpidDriver{
+
+    private static final Logger log = 
LoggerFactory.getLogger(QpidSshDriver.class);
+
+    public QpidSshDriver(QpidBrokerImpl entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    protected String getLogFileLocation() { return Os.mergePaths(getRunDir(), 
"log", "qpid.log"); }
+
+    @Override
+    public Integer getAmqpPort() { return 
entity.getAttribute(QpidBroker.AMQP_PORT); }
+
+    @Override
+    public String getAmqpVersion() { return 
entity.getAttribute(QpidBroker.AMQP_VERSION); }
+
+    public Integer getHttpManagementPort() { return 
entity.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT); }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), 
resolver.getUnpackedDirectoryName(format("qpid-broker-%s", getVersion()))));
+    }
+
+    @Override
+    public void install() {
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+
+        List<String> commands = new LinkedList<String>();
+        commands.addAll( BashCommands.commandsToDownloadUrlsAs(urls, saveAs));
+        commands.add(BashCommands.INSTALL_TAR);
+        commands.add("tar xzfv "+saveAs);
+
+        newScript(INSTALLING)
+                .body.append(commands)
+                .execute();
+    }
+
+    @Override
+    public void customize() {
+        Networking.checkPortsValid(MutableMap.of("jmxPort", getJmxPort(), 
"amqpPort", getAmqpPort()));
+        newScript(CUSTOMIZING)
+                .body.append(
+                        format("cp -R %s/{bin,etc,lib} .", 
getExpandedInstallDir()),
+                        "mkdir lib/opt"
+                    )
+                .execute();
+    }
+
+    @Override
+    public void launch() {
+        newScript(ImmutableMap.of(USE_PID_FILE, false), LAUNCHING)
+                .body.append("nohup ./bin/qpid-server -b '*' > 
qpid-server-launch.log 2>&1 &")
+                .execute();
+    }
+
+    public String getPidFile() { return "qpid-server.pid"; }
+    
+    @Override
+    public boolean isRunning() {
+        return newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), 
CHECK_RUNNING).execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), 
STOPPING).execute();
+    }
+
+    @Override
+    public void kill() {
+        newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), 
KILLING).execute();
+    }
+
+    @Override
+    public Map<String, Object> getCustomJavaSystemProperties() {
+        return MutableMap.<String, Object>builder()
+                .putAll(super.getCustomJavaSystemProperties())
+                .put("connector.port", getAmqpPort())
+                .put("management.enabled", "true")
+                .put("management.jmxport.registryServer", getRmiRegistryPort())
+                .put("management.jmxport.connectorServer", getJmxPort())
+                .put("management.http.enabled", 
Boolean.toString(getHttpManagementPort() != null))
+                .putIfNotNull("management.http.port", getHttpManagementPort())
+                .build();
+    }
+
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        return MutableMap.<String, String>builder()
+                .putAll(super.getShellEnvironment())
+                .put("QPID_HOME", getRunDir())
+                .put("QPID_WORK", getRunDir())
+                .renameKey("JAVA_OPTS", "QPID_OPTS")
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopic.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopic.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopic.java
new file mode 100644
index 0000000..bee9519
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopic.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+
+import org.apache.brooklyn.entity.messaging.Topic;
+
+@ImplementedBy(QpidTopicImpl.class)
+public interface QpidTopic extends QpidDestination, Topic {
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopicImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopicImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopicImpl.java
new file mode 100644
index 0000000..8de02d9
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopicImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.qpid;
+
+import static java.lang.String.format;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
+import brooklyn.util.exceptions.Exceptions;
+
+public class QpidTopicImpl extends QpidDestinationImpl implements QpidTopic {
+
+    public QpidTopicImpl() {
+    }
+
+    @Override
+    public void onManagementStarting() {
+        super.onManagementStarting();
+        setAttribute(TOPIC_NAME, getName());
+        try {
+            String virtualHost = getParent().getVirtualHost();
+            exchange = new 
ObjectName(format("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"%s\",name=\"%s\",ExchangeType=topic",
 virtualHost, getExchangeName()));
+        } catch (MalformedObjectNameException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    // TODO sensors
+    @Override
+    public void connectSensors() {
+    }
+
+    @Override
+    public String getExchangeName() { return AmqpExchange.TOPIC; }
+
+    @Override
+    public String getTopicName() { return getQueueName(); }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBroker.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBroker.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBroker.java
new file mode 100644
index 0000000..7e0f54e
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBroker.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.rabbit;
+
+import java.util.Map;
+
+import com.google.common.annotations.Beta;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import org.apache.brooklyn.entity.messaging.MessageBroker;
+import org.apache.brooklyn.entity.messaging.amqp.AmqpServer;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single 
Rabbit MQ broker instance, using AMQP 0-9-1.
+ */
+@Catalog(name="RabbitMQ Broker", description="RabbitMQ is an open source 
message broker software (i.e. message-oriented middleware) that implements the 
Advanced Message Queuing Protocol (AMQP) standard", 
iconUrl="classpath:///RabbitMQLogo.png")
+@ImplementedBy(RabbitBrokerImpl.class)
+public interface RabbitBroker extends SoftwareProcess, MessageBroker, 
AmqpServer {
+
+    @SetFromFlag("version")
+    public static final ConfigKey<String> SUGGESTED_VERSION = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.8.7");
+
+    @SetFromFlag("downloadUrl")
+    public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL 
= new BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, 
"http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/rabbitmq-server-generic-unix-${version}.tar.gz";);
+    
+    @SetFromFlag("erlangVersion")
+    public static final BasicConfigKey<String> ERLANG_VERSION = new 
BasicConfigKey<String>(String.class, "erlang.version", "Erlang runtime 
version", "R15B");
+
+    @SetFromFlag("rabbitmqConfigTemplateUrl")
+    ConfigKey<String> CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
+            "rabbitmq.templateUrl", "Template file (in freemarker format) for 
the rabbitmq.config config file",
+            
"classpath://org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config");
+
+    @SetFromFlag("amqpPort")
+    public static final PortAttributeSensorAndConfigKey AMQP_PORT = 
AmqpServer.AMQP_PORT;
+
+    @SetFromFlag("virtualHost")
+    public static final BasicAttributeSensorAndConfigKey<String> 
VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME;
+
+    @SetFromFlag("amqpVersion")
+    public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION 
= new BasicAttributeSensorAndConfigKey<String>(
+            AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_9_1);
+
+    @SetFromFlag("managmentPort")
+    public static final PortAttributeSensorAndConfigKey MANAGEMENT_PORT = new 
PortAttributeSensorAndConfigKey(
+            "rabbitmq.management.port", "Port on which management interface 
will be available", "15672+");
+
+    public static AttributeSensor<String> MANAGEMENT_URL = 
Sensors.newStringSensor(
+            "rabbitmq.management.url", "Management URL is only available if 
management plugin flag is true");
+
+    @SetFromFlag("enableManagementPlugin")
+    public static final ConfigKey<Boolean> ENABLE_MANAGEMENT_PLUGIN = 
ConfigKeys.newBooleanConfigKey(
+            "rabbitmq.management.plugin", "Management plugin will be enabled", 
false);
+
+    RabbitQueue createQueue(Map properties);
+
+    // TODO required by RabbitDestination due to close-coupling between that 
and RabbitBroker; how best to improve?
+    @Beta
+    Map<String, String> getShellEnvironment();
+    
+    @Beta
+    String getRunDir();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java
new file mode 100644
index 0000000..82347bf
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.rabbit;
+
+import static java.lang.String.format;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects.ToStringHelper;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single 
Rabbit MQ broker instance, using AMQP 0-9-1.
+ */
+public class RabbitBrokerImpl extends SoftwareProcessImpl implements 
RabbitBroker {
+    private static final Logger log = 
LoggerFactory.getLogger(RabbitBrokerImpl.class);
+
+    public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME); }
+    public String getAmqpVersion() { return getAttribute(AMQP_VERSION); }
+    public Integer getAmqpPort() { return getAttribute(AMQP_PORT); }
+
+    public RabbitBrokerImpl() {
+        super();
+    }
+
+    @Override
+    public RabbitDriver getDriver() {
+        return (RabbitDriver) super.getDriver();
+    }
+
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        return getDriver().getShellEnvironment();
+    }
+    
+    @Override
+    public String getRunDir() {
+        return getDriver().getRunDir();
+    }
+    
+    @Override
+    protected void postStart() {
+        super.postStart();
+
+        getDriver().configure();
+
+        // TODO implement this using AMQP connection, no external mechanism 
available
+        // queueNames.each { String name -> addQueue(name) }
+    }
+
+    public void setBrokerUrl() {
+        String urlFormat = "amqp://guest:guest@%s:%d/%s";
+        setAttribute(BROKER_URL, format(urlFormat, getAttribute(HOSTNAME), 
getAttribute(AMQP_PORT), getAttribute(VIRTUAL_HOST_NAME)));
+    }
+
+    public RabbitQueue createQueue(Map properties) {
+        RabbitQueue result = 
addChild(EntitySpec.create(RabbitQueue.class).configure(properties));
+        Entities.manage(result);
+        result.create();
+        return result;
+    }
+
+    @Override
+    public Class<? extends RabbitDriver> getDriverInterface() {
+        return RabbitDriver.class;
+    }
+
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+
+        connectServiceUpIsRunning();
+
+        setBrokerUrl();
+
+        if (getEnableManagementPlugin()) {
+            setAttribute(MANAGEMENT_URL, format("http://%s:%s/";, 
getAttribute(HOSTNAME), getAttribute(MANAGEMENT_PORT)));
+        }
+    }
+
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        disconnectServiceUpIsRunning();
+    }
+
+    public boolean getEnableManagementPlugin() {
+        return Boolean.TRUE.equals(getConfig(ENABLE_MANAGEMENT_PLUGIN));
+    }
+
+    public Integer getManagementPort() {
+        return getAttribute(MANAGEMENT_PORT);
+    }
+
+    @Override
+    protected ToStringHelper toStringHelper() {
+        return super.toStringHelper().add("amqpPort", getAmqpPort());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDestination.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDestination.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDestination.java
new file mode 100644
index 0000000..0f2a7e4
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDestination.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.rabbit;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.AbstractEntity;
+import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+
+public abstract class RabbitDestination extends AbstractEntity implements 
AmqpExchange {
+    public static final Logger log = 
LoggerFactory.getLogger(RabbitDestination.class);
+    
+    private String virtualHost;
+    private String exchange;
+    protected SshMachineLocation machine;
+    protected Map<String,String> shellEnvironment;
+
+    public RabbitDestination() {
+    }
+
+    @Override
+    public void onManagementStarting() {
+        super.onManagementStarting();
+        
+        exchange = (getConfig(EXCHANGE_NAME) != null) ? 
getConfig(EXCHANGE_NAME) : getDefaultExchangeName();
+        virtualHost = getConfig(RabbitBroker.VIRTUAL_HOST_NAME);
+        setAttribute(RabbitBroker.VIRTUAL_HOST_NAME, virtualHost);
+        
+        machine = (SshMachineLocation) 
Iterables.find(getParent().getLocations(), 
Predicates.instanceOf(SshMachineLocation.class));
+        shellEnvironment = getParent().getShellEnvironment();
+    }
+
+    // FIXME Should return RabbitBroker; won't work if gets a proxy rather 
than "real" entity
+    @Override
+    public RabbitBroker getParent() {
+        return (RabbitBroker) super.getParent();
+    }
+    
+    public void create() {
+        connectSensors();
+    }
+    
+    public void delete() {
+        disconnectSensors();
+    }
+
+    protected void connectSensors() { }
+
+    protected void disconnectSensors() { }
+
+    public String getVirtualHost() {
+        return virtualHost;
+    }
+    
+    @Override
+    public String getExchangeName() { 
+        return exchange;
+    }
+    
+    public String getDefaultExchangeName() {
+        return AmqpExchange.DIRECT;
+    }
+
+    @Override
+    protected ToStringHelper toStringHelper() {
+        return super.toStringHelper().add("virtualHost", 
getParent().getVirtualHost()).add("exchange", getExchangeName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDriver.java
new file mode 100644
index 0000000..2c989d5
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDriver.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.rabbit;
+
+import java.util.Map;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+public interface RabbitDriver extends SoftwareProcessDriver {
+    
+    public void configure();
+    
+    public Map<String, String> getShellEnvironment();
+
+    public String getRunDir();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitQueue.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitQueue.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitQueue.java
new file mode 100644
index 0000000..cb12b4f
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitQueue.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.rabbit;
+
+import org.apache.brooklyn.entity.messaging.Queue;
+import brooklyn.event.feed.ssh.SshFeed;
+import brooklyn.event.feed.ssh.SshPollConfig;
+import brooklyn.event.feed.ssh.SshPollValue;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+public class RabbitQueue extends RabbitDestination implements Queue {
+
+    private SshFeed sshFeed;
+
+    public RabbitQueue() {
+    }
+    
+    public String getName() {
+        return getDisplayName();
+    }
+
+    @Override
+    public void create() {
+        setAttribute(QUEUE_NAME, getName());
+        super.create();
+    }
+
+    @Override
+    protected void connectSensors() {
+        String runDir = getParent().getRunDir();
+        String cmd = String.format("%s/sbin/rabbitmqctl list_queues -p /%s  | 
grep '%s'", runDir, getVirtualHost(), getQueueName());
+        
+        sshFeed = SshFeed.builder()
+                .entity(this)
+                .machine(machine)
+                .poll(new SshPollConfig<Integer>(QUEUE_DEPTH_BYTES)
+                        .env(shellEnvironment)
+                        .command(cmd)
+                        .onFailure(Functions.constant(-1))
+                        .onSuccess(new Function<SshPollValue, Integer>() {
+                                @Override public Integer apply(SshPollValue 
input) {
+                                    return 0; // TODO parse out queue depth 
from output
+                                }}))
+                .poll(new SshPollConfig<Integer>(QUEUE_DEPTH_MESSAGES)
+                        .env(shellEnvironment)
+                        .command(cmd)
+                        .onFailure(Functions.constant(-1))
+                        .onSuccess(new Function<SshPollValue, Integer>() {
+                                @Override public Integer apply(SshPollValue 
input) {
+                                    return 0; // TODO parse out queue depth 
from output
+                                }}))
+                .build();
+    }
+    
+    @Override
+    protected void disconnectSensors() {
+        if (sshFeed != null) sshFeed.stop();
+        super.disconnectSensors();
+    }
+    
+    /**
+     * Return the AMQP name for the queue.
+     */
+    public String getQueueName() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java
new file mode 100644
index 0000000..56e248f
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.rabbit;
+
+import static brooklyn.util.ssh.BashCommands.*;
+import static java.lang.String.format;
+
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.lifecycle.ScriptHelper;
+import org.apache.brooklyn.entity.messaging.amqp.AmqpServer;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+
+/**
+ * TODO javadoc
+ */
+public class RabbitSshDriver extends AbstractSoftwareProcessSshDriver 
implements RabbitDriver {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RabbitSshDriver.class);
+
+    // See http://fedoraproject.org/wiki/EPEL/FAQ#howtouse
+    private static final Map<String, String> CENTOS_VERSION_TO_EPEL_VERSION = 
ImmutableMap.of(
+        "5", "5-4",
+        "6", "6-8",
+        "7", "7-5"
+    );
+
+    public RabbitSshDriver(RabbitBrokerImpl entity, SshMachineLocation 
machine) {
+        super(entity, machine);
+    }
+
+    protected String getLogFileLocation() { return 
getRunDir()+"/"+entity.getId()+".log"; }
+
+    public Integer getAmqpPort() { return 
entity.getAttribute(AmqpServer.AMQP_PORT); }
+
+    public String getVirtualHost() { return 
entity.getAttribute(AmqpServer.VIRTUAL_HOST_NAME); }
+
+    public String getErlangVersion() { return 
entity.getConfig(RabbitBroker.ERLANG_VERSION); }
+
+    @Override
+    public RabbitBrokerImpl getEntity() {
+        return (RabbitBrokerImpl) super.getEntity();
+    }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), 
resolver.getUnpackedDirectoryName(format("rabbitmq_server-%s", getVersion()))));
+    }
+
+    @Override
+    public void install() {
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+        // Version and architecture are only required for download of epel 
package on RHEL/Centos systems so pick sensible
+        // defaults if unavailable
+        String osMajorVersion = getMachine().getOsDetails().getVersion();
+        if (Strings.isNullOrEmpty(osMajorVersion)) {
+            osMajorVersion = "7";
+        } else {
+            osMajorVersion = osMajorVersion.indexOf(".") > 0 ? 
osMajorVersion.substring(0, osMajorVersion.indexOf('.')) : osMajorVersion;
+            if 
(!CENTOS_VERSION_TO_EPEL_VERSION.keySet().contains(osMajorVersion)) {
+                osMajorVersion = "7";
+            }
+        }
+        String epelVersion = 
CENTOS_VERSION_TO_EPEL_VERSION.get(osMajorVersion);
+        String osArchitecture = getMachine().getOsDetails().getArch();
+        if (Strings.isNullOrEmpty(osArchitecture)) {
+            osArchitecture = "x86_64";
+        }
+
+        List<String> commands = ImmutableList.<String>builder()
+                // EPEL repository for erlang install required on some Centos 
distributions
+                .add(chainGroup("which yum", sudo("yum -y update 
ca-certificates"), sudo("rpm -Uvh " +
+                        
format("http://download.fedoraproject.org/pub/epel/%s/%s/epel-release-%s.noarch.rpm";,
 osMajorVersion, osArchitecture, epelVersion))))
+                .add(ifExecutableElse0("zypper", chainGroup(
+                        ok(sudo("zypper --non-interactive addrepo 
http://download.opensuse.org/repositories/devel:/languages:/erlang/SLE_11_SP3 
erlang_sles_11")),
+                        ok(sudo("zypper --non-interactive addrepo 
http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_11.4
 erlang_suse_11")),
+                        ok(sudo("zypper --non-interactive addrepo 
http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_12.3
 erlang_suse_12")),
+                        ok(sudo("zypper --non-interactive addrepo 
http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_13.1
 erlang_suse_13")))))
+                .add(installPackage( // NOTE only 'port' states the version of 
Erlang used, maybe remove this constraint?
+                        ImmutableMap.of(
+                                "apt", "erlang-nox erlang-dev",
+                                "port", "erlang@"+getErlangVersion()+"+ssl"),
+                        "erlang"))
+                .addAll(commandsToDownloadUrlsAs(urls, saveAs))
+                .add(installExecutable("tar"))
+                .add(format("tar xvzf %s",saveAs))
+                .build();
+
+        newScript(INSTALLING).
+                failOnNonZeroResultCode().
+                body.append(commands).execute();
+    }
+
+    @Override
+    public void customize() {
+        Networking.checkPortsValid(MutableMap.of("amqpPort", getAmqpPort()));
+        ScriptHelper scriptHelper = newScript(CUSTOMIZING);
+
+        scriptHelper.body.append(
+                format("cp -R %s/* .", getExpandedInstallDir())
+        );
+
+        if 
(Boolean.TRUE.equals(entity.getConfig(RabbitBroker.ENABLE_MANAGEMENT_PLUGIN))) {
+            scriptHelper.body.append(
+                    "./sbin/rabbitmq-plugins enable rabbitmq_management"
+            );
+        }
+        scriptHelper.failOnNonZeroResultCode();
+        scriptHelper.execute();
+
+        copyTemplate(entity.getConfig(RabbitBroker.CONFIG_TEMPLATE_URL), 
getConfigPath() + ".config");
+    }
+
+    @Override
+    public void launch() {
+        newScript(MutableMap.of("usePidFile", false), LAUNCHING)
+            .body.append(
+                "nohup ./sbin/rabbitmq-server > console-out.log 2> 
console-err.log &",
+                "for i in {1..10}\n" +
+                    "do\n" +
+                     "    grep 'broker running' console-out.log && exit\n" +
+                     "    sleep 1\n" +
+                     "done",
+                "echo \"Couldn't determine if rabbitmq-server is running\"",
+                "exit 1"
+            ).execute();
+    }
+
+    @Override
+    public void configure() {
+        newScript(CUSTOMIZING)
+            .body.append(
+                "./sbin/rabbitmqctl add_vhost "+getEntity().getVirtualHost(),
+                "./sbin/rabbitmqctl set_permissions -p 
"+getEntity().getVirtualHost()+" guest \".*\" \".*\" \".*\""
+            ).execute();
+    }
+
+
+    public String getPidFile() { return "rabbitmq.pid"; }
+
+    @Override
+    public boolean isRunning() {
+        return newScript(MutableMap.of("usePidFile", false), CHECK_RUNNING)
+                .body.append("./sbin/rabbitmqctl -q status")
+                .execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(MutableMap.of("usePidFile", false), STOPPING)
+                .body.append("./sbin/rabbitmqctl stop")
+                .execute();
+    }
+
+
+    @Override
+    public void kill() {
+        stop(); // TODO No pid file to easily do `kill -9`
+    }
+
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        return MutableMap.<String, String>builder()
+                .putAll(super.getShellEnvironment())
+                .put("RABBITMQ_HOME", getRunDir())
+                .put("RABBITMQ_LOG_BASE", getRunDir())
+                .put("RABBITMQ_NODENAME", getEntity().getId())
+                .put("RABBITMQ_NODE_PORT", getAmqpPort().toString())
+                .put("RABBITMQ_PID_FILE", getRunDir()+"/"+getPidFile())
+                .put("RABBITMQ_CONFIG_FILE", getConfigPath())
+                .build();
+    }
+
+    private String getConfigPath() {
+        return getRunDir() + "/rabbitmq";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/Storm.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/Storm.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/Storm.java
new file mode 100644
index 0000000..8c4a4bf
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/Storm.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.config.render.RendererHints;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJmx;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a Storm 
node (UI, Nimbus or Supervisor).
+ */
+@Catalog(name="Storm Node", description="Apache Storm is a distributed 
realtime computation system. "
+        + "Storm makes it easy to reliably process unbounded streams of data, 
doing for realtime processing "
+        + "what Hadoop did for batch processing")
+@ImplementedBy(StormImpl.class)
+public interface Storm extends SoftwareProcess, UsesJmx {
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.8.2");
+
+    @SetFromFlag("nimbusHostname")
+    ConfigKey<String> NIMBUS_HOSTNAME = 
ConfigKeys.newStringConfigKey("storm.nimbus.hostname");
+    
+    @SetFromFlag("nimbusEntity")
+    ConfigKey<Entity> NIMBUS_ENTITY = ConfigKeys.newConfigKey(Entity.class, 
"storm.nimbus.entity");
+
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new 
BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, 
"https://dl.dropboxusercontent.com/s/fl4kr7w0oc8ihdw/storm-${version}.zip";);
+
+    ConfigKey<Object> START_MUTEX = ConfigKeys.newConfigKey(Object.class, 
"storm.start.mutex");
+
+    @SetFromFlag("role")
+    ConfigKey<Role> ROLE = ConfigKeys.newConfigKey(Role.class, "storm.role", 
"The Storm server role");
+
+    @SetFromFlag("localDir")
+    ConfigKey<String> LOCAL_DIR = 
ConfigKeys.newStringConfigKey("storm.local.dir", "Setting for Storm local dir");
+    
+    @SetFromFlag("uiPort")
+    PortAttributeSensorAndConfigKey UI_PORT = new 
PortAttributeSensorAndConfigKey("storm.ui.port", "Storm UI port", "8080+");
+
+    @SetFromFlag("thriftPort")
+    PortAttributeSensorAndConfigKey THRIFT_PORT = new 
PortAttributeSensorAndConfigKey("storm.thrift.port", "Storm Thrift port", 
"6627");
+
+    @SetFromFlag("zookeeperEnsemble")
+    ConfigKey<ZooKeeperEnsemble> ZOOKEEPER_ENSEMBLE = 
ConfigKeys.newConfigKey(ZooKeeperEnsemble.class,
+            "storm.zookeeper.ensemble", "Zookeeper ensemble entity");
+
+    @SetFromFlag("stormConfigTemplateUrl")
+    ConfigKey<String> STORM_CONFIG_TEMPLATE_URL = 
ConfigKeys.newStringConfigKey("storm.config.templateUrl",
+            "Template file (in freemarker format) for the storm.yaml config 
file",
+            
"classpath://org/apache/brooklyn/entity/messaging/storm/storm.yaml");
+
+    @SetFromFlag("zeromqVersion")
+    ConfigKey<String> ZEROMQ_VERSION = 
ConfigKeys.newStringConfigKey("storm.zeromq.version", "zeromq version", 
"2.1.7");
+
+    AttributeSensor<Boolean> SERVICE_UP_JMX = 
Sensors.newBooleanSensor("storm.service.jmx.up", "Whether JMX is up for this 
service");
+
+    String getStormConfigTemplateUrl();
+
+    String getHostname();
+
+    Role getRole();
+    
+    enum Role { NIMBUS, SUPERVISOR, UI }
+
+    AttributeSensor<String> STORM_UI_URL = StormUiUrl.STORM_UI_URL;
+    
+    class StormUiUrl {
+        public static final AttributeSensor<String> STORM_UI_URL = 
Sensors.newStringSensor("storm.ui.url", "URL");
+
+        static {
+            RendererHints.register(STORM_UI_URL, 
RendererHints.namedActionWithUrl());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeployment.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeployment.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeployment.java
new file mode 100644
index 0000000..8f505a6
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeployment.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.trait.Startable;
+
+@Catalog(name="Storm Deployment", description="A Storm cluster. Apache Storm 
is a distributed realtime computation system. "
+        + "Storm makes it easy to reliably process unbounded streams of data, 
doing for realtime processing "
+        + "what Hadoop did for batch processing")
+@ImplementedBy(StormDeploymentImpl.class)
+public interface StormDeployment extends Entity, Startable {
+
+    @SetFromFlag("supervisors.count")
+    ConfigKey<Integer> SUPERVISORS_COUNT = 
ConfigKeys.newConfigKey("storm.supervisors.count", "Number of supervisor 
nodes", 3);
+
+    @SetFromFlag("zookeepers.count")
+    ConfigKey<Integer> ZOOKEEPERS_COUNT = 
ConfigKeys.newConfigKey("storm.zookeepers.count", "Number of zookeeper nodes", 
1);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeploymentImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeploymentImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeploymentImpl.java
new file mode 100644
index 0000000..0e5d006
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeploymentImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import static org.apache.brooklyn.entity.messaging.storm.Storm.ROLE;
+import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.NIMBUS;
+import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.SUPERVISOR;
+import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.UI;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.core.util.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.BasicStartableImpl;
+import brooklyn.entity.group.DynamicCluster;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
+
+public class StormDeploymentImpl extends BasicStartableImpl implements 
StormDeployment {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = 
LoggerFactory.getLogger(StormDeploymentImpl.class);
+
+    @Override
+    public void init() {
+        super.init();
+        new 
ResourceUtils(this).checkUrlExists(Storm.STORM_CONFIG_TEMPLATE_URL.getDefaultValue());
+        
+        setDefaultDisplayName("Storm Deployment");
+        
+        ZooKeeperEnsemble zooKeeperEnsemble = addChild(EntitySpec.create(
+            ZooKeeperEnsemble.class).configure(
+                ZooKeeperEnsemble.INITIAL_SIZE, getConfig(ZOOKEEPERS_COUNT)));
+        
+        setConfig(Storm.ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble);
+        
+        Storm nimbus = addChild(EntitySpec.create(Storm.class).configure(ROLE, 
NIMBUS));
+        
+        setConfig(Storm.NIMBUS_ENTITY, nimbus);
+        setConfig(Storm.START_MUTEX, new Object());
+        
+        addChild(EntitySpec.create(DynamicCluster.class)
+            .configure(DynamicCluster.MEMBER_SPEC, 
+                EntitySpec.create(Storm.class).configure(ROLE, SUPERVISOR))
+            .configure(DynamicCluster.INITIAL_SIZE, 
getConfig(SUPERVISORS_COUNT))
+            .displayName("Storm Supervisor Cluster"));
+        
+        Storm ui = addChild(EntitySpec.create(Storm.class).configure(ROLE, 
UI));
+        
+        addEnricher(Enrichers.builder()
+                .propagating(Storm.STORM_UI_URL)
+                .from(ui)
+                .build());
+        addEnricher(Enrichers.builder()
+                .propagating(Attributes.HOSTNAME)
+                .from(nimbus)
+                .build());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDriver.java
new file mode 100644
index 0000000..f91429e
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+
+public interface StormDriver extends JavaSoftwareProcessDriver {
+
+    String getJvmOptsLine();
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java
new file mode 100644
index 0000000..b2e5436
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.java.JavaAppUtils;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.event.feed.jmx.JmxHelper;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+public class StormImpl extends SoftwareProcessImpl implements Storm {
+
+    private static final Logger log = LoggerFactory.getLogger(StormImpl.class);
+
+    public static final ObjectName STORM_MBEAN = 
JmxHelper.createObjectName("backtype.storm.daemon.nimbus:type=*");
+
+    private JmxHelper jmxHelper;
+    private volatile JmxFeed jmxFeed;
+    
+    public StormImpl() {}
+    
+    @Override
+    public String getHostname() { return getAttribute(HOSTNAME); }
+
+    @Override
+    public Role getRole() { return getConfig(ROLE); }
+
+    @Override
+    public String getStormConfigTemplateUrl() { return 
getConfig(STORM_CONFIG_TEMPLATE_URL); }   
+    
+    @Override
+    public Class<?> getDriverInterface() {
+        return StormDriver.class;
+    }
+
+    public String getRoleName() { return getRole().name().toLowerCase(); }
+
+    @Override
+    protected void preStart() {
+        setDefaultDisplayName("Storm Node ("+ getRoleName()+")");
+        super.preStart();
+    }
+    
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+
+        // give it plenty of time to start before we advertise ourselves
+        Time.sleep(Duration.TEN_SECONDS);
+
+        if (getRole() == Role.UI) {
+            setAttribute(STORM_UI_URL, 
"http://"+getAttribute(Attributes.HOSTNAME)+":"+getAttribute(UI_PORT)+"/");
+        }
+
+        if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+            jmxHelper = new JmxHelper(this);
+//            jmxFeed = JmxFeed.builder()
+//                    .entity(this)
+//                    .period(3000, TimeUnit.MILLISECONDS)
+//                    .helper(jmxHelper)
+//                    .pollAttribute(new 
JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
+//                            .objectName(STORM_MBEAN)
+//                            .attributeName("Initialized")
+//                            
.onSuccess(Functions.forPredicate(Predicates.notNull()))
+//                            .onException(Functions.constant(false)))
+//                    // TODO SERVICE_UP should really be a combo of JMX plus 
is running
+//                    .pollAttribute(new 
JmxAttributePollConfig<Boolean>(SERVICE_UP)
+//                            .objectName(STORM_MBEAN)
+//                            .attributeName("Initialized")
+//                            
.onSuccess(Functions.forPredicate(Predicates.notNull()))
+//                            .onException(Functions.constant(false)))
+//                    .build();
+            jmxFeed = JavaAppUtils.connectMXBeanSensors(this);
+            
+            // FIXME for now we do service up based on pid check -- we get a 
warning that:
+            // JMX object backtype.storm.daemon.nimbus:type=* not found at 
service:jmx:jmxmp://108.59.82.105:31001
+            // (JMX is up fine, but no such object there)
+            connectServiceUpIsRunning();
+         } else {
+            // if not using JMX
+            log.warn("Storm running without JMX monitoring; limited visibility 
of service available");
+            connectServiceUpIsRunning();
+        }
+    }
+
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        disconnectServiceUpIsRunning();
+        if (jmxFeed != null) jmxFeed.stop();
+        if (jmxHelper !=null) jmxHelper.terminate();
+    }
+
+}

Reply via email to