http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDriver.java
new file mode 100644
index 0000000..99a7b86
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDriver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+
+public interface ActiveMQDriver extends JavaSoftwareProcessDriver {
+
+    String getBrokerName();
+
+    Integer getOpenWirePort();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueue.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueue.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueue.java
new file mode 100644
index 0000000..e8c8a15
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueue.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+
+import org.apache.brooklyn.entity.messaging.Queue;
+
+@ImplementedBy(ActiveMQQueueImpl.class)
+public interface ActiveMQQueue extends ActiveMQDestination, Queue {
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueueImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueueImpl.java
new file mode 100644
index 0000000..63d5c1c
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueueImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+
+public class ActiveMQQueueImpl extends ActiveMQDestinationImpl implements 
ActiveMQQueue {
+    public static final Logger log = 
LoggerFactory.getLogger(ActiveMQQueue.class);
+
+    public ActiveMQQueueImpl() {
+    }
+
+    @Override
+    public void onManagementStarting() {
+        super.onManagementStarting();
+        setAttribute(QUEUE_NAME, getName());
+    }
+
+    public String getQueueName() {
+        return getName();
+    }
+    
+    public void create() {
+        log.debug("{} adding queue {} to broker {}", new Object[] {this, 
getName(), jmxHelper.getAttribute(brokerMBeanName, "BrokerName")});
+        
+        jmxHelper.operation(brokerMBeanName, "addQueue", getName());
+        
+        connectSensors();
+    }
+
+    public void delete() {
+        jmxHelper.operation(brokerMBeanName, "removeQueue", getName());
+        disconnectSensors();
+    }
+
+    @Override
+    protected void connectSensors() {
+        String queue = 
String.format("org.apache.activemq:type=Broker,brokerName=%s,destinationType=Queue,destinationName=%s",
 getBrokerName(), getName());
+        
+        jmxFeed = JmxFeed.builder()
+                .entity(this)
+                .helper(jmxHelper)
+                .pollAttribute(new 
JmxAttributePollConfig<Integer>(QUEUE_DEPTH_MESSAGES)
+                        .objectName(queue)
+                        .attributeName("QueueSize"))
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSpecs.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSpecs.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSpecs.java
new file mode 100644
index 0000000..813cba0
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSpecs.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+
+public class ActiveMQSpecs {
+
+    public static EntitySpec<ActiveMQBroker> brokerSpec() {
+        return EntitySpec.create(ActiveMQBroker.class);
+    }
+    
+    public static EntitySpec<ActiveMQBroker> brokerSpecChef() {
+        return EntitySpec.create(ActiveMQBroker.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSshDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSshDriver.java
new file mode 100644
index 0000000..77602e4
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSshDriver.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import static java.lang.String.format;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+
+import com.google.common.collect.ImmutableMap;
+
+public class ActiveMQSshDriver extends JavaSoftwareProcessSshDriver implements 
ActiveMQDriver {
+
+    public ActiveMQSshDriver(ActiveMQBrokerImpl entity, SshMachineLocation 
machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    protected String getLogFileLocation() { 
+        return Os.mergePathsUnix(getRunDir(), "data/activemq.log");
+    }
+
+    @Override
+    public String getBrokerName() { 
+        return entity.getAttribute(ActiveMQBroker.BROKER_NAME);
+    }
+
+    @Override
+    public Integer getOpenWirePort() { 
+        return entity.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
+    }
+
+    public String getMirrorUrl() {
+        return entity.getConfig(ActiveMQBroker.MIRROR_URL);
+    }
+
+    protected String getTemplateConfigurationUrl() {
+        return entity.getAttribute(ActiveMQBroker.TEMPLATE_CONFIGURATION_URL);
+    }
+
+    public String getPidFile() {
+        return "data/activemq.pid";
+    }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), 
resolver.getUnpackedDirectoryName(format("apache-activemq-%s", getVersion()))));
+    }
+
+    @Override
+    public void install() {
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+
+        List<String> commands = new LinkedList<String>();
+        commands.addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs));
+        commands.add(BashCommands.INSTALL_TAR);
+        commands.add("tar xzfv "+saveAs);
+
+        newScript(INSTALLING)
+                .body.append(commands)
+                .execute();
+    }
+
+    @Override
+    public void customize() {
+        Networking.checkPortsValid(ImmutableMap.of("jmxPort", getJmxPort(), 
"openWirePort", getOpenWirePort()));
+        newScript(CUSTOMIZING)
+                .body.append(
+                        format("cp -R %s/{bin,conf,data,lib,webapps} .", 
getExpandedInstallDir()),
+                        // Required in version 5.5.1 (at least), but not in 
version 5.7.0
+                        "sed -i.bk 's/\\[-z \"$JAVA_HOME\"]/\\[ -z 
\"$JAVA_HOME\" ]/g' bin/activemq",
+                        // Stop it writing to dev null on start
+                        "sed -i.bk 
\"s/\\(ACTIVEMQ_HOME..bin.run.jar.*\\)>.dev.null/\\1/\" bin/activemq",
+                        // Required if launching multiple AMQ's, prevent jetty 
port conflicts
+                        "sed -i.bk 
's/8161/"+getEntity().getAttribute(ActiveMQBroker.AMQ_JETTY_PORT)+"/g' 
conf/jetty.xml"
+                        // TODO disable persistence (this should be a flag -- 
but it seems to have no effect, despite ):
+                        // "sed -i.bk 's/broker /broker persistent=\"false\" 
/g' conf/activemq.xml",
+                    )
+                .execute();
+
+        // Copy the configuration file across
+        String destinationConfigFile = Os.mergePathsUnix(getRunDir(), 
"conf/activemq.xml");
+        copyTemplate(getTemplateConfigurationUrl(), destinationConfigFile);
+    }
+
+    @Override
+    public void launch() {
+        // Using nohup, as recommended at 
http://activemq.apache.org/run-broker.html
+        newScript(ImmutableMap.of(USE_PID_FILE, false), LAUNCHING)
+                .body.append("nohup ./bin/activemq start > 
./data/activemq-extra.log 2>&1 &")
+                .execute();
+    }
+    
+    @Override
+    public boolean isRunning() {
+        return newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), 
CHECK_RUNNING).execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), 
STOPPING).execute();
+    }
+
+    @Override
+    public void kill() {
+        newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), 
KILLING).execute();
+    }
+
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        return MutableMap.<String,String>builder()
+                .putAll(super.getShellEnvironment())
+                .put("ACTIVEMQ_HOME", getRunDir())
+                .put("ACTIVEMQ_PIDFILE", getPidFile())
+                .renameKey("JAVA_OPTS", "ACTIVEMQ_OPTS")
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopic.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopic.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopic.java
new file mode 100644
index 0000000..536ce09
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopic.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+
+import org.apache.brooklyn.entity.messaging.Topic;
+
+@ImplementedBy(ActiveMQTopicImpl.class)
+public interface ActiveMQTopic extends ActiveMQDestination, Topic {
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopicImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopicImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopicImpl.java
new file mode 100644
index 0000000..1724b1f
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopicImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.activemq;
+
+
+public class ActiveMQTopicImpl extends ActiveMQDestinationImpl implements 
ActiveMQTopic {
+    public ActiveMQTopicImpl() {
+    }
+
+    @Override
+    public void onManagementStarting() {
+        super.onManagementStarting();
+        setAttribute(TOPIC_NAME, getName());
+    }
+
+    @Override
+    public void create() {
+        jmxHelper.operation(brokerMBeanName, "addTopic", getName());
+        connectSensors();
+    }
+
+    public void delete() {
+        jmxHelper.operation(brokerMBeanName, "removeTopic", getName());
+        disconnectSensors();
+    }
+
+    public void connectSensors() {
+        //TODO add sensors for topics
+    }
+
+    public String getTopicName() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpExchange.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpExchange.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpExchange.java
new file mode 100644
index 0000000..f04c116
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpExchange.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.amqp;
+
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+
+/**
+ * An interface that describes an AMQP exchange.
+ */
+public interface AmqpExchange {
+
+    /* AMQP standard exchange names. */
+    
+    String DIRECT = "amq.direct";
+    String TOPIC = "amq.topic";
+
+    /** The AMQP exchange name {@link Sensor}. */
+    @SetFromFlag("exchange")
+    BasicAttributeSensorAndConfigKey<String> EXCHANGE_NAME = new 
BasicAttributeSensorAndConfigKey<String>(
+            String.class, "amqp.exchange.name", "AMQP exchange name");
+
+    /**
+     * Return the AMQP exchange name.
+     */
+    public String getExchangeName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpServer.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpServer.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpServer.java
new file mode 100644
index 0000000..97cce88
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpServer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.amqp;
+
+import org.apache.brooklyn.api.entity.Entity;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+
+/**
+ * Marker interface identifying AMQP servers.
+ */
+public interface AmqpServer extends Entity {
+    
+    /* AMQP protocol version strings. */
+
+    String AMQP_0_8 = "0-8";
+    String AMQP_0_9 = "0-9";
+    String AMQP_0_9_1 = "0-9-1";
+    String AMQP_0_10 = "0-10";
+    String AMQP_1_0 = "1-0";
+
+    PortAttributeSensorAndConfigKey AMQP_PORT = Attributes.AMQP_PORT;
+
+    BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = new 
BasicAttributeSensorAndConfigKey<String>(
+            String.class, "amqp.virtualHost", "AMQP virtual host name", 
"localhost");
+
+    BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = new 
BasicAttributeSensorAndConfigKey<String>(
+            String.class, "amqp.version", "AMQP protocol version");
+
+    String getVirtualHost();
+
+    String getAmqpVersion();
+
+    Integer getAmqpPort();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBroker.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBroker.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBroker.java
new file mode 100644
index 0000000..a83d259
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBroker.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.jms;
+
+import java.util.Collection;
+import java.util.Map;
+
+import brooklyn.entity.basic.SoftwareProcess;
+import org.apache.brooklyn.entity.messaging.MessageBroker;
+import org.apache.brooklyn.entity.messaging.Queue;
+import org.apache.brooklyn.entity.messaging.Topic;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public interface JMSBroker<Q extends JMSDestination & Queue, T extends 
JMSDestination & Topic> extends SoftwareProcess, MessageBroker {
+    
+    @VisibleForTesting
+    public Collection<String> getQueueNames();
+    
+    @VisibleForTesting
+    public Collection<String> getTopicNames();
+
+    @VisibleForTesting
+    public Map<String, Q> getQueues();
+    
+    @VisibleForTesting
+    public Map<String, T> getTopics();
+    
+    /** TODO make this an effector */
+    public void addQueue(String name);
+    
+    public void addQueue(String name, Map properties);
+
+    public Q createQueue(Map properties);
+
+    /** TODO make this an effector */
+    public void addTopic(String name);
+    
+    public void addTopic(String name, Map properties);
+
+    public T createTopic(Map properties);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBrokerImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBrokerImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBrokerImpl.java
new file mode 100644
index 0000000..6fa16a0
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBrokerImpl.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.jms;
+
+import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import org.apache.brooklyn.entity.messaging.Queue;
+import org.apache.brooklyn.entity.messaging.Topic;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public abstract class JMSBrokerImpl<Q extends JMSDestination & Queue, T 
extends JMSDestination & Topic> extends SoftwareProcessImpl implements 
JMSBroker<Q,T> {
+    private static final Logger log = LoggerFactory.getLogger(JMSBroker.class);
+    
+    Collection<String> queueNames;
+    Collection<String> topicNames;
+    Map<String, Q> queues = Maps.newLinkedHashMap();
+    Map<String, T> topics = Maps.newLinkedHashMap();
+
+    public JMSBrokerImpl() {
+    }
+
+    @Override
+    public JMSBrokerImpl configure(Map properties) {
+        if (queueNames==null) queueNames = Lists.newArrayList();
+        if (groovyTruth(properties.get("queue"))) queueNames.add((String) 
properties.remove("queue"));
+        if (groovyTruth(properties.get("queues"))) 
queueNames.addAll((Collection<String>) properties.remove("queues"));
+
+        if (topicNames==null) topicNames = Lists.newArrayList();
+        if (groovyTruth(properties.get("topic"))) topicNames.add((String) 
properties.remove("topic"));
+        if (groovyTruth(properties.get("topics"))) 
topicNames.addAll((Collection<String>) properties.remove("topics"));
+        
+        return (JMSBrokerImpl) super.configure(properties);
+    }
+
+    @Override
+    public Collection<String> getQueueNames() {
+        return queueNames;
+    }
+    
+    @Override
+    public Collection<String> getTopicNames() {
+        return topicNames;
+    }
+
+    @Override
+    public Map<String, Q> getQueues() {
+        return queues;
+    }
+    
+    @Override
+    public Map<String, T> getTopics() {
+        return topics;
+    }
+    
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+        setBrokerUrl();
+    }
+
+    // should be called after sensor-polling is activated etc
+    @Override
+    protected void postStart() {
+        super.postStart();
+        // stupid to do this here, but there appears to be a race where 
sometimes the
+        // broker throws a BrokerStopped exception, even though the sensor 
indicates it is up
+        Time.sleep(Duration.FIVE_SECONDS);
+        for (String name : queueNames) {
+            addQueue(name);
+        }
+        for (String name : topicNames) {
+            addTopic(name);
+        }
+    }
+    
+    @Override
+    public abstract void setBrokerUrl();
+
+    @Override
+    public void preStop() {
+        // If can't delete queues, continue trying to stop.
+        // (e.g. in CI have seen activemq "BrokerStoppedException" thrown in 
queue.destroy()). 
+        try {
+            for (JMSDestination queue : queues.values()) {
+                queue.destroy();
+            }
+        } catch (Exception e) {
+            log.warn("Error deleting queues from broker "+this+"; continuing 
with stop...", e);
+        }
+        
+        try {
+            for (JMSDestination topic : topics.values()) {
+                topic.destroy();
+            }
+        } catch (Exception e) {
+            log.warn("Error deleting topics from broker "+this+"; continuing 
with stop...", e);
+        }
+        
+        super.preStop();
+    }
+    
+    @Override
+    public void addQueue(String name) {
+        addQueue(name, MutableMap.of());
+    }
+    
+    public void checkStartingOrRunning() {
+        Lifecycle state = getAttribute(SERVICE_STATE_ACTUAL);
+        if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.RUNNING) return;
+        if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.STARTING) return;
+        // TODO this check may be redundant or even inappropriate
+        throw new IllegalStateException("Cannot run against "+this+" in state 
"+state);
+    }
+
+    @Override
+    public void addQueue(String name, Map properties) {
+        checkStartingOrRunning();
+        properties.put("name", name);
+        queues.put(name, createQueue(properties));
+    }
+
+    @Override
+    public abstract Q createQueue(Map properties);
+
+    @Override
+    public void addTopic(String name) {
+        addTopic(name, MutableMap.of());
+    }
+    
+    @Override
+    public void addTopic(String name, Map properties) {
+        checkStartingOrRunning();
+        properties.put("name", name);
+        topics.put(name, createTopic(properties));
+    }
+
+    @Override
+    public abstract T createTopic(Map properties);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestination.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestination.java
new file mode 100644
index 0000000..5591d66
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestination.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.jms;
+
+import org.apache.brooklyn.api.entity.Entity;
+
+public interface JMSDestination extends Entity {
+    public String getName();
+    
+    public void delete();
+
+    public void destroy();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestinationImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestinationImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestinationImpl.java
new file mode 100644
index 0000000..dbd100f
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestinationImpl.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.jms;
+
+import brooklyn.entity.basic.AbstractEntity;
+
+import com.google.common.base.Preconditions;
+
+public abstract class JMSDestinationImpl extends AbstractEntity implements 
JMSDestination {
+    public JMSDestinationImpl() {
+    }
+
+    @Override
+    public void onManagementStarting() {
+        super.onManagementStarting();
+        Preconditions.checkNotNull(getName(), "Name must be specified");
+    }
+
+    @Override
+    public String getName() {
+        return getDisplayName();
+    }
+    
+    protected abstract void connectSensors();
+
+    protected abstract void disconnectSensors();
+
+    public abstract void delete();
+
+    public void destroy() {
+        disconnectSensors();
+        delete();
+        super.destroy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
new file mode 100644
index 0000000..d101343
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import static java.lang.String.format;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+
+public abstract class AbstractfKafkaSshDriver extends 
JavaSoftwareProcessSshDriver {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaZooKeeperSshDriver.class);
+
+    public AbstractfKafkaSshDriver(EntityLocal entity, SshMachineLocation 
machine) {
+        super(entity, machine);
+    }
+
+    protected abstract Map<String, Integer> getPortMap();
+
+    protected abstract ConfigKey<String> getConfigTemplateKey();
+
+    protected abstract String getConfigFileName();
+
+    protected abstract String getLaunchScriptName();
+
+    protected abstract String getTopicsScriptName();
+
+    protected abstract String getProcessIdentifier();
+
+    @Override
+    protected String getLogFileLocation() { return Os.mergePaths(getRunDir(), 
"console.out"); }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), 
resolver.getUnpackedDirectoryName(format("kafka_%s", getVersion()))));
+    }
+
+    @Override
+    public void install() {
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+
+        List<String> commands = new LinkedList<String>();
+        commands.addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs));
+        commands.add(BashCommands.INSTALL_TAR);
+        commands.add("tar xzfv "+saveAs);
+        commands.add("cd "+getExpandedInstallDir());
+
+        newScript(INSTALLING)
+                .body.append(commands)
+                .execute();
+    }
+
+    @Override
+    public void customize() {
+        Networking.checkPortsValid(getPortMap());
+
+        newScript(CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), 
getRunDir()))
+                .execute();
+
+        String config = entity.getConfig(getConfigTemplateKey());
+        copyTemplate(config, getConfigFileName());
+    }
+
+    @Override
+    public void launch() {
+        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
+                .failOnNonZeroResultCode()
+                .body.append(String.format("nohup ./bin/%s ./%s > console.out 
2>&1 &", getLaunchScriptName(), getConfigFileName()))
+                .execute();
+    }
+
+    public String getPidFile() { return Os.mergePathsUnix(getRunDir(), 
"kafka.pid"); }
+
+    @Override
+    public boolean isRunning() {
+        return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), 
CHECK_RUNNING).execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(MutableMap.of(USE_PID_FILE, false), STOPPING)
+                .body.append(String.format("ps ax | grep %s | awk '{print $1}' 
| xargs kill", getProcessIdentifier()))
+                .body.append(String.format("ps ax | grep %s | awk '{print $1}' 
| xargs kill -9", getProcessIdentifier()))
+                .execute();
+    }
+
+    /**
+     * Use RMI agent to provide JMX.
+     */
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        return MutableMap.<String, String>builder()
+                .putAll(super.getShellEnvironment())
+                .renameKey("JAVA_OPTS", "KAFKA_JMX_OPTS")
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/Kafka.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/Kafka.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/Kafka.java
new file mode 100644
index 0000000..ed34c1e
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/Kafka.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+
+/**
+ * Shared Kafka broker and zookeeper properties.
+ */
+public interface Kafka {
+
+    ConfigKey<String> SUGGESTED_VERSION = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, 
"2.9.2-0.8.2.1");
+
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new 
BasicAttributeSensorAndConfigKey<String>(
+            Attributes.DOWNLOAD_URL, 
"http://apache.cbox.biz/kafka/0.8.2.1/kafka_${version}.tgz";);
+
+    // TODO: Upgrade to version 0.8.0, which will require refactoring of the 
sensors to reflect the changes to the JMX beans
+//    @SetFromFlag("downloadUrl")
+//    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new 
BasicAttributeSensorAndConfigKey<String>(
+//            Attributes.DOWNLOAD_URL, 
"http://mirror.catn.com/pub/apache/kafka/${version}/kafka-${version}-src.tgz";);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBroker.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBroker.java
new file mode 100644
index 0000000..6ae6b33
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJmx;
+import org.apache.brooklyn.entity.messaging.MessageBroker;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+
+import org.apache.brooklyn.location.basic.PortRanges;
+
+import brooklyn.util.time.Duration;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single 
Kafka broker instance.
+ */
+@ImplementedBy(KafkaBrokerImpl.class)
+public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, 
Kafka {
+
+    @SetFromFlag("startTimeout")
+    ConfigKey<Duration> START_TIMEOUT = 
ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.START_TIMEOUT, 
Duration.FIVE_MINUTES);
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;
+
+    @SetFromFlag("kafkaPort")
+    PortAttributeSensorAndConfigKey KAFKA_PORT = new 
PortAttributeSensorAndConfigKey("kafka.port", "Kafka port", "9092+");
+
+    /** Location of the configuration file template to be copied to the 
server.*/
+    @SetFromFlag("kafkaServerConfig")
+    ConfigKey<String> KAFKA_BROKER_CONFIG_TEMPLATE = new 
BasicConfigKey<String>(String.class,
+            "kafka.broker.configTemplate", "Kafka broker configuration 
template (in freemarker format)",
+            
"classpath://org/apache/brooklyn/entity/messaging/kafka/server.properties");
+
+    @SetFromFlag("zookeeper")
+    ConfigKey<ZooKeeperNode> ZOOKEEPER = new 
BasicConfigKey<ZooKeeperNode>(ZooKeeperNode.class, "kafka.broker.zookeeper", 
"Kafka zookeeper entity");
+
+    PortAttributeSensorAndConfigKey INTERNAL_JMX_PORT = new 
PortAttributeSensorAndConfigKey(
+            "internal.jmx.direct.port", "JMX internal port (started by Kafka 
broker, if using UsesJmx.JMX_AGENT_MODE is not null)", 
PortRanges.fromString("9999+"));
+
+    AttributeSensor<Integer> BROKER_ID = 
Sensors.newIntegerSensor("kafka.broker.id", "Kafka unique broker ID");
+
+    AttributeSensor<Long> FETCH_REQUEST_COUNT = 
Sensors.newLongSensor("kafka.broker.fetch.total", "Fetch request count");
+    AttributeSensor<Long> TOTAL_FETCH_TIME = 
Sensors.newLongSensor("kafka.broker.fetch.time.total", "Total fetch request 
processing time (millis)");
+    AttributeSensor<Double> MAX_FETCH_TIME = 
Sensors.newDoubleSensor("kafka.broker.fetch.time.max", "Max fetch request 
processing time (millis)");
+
+    AttributeSensor<Long> PRODUCE_REQUEST_COUNT = 
Sensors.newLongSensor("kafka.broker.produce.total", "Produce request count");
+    AttributeSensor<Long> TOTAL_PRODUCE_TIME = 
Sensors.newLongSensor("kafka.broker.produce.time.total", "Total produce request 
processing time (millis)");
+    AttributeSensor<Double> MAX_PRODUCE_TIME = 
Sensors.newDoubleSensor("kafka.broker.produce.time.max", "Max produce request 
processing time (millis)");
+
+    AttributeSensor<Long> BYTES_RECEIVED = 
Sensors.newLongSensor("kafka.broker.bytes.received", "Total bytes received");
+    AttributeSensor<Long> BYTES_SENT = 
Sensors.newLongSensor("kafka.broker.bytes.sent", "Total bytes sent");
+    
+    Integer getKafkaPort();
+
+    Integer getBrokerId();
+
+    ZooKeeperNode getZookeeper();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
new file mode 100644
index 0000000..357dae8
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+
+public interface KafkaBrokerDriver extends JavaSoftwareProcessDriver {
+
+    Integer getKafkaPort();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
new file mode 100644
index 0000000..b8a9076
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import org.apache.brooklyn.entity.messaging.MessageBroker;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.event.feed.jmx.JmxHelper;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Objects.ToStringHelper;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single 
Kafka broker instance.
+ */
+public class KafkaBrokerImpl extends SoftwareProcessImpl implements 
MessageBroker, KafkaBroker {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaBrokerImpl.class);
+    private static final ObjectName SOCKET_SERVER_STATS_MBEAN = 
JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats");
+
+    private volatile JmxFeed jmxFeed;
+
+    public KafkaBrokerImpl() {
+        super();
+    }
+
+    @Override
+    public void init() {
+        super.init();
+        setAttribute(BROKER_ID, Math.abs(hashCode())); // Must be positive for 
partitioning to work
+    }
+
+    @Override
+    public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); }
+
+    @Override
+    public Integer getBrokerId() { return getAttribute(BROKER_ID); }
+
+    @Override
+    public ZooKeeperNode getZookeeper() { return getConfig(ZOOKEEPER); }
+
+    @Override
+    public Class<?> getDriverInterface() {
+        return KafkaBrokerDriver.class;
+    }
+
+    @Override
+    public void waitForServiceUp(long duration, TimeUnit units) {
+        super.waitForServiceUp(duration, units);
+
+        if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
+            // Wait for the MBean to exist
+            JmxHelper helper = new JmxHelper(this);
+            try {
+                helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, 
units.toMillis(duration));
+            } finally {
+                helper.terminate();
+            }
+        }
+    }
+
+    @Override
+    protected void connectSensors() {
+        connectServiceUpIsRunning();
+        boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);
+        
+        if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
+            jmxFeed = JmxFeed.builder()
+                .entity(this)
+                .period(500, TimeUnit.MILLISECONDS)
+                .pollAttribute(new 
JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
+                        .attributeName("NumFetchRequests")
+                        .onException(Functions.constant(-1l))
+                        .enabled(retrieveUsageMetrics))
+                .pollAttribute(new 
JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
+                        .attributeName("TotalFetchRequestMs")
+                        .onException(Functions.constant(-1l))
+                        .enabled(retrieveUsageMetrics))
+                .pollAttribute(new 
JmxAttributePollConfig<Double>(MAX_FETCH_TIME)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
+                        .attributeName("MaxFetchRequestMs")
+                        .onException(Functions.constant(-1.0d))
+                        .enabled(retrieveUsageMetrics))
+                .pollAttribute(new 
JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
+                        .attributeName("NumProduceRequests")
+                        .onException(Functions.constant(-1l))
+                        .enabled(retrieveUsageMetrics))
+                .pollAttribute(new 
JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
+                        .attributeName("TotalProduceRequestMs")
+                        .onException(Functions.constant(-1l))
+                        .enabled(retrieveUsageMetrics))
+                .pollAttribute(new 
JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
+                        .attributeName("MaxProduceRequestMs")
+                        .onException(Functions.constant(-1.0d))
+                        .enabled(retrieveUsageMetrics))
+                .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
+                        .attributeName("TotalBytesRead")
+                        .onException(Functions.constant(-1l))
+                        .enabled(retrieveUsageMetrics))
+                .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT)
+                        .objectName(SOCKET_SERVER_STATS_MBEAN)
+                        .attributeName("TotalBytesWritten")
+                        .onException(Functions.constant(-1l))
+                        .enabled(retrieveUsageMetrics))
+                .build();
+        }
+
+        setBrokerUrl();
+    }
+
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        disconnectServiceUpIsRunning();
+        if (jmxFeed != null) jmxFeed.stop();
+    }
+
+    @Override
+    protected ToStringHelper toStringHelper() {
+        return super.toStringHelper()
+                .add("kafkaPort", getKafkaPort());
+    }
+
+    /** Use the {@link #getZookeeper() zookeeper} details if available, 
otherwise use our own host and port. */
+    @Override
+    public void setBrokerUrl() {
+        ZooKeeperNode zookeeper = getZookeeper();
+        if (zookeeper != null) {
+            setAttribute(BROKER_URL, String.format("zookeeper://%s:%d", 
zookeeper.getAttribute(HOSTNAME), zookeeper.getZookeeperPort()));
+        } else {
+            setAttribute(BROKER_URL, String.format("kafka://%s:%d", 
getAttribute(HOSTNAME), getKafkaPort()));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
new file mode 100644
index 0000000..df9b67d
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.java.UsesJmx.JmxAgentModes;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+
+public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements 
KafkaBrokerDriver {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBrokerSshDriver.class);
+
+    public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation 
machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    protected Map<String, Integer> getPortMap() {
+        return MutableMap.of("kafkaPort", getKafkaPort());
+    }
+
+    @Override
+    protected ConfigKey<String> getConfigTemplateKey() {
+        return KafkaBroker.KAFKA_BROKER_CONFIG_TEMPLATE;
+    }
+
+    @Override
+    protected String getConfigFileName() {
+        return "server.properties";
+    }
+
+    @Override
+    protected String getLaunchScriptName() {
+        return "kafka-server-start.sh";
+    }
+
+    @Override
+    public String getTopicsScriptName() {
+        return "kafka-topics.sh";
+    }
+
+    @Override
+    protected String getProcessIdentifier() {
+        return "kafka\\.Kafka";
+    }
+
+    @Override
+    public Integer getKafkaPort() {
+        return getEntity().getAttribute(KafkaBroker.KAFKA_PORT);
+    }
+
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        JmxAgentModes jmxAgentMode = 
getEntity().getConfig(KafkaBroker.JMX_AGENT_MODE);
+        String jmxPort;
+        if (jmxAgentMode == JmxAgentModes.NONE) {
+            // seems odd to pass RMI port here, as it gets assigned to 
com.sun.mgmt.jmx.port in kafka-run-class.sh
+            // but RMI server/registry port works, whereas JMX port does not
+            jmxPort = String.valueOf(entity.getAttribute(UsesJmx.JMX_PORT));
+        } else {
+            /*
+             * See ./bin/kafka-server-start.sh  and ./bin/kafka-run-class.sh
+             * Really hard to turn off jmxremote on kafka! And can't use 
default because
+             * uses 9999, which means could only run one kafka broker per 
server.
+             */
+            jmxPort = 
String.valueOf(entity.getAttribute(KafkaBroker.INTERNAL_JMX_PORT));
+        }
+
+        return MutableMap.<String, String> builder()
+                .putAll(super.getShellEnvironment())
+                .put("JMX_PORT", jmxPort)
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaCluster.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaCluster.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaCluster.java
new file mode 100644
index 0000000..c512400
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaCluster.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynConfigKeys;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.group.Cluster;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.trait.Resizable;
+import brooklyn.entity.trait.Startable;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.util.time.Duration;
+
+/**
+ * Provides Kafka cluster functionality through a group of {@link KafkaBroker 
brokers} controlled
+ * by a single {@link KafkaZookeeper zookeeper} entity.
+ * <p>
+ * You can customise the Kafka zookeeper and brokers by supplying {@link 
EntitySpec entity specifications}
+ * to be used when creating them. An existing {@link Zookeeper} entity may 
also be provided instead of the
+ * Kafka zookeeper.
+ * <p>
+ * The contents of this entity are:
+ * <ul>
+ * <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s
+ * <li>a {@link KafkaZookeeper} or {@link Zookeeper}
+ * <li>a {@link org.apache.brooklyn.api.policy.Policy} to resize the broker 
cluster
+ * </ul>
+ * The {@link Group group} and {@link Resizable} interface methods are 
delegated to the broker cluster, so calling
+ * {@link Resizable#resize(Integer) resize} will change the number of brokers.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+@Catalog(name="Kafka", description="Apache Kafka is a distributed 
publish-subscribe messaging system", 
iconUrl="classpath://org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg")
+@ImplementedBy(KafkaClusterImpl.class)
+public interface KafkaCluster extends Entity, Startable, Resizable, Group  {
+
+    @SetFromFlag("startTimeout")
+    ConfigKey<Duration> START_TIMEOUT = BrooklynConfigKeys.START_TIMEOUT;
+
+    @SetFromFlag("initialSize")
+    ConfigKey<Integer> INITIAL_SIZE = 
ConfigKeys.newConfigKeyWithDefault(Cluster.INITIAL_SIZE, 1);
+
+    /** Zookeeper for the cluster. If null a default be will created. */
+    @SetFromFlag("zookeeper")
+    BasicAttributeSensorAndConfigKey<ZooKeeperNode> ZOOKEEPER = new 
BasicAttributeSensorAndConfigKey<ZooKeeperNode>(
+            ZooKeeperNode.class, "kafka.cluster.zookeeper", "The zookeeper for 
the cluster; if null a default be will created");
+
+    /** Spec for creating the default Kafka zookeeper entity. */
+    @SetFromFlag("zookeeperSpec")
+    BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZooKeeper>> 
ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey(
+            EntitySpec.class, "kafka.cluster.zookeeperSpec", "Spec for 
creating the kafka zookeeper");
+
+    /** Spec for Kafka broker entities to be created. */
+    @SetFromFlag("brokerSpec")
+    BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = 
new BasicAttributeSensorAndConfigKey(
+            EntitySpec.class, "kafka.cluster.brokerSpec", "Spec for Kafka 
broker entiites to be created");
+
+    /** Underlying Kafka broker cluster. */
+    AttributeSensor<DynamicCluster> CLUSTER = new 
BasicAttributeSensor<DynamicCluster>(
+            DynamicCluster.class, "kafka.cluster.brokerCluster", "Underlying 
Kafka broker cluster");
+
+    ZooKeeperNode getZooKeeper();
+
+    DynamicCluster getCluster();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
new file mode 100644
index 0000000..933d8ce
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.trait.Startable;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
+import brooklyn.event.feed.ConfigToAttributes;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.exceptions.CompoundRuntimeException;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Implementation of a Kafka cluster containing a {@link KafkaZookeeper} node 
and a group of {@link KafkaBroker}s.
+ */
+public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
+
+    public static final Logger log = 
LoggerFactory.getLogger(KafkaClusterImpl.class);
+
+    public KafkaClusterImpl() {
+    }
+
+    @Override
+    public void init() {
+        super.init();
+        
+        setAttribute(SERVICE_UP, false);
+        ConfigToAttributes.apply(this, BROKER_SPEC);
+        ConfigToAttributes.apply(this, ZOOKEEPER);
+        ConfigToAttributes.apply(this, ZOOKEEPER_SPEC);
+
+        log.debug("creating zookeeper child for {}", this);
+        ZooKeeperNode zookeeper = getAttribute(ZOOKEEPER);
+        if (zookeeper == null) {
+            EntitySpec<KafkaZooKeeper> zookeeperSpec = 
getAttribute(ZOOKEEPER_SPEC);
+            if (zookeeperSpec == null) {
+                log.debug("creating zookeeper using default spec for {}", 
this);
+                zookeeperSpec = EntitySpec.create(KafkaZooKeeper.class);
+                setAttribute(ZOOKEEPER_SPEC, zookeeperSpec);
+            } else {
+                log.debug("creating zookeeper using custom spec for {}", this);
+            }
+            zookeeper = addChild(zookeeperSpec);
+            if (Entities.isManaged(this)) Entities.manage(zookeeper);
+            setAttribute(ZOOKEEPER, zookeeper);
+        }
+
+        log.debug("creating cluster child for {}", this);
+        EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC);
+        if (brokerSpec == null) {
+            log.debug("creating default broker spec for {}", this);
+            brokerSpec = EntitySpec.create(KafkaBroker.class);
+            setAttribute(BROKER_SPEC, brokerSpec);
+        }
+        // Relies on initialSize being inherited by DynamicCluster, because 
key id is identical
+        // We add the zookeeper configuration to the KafkaBroker specification 
here
+        DynamicCluster cluster = 
addChild(EntitySpec.create(DynamicCluster.class)
+                .configure("memberSpec", 
EntitySpec.create(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper)));
+        if (Entities.isManaged(this)) Entities.manage(cluster);
+        setAttribute(CLUSTER, cluster);
+        
+        connectSensors();
+    }
+
+    @Override
+    public ZooKeeperNode getZooKeeper() {
+        return getAttribute(ZOOKEEPER);
+    }
+
+    @Override
+    public DynamicCluster getCluster() {
+        return getAttribute(CLUSTER);
+    }
+
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        if (isLegacyConstruction()) {
+            init();
+        }
+
+        if (locations.isEmpty()) locations = getLocations();
+        Iterables.getOnlyElement(locations); // Assert just one
+        addLocations(locations);
+
+        List<Entity> childrenToStart = MutableList.<Entity>of(getCluster());
+        // Set the KafkaZookeeper entity as child of cluster, if it does not 
already have a parent
+        if (getZooKeeper().getParent() == null) {
+            addChild(getZooKeeper());
+        } // And only start zookeeper if we are parent
+        if (Objects.equal(this, getZooKeeper().getParent())) 
childrenToStart.add(getZooKeeper());
+        Entities.invokeEffector(this, childrenToStart, Startable.START, 
ImmutableMap.of("locations", locations)).getUnchecked();
+    }
+
+    @Override
+    public void stop() {
+        List<Exception> errors = Lists.newArrayList();
+        if (getZooKeeper() != null && Objects.equal(this, 
getZooKeeper().getParent())) {
+            try {
+                getZooKeeper().stop();
+            } catch (Exception e) {
+                errors.add(e);
+            }
+        }
+        if (getCurrentSize() > 0) {
+            try {
+                getCluster().stop();
+            } catch (Exception e) {
+                errors.add(e);
+            }
+        }
+
+        clearLocations();
+        setAttribute(SERVICE_UP, false);
+
+        if (errors.size() != 0) {
+            throw new CompoundRuntimeException("Error stopping Kafka cluster", 
errors);
+        }
+    }
+
+    @Override
+    public void restart() {
+        // TODO prod the entities themselves to restart, instead?
+        Collection<Location> locations = Lists.newArrayList(getLocations());
+
+        stop();
+        start(locations);
+    }
+
+    void connectSensors() {
+        addEnricher(Enrichers.builder()
+                .propagatingAllBut(SERVICE_UP)
+                .from(getCluster())
+                .build());
+        addEnricher(Enrichers.builder()
+                .propagating(SERVICE_UP)
+                .from(getZooKeeper())
+                .build());
+    }
+
+    /*
+     * All Group and Resizable interface methods are delegated to the broker 
cluster.
+     */
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<Entity> getMembers() { return getCluster().getMembers(); 
}
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasMember(Entity member) { return 
getCluster().hasMember(member); }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean addMember(Entity member) { return 
getCluster().addMember(member); }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean removeMember(Entity member) { return 
getCluster().removeMember(member); }
+
+    /** {@inheritDoc} */
+    @Override
+    public Integer getCurrentSize() { return getCluster().getCurrentSize(); }
+
+    /** {@inheritDoc} */
+    @Override
+    public Integer resize(Integer desiredSize) { return 
getCluster().resize(desiredSize); }
+
+    @Override
+    public <T extends Entity> T addMemberChild(EntitySpec<T> spec) { return 
getCluster().addMemberChild(spec); }
+
+    @Override
+    public <T extends Entity> T addMemberChild(T child) { return 
getCluster().addMemberChild(child); }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
new file mode 100644
index 0000000..9bdc33c
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.annotation.EffectorParam;
+import brooklyn.entity.basic.SoftwareProcess;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.util.time.Duration;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single 
Kafka zookeeper instance.
+ */
+@ImplementedBy(KafkaZooKeeperImpl.class)
+public interface KafkaZooKeeper extends ZooKeeperNode, Kafka {
+
+    @SetFromFlag("startTimeout")
+    ConfigKey<Duration> START_TIMEOUT = SoftwareProcess.START_TIMEOUT;
+
+    /** The Kafka version, not the Zookeeper version. */
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;
+    
+    /** The Kafka version, not the Zookeeper version. */
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = Kafka.DOWNLOAD_URL;
+
+    /** Location of the kafka configuration file template to be copied to the 
server. */
+    @SetFromFlag("kafkaZookeeperConfig")
+    ConfigKey<String> KAFKA_ZOOKEEPER_CONFIG_TEMPLATE = new 
BasicConfigKey<String>(String.class,
+            "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration 
template (in freemarker format)",
+            
"classpath://org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties");
+
+    @Effector(description = "Create a topic with a single partition and only 
one replica")
+    void createTopic(@EffectorParam(name = "topic") String topic);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
new file mode 100644
index 0000000..f08736d
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+
+public interface KafkaZooKeeperDriver extends JavaSoftwareProcessDriver {
+
+    Integer getZookeeperPort();
+
+    void createTopic(String topic);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
new file mode 100644
index 0000000..7764450
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.annotation.EffectorParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.brooklyn.entity.zookeeper.AbstractZooKeeperImpl;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single 
Kafka zookeeper instance.
+ */
+public class KafkaZooKeeperImpl extends AbstractZooKeeperImpl implements 
KafkaZooKeeper {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaZooKeeperImpl.class);
+
+    public KafkaZooKeeperImpl() {
+    }
+
+    @Override
+    public Class<?> getDriverInterface() {
+        return KafkaZooKeeperDriver.class;
+    }
+
+    @Override
+    public void createTopic(String topic) {
+        ((KafkaZooKeeperDriver)getDriver()).createTopic(topic);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
new file mode 100644
index 0000000..85ab649
--- /dev/null
+++ 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.kafka;
+
+import java.util.Map;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.Attributes;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+
+import static 
brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash;
+
+public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver 
implements KafkaZooKeeperDriver {
+
+    public KafkaZooKeeperSshDriver(KafkaZooKeeperImpl entity, 
SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    protected Map<String, Integer> getPortMap() {
+        return MutableMap.of("zookeeperPort", getZookeeperPort());
+    }
+
+    @Override
+    protected ConfigKey<String> getConfigTemplateKey() {
+        return KafkaZooKeeper.KAFKA_ZOOKEEPER_CONFIG_TEMPLATE;
+    }
+
+    @Override
+    protected String getConfigFileName() {
+        return "zookeeper.properties";
+    }
+
+    @Override
+    protected String getLaunchScriptName() {
+        return "zookeeper-server-start.sh";
+    }
+
+    @Override
+    protected String getTopicsScriptName() {
+        return "kafka-topics.sh";
+    }
+
+    @Override
+    protected String getProcessIdentifier() {
+        return "quorum\\.QuorumPeerMain";
+    }
+
+    @Override
+    public Integer getZookeeperPort() {
+        return getEntity().getAttribute(KafkaZooKeeper.ZOOKEEPER_PORT);
+    }
+
+    @Override
+    public void createTopic(String topic) {
+        String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + 
":" + getZookeeperPort();
+        newScript(CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(String.format("./bin/%s  --create --zookeeper 
\"%s\" --replication-factor 1 --partitions 1 --topic \"%s\"",
+                                           getTopicsScriptName(),
+                                           
escapeLiteralForDoubleQuotedBash(zookeeperUrl),
+                                           
escapeLiteralForDoubleQuotedBash(topic)))
+                .execute();
+    }
+}

Reply via email to