http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDriver.java new file mode 100644 index 0000000..99a7b86 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDriver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.activemq; + +import brooklyn.entity.java.JavaSoftwareProcessDriver; + +public interface ActiveMQDriver extends JavaSoftwareProcessDriver { + + String getBrokerName(); + + Integer getOpenWirePort(); +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueue.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueue.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueue.java new file mode 100644 index 0000000..e8c8a15 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueue.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.activemq; + +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; + +import org.apache.brooklyn.entity.messaging.Queue; + +@ImplementedBy(ActiveMQQueueImpl.class) +public interface ActiveMQQueue extends ActiveMQDestination, Queue { +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueueImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueueImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueueImpl.java new file mode 100644 index 0000000..63d5c1c --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQQueueImpl.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.activemq; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; + +public class ActiveMQQueueImpl extends ActiveMQDestinationImpl implements ActiveMQQueue { + public static final Logger log = LoggerFactory.getLogger(ActiveMQQueue.class); + + public ActiveMQQueueImpl() { + } + + @Override + public void onManagementStarting() { + super.onManagementStarting(); + setAttribute(QUEUE_NAME, getName()); + } + + public String getQueueName() { + return getName(); + } + + public void create() { + log.debug("{} adding queue {} to broker {}", new Object[] {this, getName(), jmxHelper.getAttribute(brokerMBeanName, "BrokerName")}); + + jmxHelper.operation(brokerMBeanName, "addQueue", getName()); + + connectSensors(); + } + + public void delete() { + jmxHelper.operation(brokerMBeanName, "removeQueue", getName()); + disconnectSensors(); + } + + @Override + protected void connectSensors() { + String queue = String.format("org.apache.activemq:type=Broker,brokerName=%s,destinationType=Queue,destinationName=%s", getBrokerName(), getName()); + + jmxFeed = JmxFeed.builder() + .entity(this) + .helper(jmxHelper) + .pollAttribute(new JmxAttributePollConfig<Integer>(QUEUE_DEPTH_MESSAGES) + .objectName(queue) + .attributeName("QueueSize")) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSpecs.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSpecs.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSpecs.java new file mode 100644 index 0000000..813cba0 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSpecs.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.activemq; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; + +public class ActiveMQSpecs { + + public static EntitySpec<ActiveMQBroker> brokerSpec() { + return EntitySpec.create(ActiveMQBroker.class); + } + + public static EntitySpec<ActiveMQBroker> brokerSpecChef() { + return EntitySpec.create(ActiveMQBroker.class); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSshDriver.java new file mode 100644 index 0000000..77602e4 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQSshDriver.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.activemq; + +import static java.lang.String.format; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import org.apache.brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.net.Networking; +import brooklyn.util.os.Os; +import brooklyn.util.ssh.BashCommands; + +import com.google.common.collect.ImmutableMap; + +public class ActiveMQSshDriver extends JavaSoftwareProcessSshDriver implements ActiveMQDriver { + + public ActiveMQSshDriver(ActiveMQBrokerImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + protected String getLogFileLocation() { + return Os.mergePathsUnix(getRunDir(), "data/activemq.log"); + } + + @Override + public String getBrokerName() { + return entity.getAttribute(ActiveMQBroker.BROKER_NAME); + } + + @Override + public Integer getOpenWirePort() { + return entity.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT); + } + + public String getMirrorUrl() { + return entity.getConfig(ActiveMQBroker.MIRROR_URL); + } + + protected String getTemplateConfigurationUrl() { + return entity.getAttribute(ActiveMQBroker.TEMPLATE_CONFIGURATION_URL); + } + + public String getPidFile() { + return "data/activemq.pid"; + } + + @Override + public void preInstall() { + resolver = Entities.newDownloader(this); + setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("apache-activemq-%s", getVersion())))); + } + + @Override + public void install() { + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + List<String> commands = new LinkedList<String>(); + commands.addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)); + commands.add(BashCommands.INSTALL_TAR); + commands.add("tar xzfv "+saveAs); + + newScript(INSTALLING) + .body.append(commands) + .execute(); + } + + @Override + public void customize() { + Networking.checkPortsValid(ImmutableMap.of("jmxPort", getJmxPort(), "openWirePort", getOpenWirePort())); + newScript(CUSTOMIZING) + .body.append( + format("cp -R %s/{bin,conf,data,lib,webapps} .", getExpandedInstallDir()), + // Required in version 5.5.1 (at least), but not in version 5.7.0 + "sed -i.bk 's/\\[-z \"$JAVA_HOME\"]/\\[ -z \"$JAVA_HOME\" ]/g' bin/activemq", + // Stop it writing to dev null on start + "sed -i.bk \"s/\\(ACTIVEMQ_HOME..bin.run.jar.*\\)>.dev.null/\\1/\" bin/activemq", + // Required if launching multiple AMQ's, prevent jetty port conflicts + "sed -i.bk 's/8161/"+getEntity().getAttribute(ActiveMQBroker.AMQ_JETTY_PORT)+"/g' conf/jetty.xml" + // TODO disable persistence (this should be a flag -- but it seems to have no effect, despite ): + // "sed -i.bk 's/broker /broker persistent=\"false\" /g' conf/activemq.xml", + ) + .execute(); + + // Copy the configuration file across + String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/activemq.xml"); + copyTemplate(getTemplateConfigurationUrl(), destinationConfigFile); + } + + @Override + public void launch() { + // Using nohup, as recommended at http://activemq.apache.org/run-broker.html + newScript(ImmutableMap.of(USE_PID_FILE, false), LAUNCHING) + .body.append("nohup ./bin/activemq start > ./data/activemq-extra.log 2>&1 &") + .execute(); + } + + @Override + public boolean isRunning() { + return newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; + } + + @Override + public void stop() { + newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); + } + + @Override + public void kill() { + newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), KILLING).execute(); + } + + @Override + public Map<String, String> getShellEnvironment() { + return MutableMap.<String,String>builder() + .putAll(super.getShellEnvironment()) + .put("ACTIVEMQ_HOME", getRunDir()) + .put("ACTIVEMQ_PIDFILE", getPidFile()) + .renameKey("JAVA_OPTS", "ACTIVEMQ_OPTS") + .build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopic.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopic.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopic.java new file mode 100644 index 0000000..536ce09 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopic.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.activemq; + +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; + +import org.apache.brooklyn.entity.messaging.Topic; + +@ImplementedBy(ActiveMQTopicImpl.class) +public interface ActiveMQTopic extends ActiveMQDestination, Topic { +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopicImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopicImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopicImpl.java new file mode 100644 index 0000000..1724b1f --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQTopicImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.activemq; + + +public class ActiveMQTopicImpl extends ActiveMQDestinationImpl implements ActiveMQTopic { + public ActiveMQTopicImpl() { + } + + @Override + public void onManagementStarting() { + super.onManagementStarting(); + setAttribute(TOPIC_NAME, getName()); + } + + @Override + public void create() { + jmxHelper.operation(brokerMBeanName, "addTopic", getName()); + connectSensors(); + } + + public void delete() { + jmxHelper.operation(brokerMBeanName, "removeTopic", getName()); + disconnectSensors(); + } + + public void connectSensors() { + //TODO add sensors for topics + } + + public String getTopicName() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpExchange.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpExchange.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpExchange.java new file mode 100644 index 0000000..f04c116 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpExchange.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.amqp; + +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; + +/** + * An interface that describes an AMQP exchange. + */ +public interface AmqpExchange { + + /* AMQP standard exchange names. */ + + String DIRECT = "amq.direct"; + String TOPIC = "amq.topic"; + + /** The AMQP exchange name {@link Sensor}. */ + @SetFromFlag("exchange") + BasicAttributeSensorAndConfigKey<String> EXCHANGE_NAME = new BasicAttributeSensorAndConfigKey<String>( + String.class, "amqp.exchange.name", "AMQP exchange name"); + + /** + * Return the AMQP exchange name. + */ + public String getExchangeName(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpServer.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpServer.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpServer.java new file mode 100644 index 0000000..97cce88 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/amqp/AmqpServer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.amqp; + +import org.apache.brooklyn.api.entity.Entity; + +import brooklyn.entity.basic.Attributes; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; + +/** + * Marker interface identifying AMQP servers. + */ +public interface AmqpServer extends Entity { + + /* AMQP protocol version strings. */ + + String AMQP_0_8 = "0-8"; + String AMQP_0_9 = "0-9"; + String AMQP_0_9_1 = "0-9-1"; + String AMQP_0_10 = "0-10"; + String AMQP_1_0 = "1-0"; + + PortAttributeSensorAndConfigKey AMQP_PORT = Attributes.AMQP_PORT; + + BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = new BasicAttributeSensorAndConfigKey<String>( + String.class, "amqp.virtualHost", "AMQP virtual host name", "localhost"); + + BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = new BasicAttributeSensorAndConfigKey<String>( + String.class, "amqp.version", "AMQP protocol version"); + + String getVirtualHost(); + + String getAmqpVersion(); + + Integer getAmqpPort(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBroker.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBroker.java new file mode 100644 index 0000000..a83d259 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBroker.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.jms; + +import java.util.Collection; +import java.util.Map; + +import brooklyn.entity.basic.SoftwareProcess; +import org.apache.brooklyn.entity.messaging.MessageBroker; +import org.apache.brooklyn.entity.messaging.Queue; +import org.apache.brooklyn.entity.messaging.Topic; + +import com.google.common.annotations.VisibleForTesting; + +public interface JMSBroker<Q extends JMSDestination & Queue, T extends JMSDestination & Topic> extends SoftwareProcess, MessageBroker { + + @VisibleForTesting + public Collection<String> getQueueNames(); + + @VisibleForTesting + public Collection<String> getTopicNames(); + + @VisibleForTesting + public Map<String, Q> getQueues(); + + @VisibleForTesting + public Map<String, T> getTopics(); + + /** TODO make this an effector */ + public void addQueue(String name); + + public void addQueue(String name, Map properties); + + public Q createQueue(Map properties); + + /** TODO make this an effector */ + public void addTopic(String name); + + public void addTopic(String name, Map properties); + + public T createTopic(Map properties); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBrokerImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBrokerImpl.java new file mode 100644 index 0000000..6fa16a0 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSBrokerImpl.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.jms; + +import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; + +import java.util.Collection; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.SoftwareProcessImpl; +import org.apache.brooklyn.entity.messaging.Queue; +import org.apache.brooklyn.entity.messaging.Topic; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public abstract class JMSBrokerImpl<Q extends JMSDestination & Queue, T extends JMSDestination & Topic> extends SoftwareProcessImpl implements JMSBroker<Q,T> { + private static final Logger log = LoggerFactory.getLogger(JMSBroker.class); + + Collection<String> queueNames; + Collection<String> topicNames; + Map<String, Q> queues = Maps.newLinkedHashMap(); + Map<String, T> topics = Maps.newLinkedHashMap(); + + public JMSBrokerImpl() { + } + + @Override + public JMSBrokerImpl configure(Map properties) { + if (queueNames==null) queueNames = Lists.newArrayList(); + if (groovyTruth(properties.get("queue"))) queueNames.add((String) properties.remove("queue")); + if (groovyTruth(properties.get("queues"))) queueNames.addAll((Collection<String>) properties.remove("queues")); + + if (topicNames==null) topicNames = Lists.newArrayList(); + if (groovyTruth(properties.get("topic"))) topicNames.add((String) properties.remove("topic")); + if (groovyTruth(properties.get("topics"))) topicNames.addAll((Collection<String>) properties.remove("topics")); + + return (JMSBrokerImpl) super.configure(properties); + } + + @Override + public Collection<String> getQueueNames() { + return queueNames; + } + + @Override + public Collection<String> getTopicNames() { + return topicNames; + } + + @Override + public Map<String, Q> getQueues() { + return queues; + } + + @Override + public Map<String, T> getTopics() { + return topics; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + setBrokerUrl(); + } + + // should be called after sensor-polling is activated etc + @Override + protected void postStart() { + super.postStart(); + // stupid to do this here, but there appears to be a race where sometimes the + // broker throws a BrokerStopped exception, even though the sensor indicates it is up + Time.sleep(Duration.FIVE_SECONDS); + for (String name : queueNames) { + addQueue(name); + } + for (String name : topicNames) { + addTopic(name); + } + } + + @Override + public abstract void setBrokerUrl(); + + @Override + public void preStop() { + // If can't delete queues, continue trying to stop. + // (e.g. in CI have seen activemq "BrokerStoppedException" thrown in queue.destroy()). + try { + for (JMSDestination queue : queues.values()) { + queue.destroy(); + } + } catch (Exception e) { + log.warn("Error deleting queues from broker "+this+"; continuing with stop...", e); + } + + try { + for (JMSDestination topic : topics.values()) { + topic.destroy(); + } + } catch (Exception e) { + log.warn("Error deleting topics from broker "+this+"; continuing with stop...", e); + } + + super.preStop(); + } + + @Override + public void addQueue(String name) { + addQueue(name, MutableMap.of()); + } + + public void checkStartingOrRunning() { + Lifecycle state = getAttribute(SERVICE_STATE_ACTUAL); + if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.RUNNING) return; + if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.STARTING) return; + // TODO this check may be redundant or even inappropriate + throw new IllegalStateException("Cannot run against "+this+" in state "+state); + } + + @Override + public void addQueue(String name, Map properties) { + checkStartingOrRunning(); + properties.put("name", name); + queues.put(name, createQueue(properties)); + } + + @Override + public abstract Q createQueue(Map properties); + + @Override + public void addTopic(String name) { + addTopic(name, MutableMap.of()); + } + + @Override + public void addTopic(String name, Map properties) { + checkStartingOrRunning(); + properties.put("name", name); + topics.put(name, createTopic(properties)); + } + + @Override + public abstract T createTopic(Map properties); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestination.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestination.java new file mode 100644 index 0000000..5591d66 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestination.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.jms; + +import org.apache.brooklyn.api.entity.Entity; + +public interface JMSDestination extends Entity { + public String getName(); + + public void delete(); + + public void destroy(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestinationImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestinationImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestinationImpl.java new file mode 100644 index 0000000..dbd100f --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/jms/JMSDestinationImpl.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.jms; + +import brooklyn.entity.basic.AbstractEntity; + +import com.google.common.base.Preconditions; + +public abstract class JMSDestinationImpl extends AbstractEntity implements JMSDestination { + public JMSDestinationImpl() { + } + + @Override + public void onManagementStarting() { + super.onManagementStarting(); + Preconditions.checkNotNull(getName(), "Name must be specified"); + } + + @Override + public String getName() { + return getDisplayName(); + } + + protected abstract void connectSensors(); + + protected abstract void disconnectSensors(); + + public abstract void delete(); + + public void destroy() { + disconnectSensors(); + delete(); + super.destroy(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java new file mode 100644 index 0000000..d101343 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import static java.lang.String.format; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import org.apache.brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.net.Networking; +import brooklyn.util.os.Os; +import brooklyn.util.ssh.BashCommands; + +public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriver { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(KafkaZooKeeperSshDriver.class); + + public AbstractfKafkaSshDriver(EntityLocal entity, SshMachineLocation machine) { + super(entity, machine); + } + + protected abstract Map<String, Integer> getPortMap(); + + protected abstract ConfigKey<String> getConfigTemplateKey(); + + protected abstract String getConfigFileName(); + + protected abstract String getLaunchScriptName(); + + protected abstract String getTopicsScriptName(); + + protected abstract String getProcessIdentifier(); + + @Override + protected String getLogFileLocation() { return Os.mergePaths(getRunDir(), "console.out"); } + + @Override + public void preInstall() { + resolver = Entities.newDownloader(this); + setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("kafka_%s", getVersion())))); + } + + @Override + public void install() { + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + List<String> commands = new LinkedList<String>(); + commands.addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)); + commands.add(BashCommands.INSTALL_TAR); + commands.add("tar xzfv "+saveAs); + commands.add("cd "+getExpandedInstallDir()); + + newScript(INSTALLING) + .body.append(commands) + .execute(); + } + + @Override + public void customize() { + Networking.checkPortsValid(getPortMap()); + + newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir())) + .execute(); + + String config = entity.getConfig(getConfigTemplateKey()); + copyTemplate(config, getConfigFileName()); + } + + @Override + public void launch() { + newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING) + .failOnNonZeroResultCode() + .body.append(String.format("nohup ./bin/%s ./%s > console.out 2>&1 &", getLaunchScriptName(), getConfigFileName())) + .execute(); + } + + public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "kafka.pid"); } + + @Override + public boolean isRunning() { + return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; + } + + @Override + public void stop() { + newScript(MutableMap.of(USE_PID_FILE, false), STOPPING) + .body.append(String.format("ps ax | grep %s | awk '{print $1}' | xargs kill", getProcessIdentifier())) + .body.append(String.format("ps ax | grep %s | awk '{print $1}' | xargs kill -9", getProcessIdentifier())) + .execute(); + } + + /** + * Use RMI agent to provide JMX. + */ + @Override + public Map<String, String> getShellEnvironment() { + return MutableMap.<String, String>builder() + .putAll(super.getShellEnvironment()) + .renameKey("JAVA_OPTS", "KAFKA_JMX_OPTS") + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/Kafka.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/Kafka.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/Kafka.java new file mode 100644 index 0000000..ed34c1e --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/Kafka.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; + +/** + * Shared Kafka broker and zookeeper properties. + */ +public interface Kafka { + + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.9.2-0.8.2.1"); + + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + Attributes.DOWNLOAD_URL, "http://apache.cbox.biz/kafka/0.8.2.1/kafka_${version}.tgz"); + + // TODO: Upgrade to version 0.8.0, which will require refactoring of the sensors to reflect the changes to the JMX beans +// @SetFromFlag("downloadUrl") +// BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( +// Attributes.DOWNLOAD_URL, "http://mirror.catn.com/pub/apache/kafka/${version}/kafka-${version}-src.tgz"); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBroker.java new file mode 100644 index 0000000..6ae6b33 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBroker.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.java.UsesJmx; +import org.apache.brooklyn.entity.messaging.MessageBroker; +import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; + +import org.apache.brooklyn.location.basic.PortRanges; + +import brooklyn.util.time.Duration; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka broker instance. + */ +@ImplementedBy(KafkaBrokerImpl.class) +public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Kafka { + + @SetFromFlag("startTimeout") + ConfigKey<Duration> START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.START_TIMEOUT, Duration.FIVE_MINUTES); + + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION; + + @SetFromFlag("kafkaPort") + PortAttributeSensorAndConfigKey KAFKA_PORT = new PortAttributeSensorAndConfigKey("kafka.port", "Kafka port", "9092+"); + + /** Location of the configuration file template to be copied to the server.*/ + @SetFromFlag("kafkaServerConfig") + ConfigKey<String> KAFKA_BROKER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class, + "kafka.broker.configTemplate", "Kafka broker configuration template (in freemarker format)", + "classpath://org/apache/brooklyn/entity/messaging/kafka/server.properties"); + + @SetFromFlag("zookeeper") + ConfigKey<ZooKeeperNode> ZOOKEEPER = new BasicConfigKey<ZooKeeperNode>(ZooKeeperNode.class, "kafka.broker.zookeeper", "Kafka zookeeper entity"); + + PortAttributeSensorAndConfigKey INTERNAL_JMX_PORT = new PortAttributeSensorAndConfigKey( + "internal.jmx.direct.port", "JMX internal port (started by Kafka broker, if using UsesJmx.JMX_AGENT_MODE is not null)", PortRanges.fromString("9999+")); + + AttributeSensor<Integer> BROKER_ID = Sensors.newIntegerSensor("kafka.broker.id", "Kafka unique broker ID"); + + AttributeSensor<Long> FETCH_REQUEST_COUNT = Sensors.newLongSensor("kafka.broker.fetch.total", "Fetch request count"); + AttributeSensor<Long> TOTAL_FETCH_TIME = Sensors.newLongSensor("kafka.broker.fetch.time.total", "Total fetch request processing time (millis)"); + AttributeSensor<Double> MAX_FETCH_TIME = Sensors.newDoubleSensor("kafka.broker.fetch.time.max", "Max fetch request processing time (millis)"); + + AttributeSensor<Long> PRODUCE_REQUEST_COUNT = Sensors.newLongSensor("kafka.broker.produce.total", "Produce request count"); + AttributeSensor<Long> TOTAL_PRODUCE_TIME = Sensors.newLongSensor("kafka.broker.produce.time.total", "Total produce request processing time (millis)"); + AttributeSensor<Double> MAX_PRODUCE_TIME = Sensors.newDoubleSensor("kafka.broker.produce.time.max", "Max produce request processing time (millis)"); + + AttributeSensor<Long> BYTES_RECEIVED = Sensors.newLongSensor("kafka.broker.bytes.received", "Total bytes received"); + AttributeSensor<Long> BYTES_SENT = Sensors.newLongSensor("kafka.broker.bytes.sent", "Total bytes sent"); + + Integer getKafkaPort(); + + Integer getBrokerId(); + + ZooKeeperNode getZookeeper(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java new file mode 100644 index 0000000..357dae8 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import brooklyn.entity.java.JavaSoftwareProcessDriver; + +public interface KafkaBrokerDriver extends JavaSoftwareProcessDriver { + + Integer getKafkaPort(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java new file mode 100644 index 0000000..b8a9076 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.management.ObjectName; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.SoftwareProcessImpl; +import org.apache.brooklyn.entity.messaging.MessageBroker; +import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode; +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.event.feed.jmx.JmxHelper; + +import com.google.common.base.Functions; +import com.google.common.base.Objects.ToStringHelper; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka broker instance. + */ +public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class); + private static final ObjectName SOCKET_SERVER_STATS_MBEAN = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats"); + + private volatile JmxFeed jmxFeed; + + public KafkaBrokerImpl() { + super(); + } + + @Override + public void init() { + super.init(); + setAttribute(BROKER_ID, Math.abs(hashCode())); // Must be positive for partitioning to work + } + + @Override + public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); } + + @Override + public Integer getBrokerId() { return getAttribute(BROKER_ID); } + + @Override + public ZooKeeperNode getZookeeper() { return getConfig(ZOOKEEPER); } + + @Override + public Class<?> getDriverInterface() { + return KafkaBrokerDriver.class; + } + + @Override + public void waitForServiceUp(long duration, TimeUnit units) { + super.waitForServiceUp(duration, units); + + if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) { + // Wait for the MBean to exist + JmxHelper helper = new JmxHelper(this); + try { + helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration)); + } finally { + helper.terminate(); + } + } + } + + @Override + protected void connectSensors() { + connectServiceUpIsRunning(); + boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS); + + if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) { + jmxFeed = JmxFeed.builder() + .entity(this) + .period(500, TimeUnit.MILLISECONDS) + .pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT) + .objectName(SOCKET_SERVER_STATS_MBEAN) + .attributeName("NumFetchRequests") + .onException(Functions.constant(-1l)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME) + .objectName(SOCKET_SERVER_STATS_MBEAN) + .attributeName("TotalFetchRequestMs") + .onException(Functions.constant(-1l)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Double>(MAX_FETCH_TIME) + .objectName(SOCKET_SERVER_STATS_MBEAN) + .attributeName("MaxFetchRequestMs") + .onException(Functions.constant(-1.0d)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT) + .objectName(SOCKET_SERVER_STATS_MBEAN) + .attributeName("NumProduceRequests") + .onException(Functions.constant(-1l)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME) + .objectName(SOCKET_SERVER_STATS_MBEAN) + .attributeName("TotalProduceRequestMs") + .onException(Functions.constant(-1l)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME) + .objectName(SOCKET_SERVER_STATS_MBEAN) + .attributeName("MaxProduceRequestMs") + .onException(Functions.constant(-1.0d)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED) + .objectName(SOCKET_SERVER_STATS_MBEAN) + .attributeName("TotalBytesRead") + .onException(Functions.constant(-1l)) + .enabled(retrieveUsageMetrics)) + .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT) + .objectName(SOCKET_SERVER_STATS_MBEAN) + .attributeName("TotalBytesWritten") + .onException(Functions.constant(-1l)) + .enabled(retrieveUsageMetrics)) + .build(); + } + + setBrokerUrl(); + } + + @Override + public void disconnectSensors() { + super.disconnectSensors(); + disconnectServiceUpIsRunning(); + if (jmxFeed != null) jmxFeed.stop(); + } + + @Override + protected ToStringHelper toStringHelper() { + return super.toStringHelper() + .add("kafkaPort", getKafkaPort()); + } + + /** Use the {@link #getZookeeper() zookeeper} details if available, otherwise use our own host and port. */ + @Override + public void setBrokerUrl() { + ZooKeeperNode zookeeper = getZookeeper(); + if (zookeeper != null) { + setAttribute(BROKER_URL, String.format("zookeeper://%s:%d", zookeeper.getAttribute(HOSTNAME), zookeeper.getZookeeperPort())); + } else { + setAttribute(BROKER_URL, String.format("kafka://%s:%d", getAttribute(HOSTNAME), getKafkaPort())); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java new file mode 100644 index 0000000..df9b67d --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.java.UsesJmx; +import brooklyn.entity.java.UsesJmx.JmxAgentModes; +import org.apache.brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; + +public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements KafkaBrokerDriver { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerSshDriver.class); + + public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + protected Map<String, Integer> getPortMap() { + return MutableMap.of("kafkaPort", getKafkaPort()); + } + + @Override + protected ConfigKey<String> getConfigTemplateKey() { + return KafkaBroker.KAFKA_BROKER_CONFIG_TEMPLATE; + } + + @Override + protected String getConfigFileName() { + return "server.properties"; + } + + @Override + protected String getLaunchScriptName() { + return "kafka-server-start.sh"; + } + + @Override + public String getTopicsScriptName() { + return "kafka-topics.sh"; + } + + @Override + protected String getProcessIdentifier() { + return "kafka\\.Kafka"; + } + + @Override + public Integer getKafkaPort() { + return getEntity().getAttribute(KafkaBroker.KAFKA_PORT); + } + + @Override + public Map<String, String> getShellEnvironment() { + JmxAgentModes jmxAgentMode = getEntity().getConfig(KafkaBroker.JMX_AGENT_MODE); + String jmxPort; + if (jmxAgentMode == JmxAgentModes.NONE) { + // seems odd to pass RMI port here, as it gets assigned to com.sun.mgmt.jmx.port in kafka-run-class.sh + // but RMI server/registry port works, whereas JMX port does not + jmxPort = String.valueOf(entity.getAttribute(UsesJmx.JMX_PORT)); + } else { + /* + * See ./bin/kafka-server-start.sh and ./bin/kafka-run-class.sh + * Really hard to turn off jmxremote on kafka! And can't use default because + * uses 9999, which means could only run one kafka broker per server. + */ + jmxPort = String.valueOf(entity.getAttribute(KafkaBroker.INTERNAL_JMX_PORT)); + } + + return MutableMap.<String, String> builder() + .putAll(super.getShellEnvironment()) + .put("JMX_PORT", jmxPort) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaCluster.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaCluster.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaCluster.java new file mode 100644 index 0000000..c512400 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaCluster.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.BrooklynConfigKeys; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.group.Cluster; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.trait.Resizable; +import brooklyn.entity.trait.Startable; +import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.util.time.Duration; + +/** + * Provides Kafka cluster functionality through a group of {@link KafkaBroker brokers} controlled + * by a single {@link KafkaZookeeper zookeeper} entity. + * <p> + * You can customise the Kafka zookeeper and brokers by supplying {@link EntitySpec entity specifications} + * to be used when creating them. An existing {@link Zookeeper} entity may also be provided instead of the + * Kafka zookeeper. + * <p> + * The contents of this entity are: + * <ul> + * <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s + * <li>a {@link KafkaZookeeper} or {@link Zookeeper} + * <li>a {@link org.apache.brooklyn.api.policy.Policy} to resize the broker cluster + * </ul> + * The {@link Group group} and {@link Resizable} interface methods are delegated to the broker cluster, so calling + * {@link Resizable#resize(Integer) resize} will change the number of brokers. + */ +@SuppressWarnings({ "unchecked", "rawtypes" }) +@Catalog(name="Kafka", description="Apache Kafka is a distributed publish-subscribe messaging system", iconUrl="classpath://org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg") +@ImplementedBy(KafkaClusterImpl.class) +public interface KafkaCluster extends Entity, Startable, Resizable, Group { + + @SetFromFlag("startTimeout") + ConfigKey<Duration> START_TIMEOUT = BrooklynConfigKeys.START_TIMEOUT; + + @SetFromFlag("initialSize") + ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(Cluster.INITIAL_SIZE, 1); + + /** Zookeeper for the cluster. If null a default be will created. */ + @SetFromFlag("zookeeper") + BasicAttributeSensorAndConfigKey<ZooKeeperNode> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<ZooKeeperNode>( + ZooKeeperNode.class, "kafka.cluster.zookeeper", "The zookeeper for the cluster; if null a default be will created"); + + /** Spec for creating the default Kafka zookeeper entity. */ + @SetFromFlag("zookeeperSpec") + BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZooKeeper>> ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey( + EntitySpec.class, "kafka.cluster.zookeeperSpec", "Spec for creating the kafka zookeeper"); + + /** Spec for Kafka broker entities to be created. */ + @SetFromFlag("brokerSpec") + BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = new BasicAttributeSensorAndConfigKey( + EntitySpec.class, "kafka.cluster.brokerSpec", "Spec for Kafka broker entiites to be created"); + + /** Underlying Kafka broker cluster. */ + AttributeSensor<DynamicCluster> CLUSTER = new BasicAttributeSensor<DynamicCluster>( + DynamicCluster.class, "kafka.cluster.brokerCluster", "Underlying Kafka broker cluster"); + + ZooKeeperNode getZooKeeper(); + + DynamicCluster getCluster(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java new file mode 100644 index 0000000..933d8ce --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import java.util.Collection; +import java.util.List; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.location.Location; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.enricher.Enrichers; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.trait.Startable; +import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode; +import brooklyn.event.feed.ConfigToAttributes; +import brooklyn.util.collections.MutableList; +import brooklyn.util.exceptions.CompoundRuntimeException; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +/** + * Implementation of a Kafka cluster containing a {@link KafkaZookeeper} node and a group of {@link KafkaBroker}s. + */ +public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster { + + public static final Logger log = LoggerFactory.getLogger(KafkaClusterImpl.class); + + public KafkaClusterImpl() { + } + + @Override + public void init() { + super.init(); + + setAttribute(SERVICE_UP, false); + ConfigToAttributes.apply(this, BROKER_SPEC); + ConfigToAttributes.apply(this, ZOOKEEPER); + ConfigToAttributes.apply(this, ZOOKEEPER_SPEC); + + log.debug("creating zookeeper child for {}", this); + ZooKeeperNode zookeeper = getAttribute(ZOOKEEPER); + if (zookeeper == null) { + EntitySpec<KafkaZooKeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC); + if (zookeeperSpec == null) { + log.debug("creating zookeeper using default spec for {}", this); + zookeeperSpec = EntitySpec.create(KafkaZooKeeper.class); + setAttribute(ZOOKEEPER_SPEC, zookeeperSpec); + } else { + log.debug("creating zookeeper using custom spec for {}", this); + } + zookeeper = addChild(zookeeperSpec); + if (Entities.isManaged(this)) Entities.manage(zookeeper); + setAttribute(ZOOKEEPER, zookeeper); + } + + log.debug("creating cluster child for {}", this); + EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC); + if (brokerSpec == null) { + log.debug("creating default broker spec for {}", this); + brokerSpec = EntitySpec.create(KafkaBroker.class); + setAttribute(BROKER_SPEC, brokerSpec); + } + // Relies on initialSize being inherited by DynamicCluster, because key id is identical + // We add the zookeeper configuration to the KafkaBroker specification here + DynamicCluster cluster = addChild(EntitySpec.create(DynamicCluster.class) + .configure("memberSpec", EntitySpec.create(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper))); + if (Entities.isManaged(this)) Entities.manage(cluster); + setAttribute(CLUSTER, cluster); + + connectSensors(); + } + + @Override + public ZooKeeperNode getZooKeeper() { + return getAttribute(ZOOKEEPER); + } + + @Override + public DynamicCluster getCluster() { + return getAttribute(CLUSTER); + } + + @Override + public void start(Collection<? extends Location> locations) { + if (isLegacyConstruction()) { + init(); + } + + if (locations.isEmpty()) locations = getLocations(); + Iterables.getOnlyElement(locations); // Assert just one + addLocations(locations); + + List<Entity> childrenToStart = MutableList.<Entity>of(getCluster()); + // Set the KafkaZookeeper entity as child of cluster, if it does not already have a parent + if (getZooKeeper().getParent() == null) { + addChild(getZooKeeper()); + } // And only start zookeeper if we are parent + if (Objects.equal(this, getZooKeeper().getParent())) childrenToStart.add(getZooKeeper()); + Entities.invokeEffector(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).getUnchecked(); + } + + @Override + public void stop() { + List<Exception> errors = Lists.newArrayList(); + if (getZooKeeper() != null && Objects.equal(this, getZooKeeper().getParent())) { + try { + getZooKeeper().stop(); + } catch (Exception e) { + errors.add(e); + } + } + if (getCurrentSize() > 0) { + try { + getCluster().stop(); + } catch (Exception e) { + errors.add(e); + } + } + + clearLocations(); + setAttribute(SERVICE_UP, false); + + if (errors.size() != 0) { + throw new CompoundRuntimeException("Error stopping Kafka cluster", errors); + } + } + + @Override + public void restart() { + // TODO prod the entities themselves to restart, instead? + Collection<Location> locations = Lists.newArrayList(getLocations()); + + stop(); + start(locations); + } + + void connectSensors() { + addEnricher(Enrichers.builder() + .propagatingAllBut(SERVICE_UP) + .from(getCluster()) + .build()); + addEnricher(Enrichers.builder() + .propagating(SERVICE_UP) + .from(getZooKeeper()) + .build()); + } + + /* + * All Group and Resizable interface methods are delegated to the broker cluster. + */ + + /** {@inheritDoc} */ + @Override + public Collection<Entity> getMembers() { return getCluster().getMembers(); } + + /** {@inheritDoc} */ + @Override + public boolean hasMember(Entity member) { return getCluster().hasMember(member); } + + /** {@inheritDoc} */ + @Override + public boolean addMember(Entity member) { return getCluster().addMember(member); } + + /** {@inheritDoc} */ + @Override + public boolean removeMember(Entity member) { return getCluster().removeMember(member); } + + /** {@inheritDoc} */ + @Override + public Integer getCurrentSize() { return getCluster().getCurrentSize(); } + + /** {@inheritDoc} */ + @Override + public Integer resize(Integer desiredSize) { return getCluster().resize(desiredSize); } + + @Override + public <T extends Entity> T addMemberChild(EntitySpec<T> spec) { return getCluster().addMemberChild(spec); } + + @Override + public <T extends Entity> T addMemberChild(T child) { return getCluster().addMemberChild(child); } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java new file mode 100644 index 0000000..9bdc33c --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.annotation.Effector; +import brooklyn.entity.annotation.EffectorParam; +import brooklyn.entity.basic.SoftwareProcess; +import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.util.time.Duration; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka zookeeper instance. + */ +@ImplementedBy(KafkaZooKeeperImpl.class) +public interface KafkaZooKeeper extends ZooKeeperNode, Kafka { + + @SetFromFlag("startTimeout") + ConfigKey<Duration> START_TIMEOUT = SoftwareProcess.START_TIMEOUT; + + /** The Kafka version, not the Zookeeper version. */ + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION; + + /** The Kafka version, not the Zookeeper version. */ + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = Kafka.DOWNLOAD_URL; + + /** Location of the kafka configuration file template to be copied to the server. */ + @SetFromFlag("kafkaZookeeperConfig") + ConfigKey<String> KAFKA_ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class, + "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)", + "classpath://org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties"); + + @Effector(description = "Create a topic with a single partition and only one replica") + void createTopic(@EffectorParam(name = "topic") String topic); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java new file mode 100644 index 0000000..f08736d --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import brooklyn.entity.java.JavaSoftwareProcessDriver; + +public interface KafkaZooKeeperDriver extends JavaSoftwareProcessDriver { + + Integer getZookeeperPort(); + + void createTopic(String topic); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java new file mode 100644 index 0000000..7764450 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import brooklyn.entity.annotation.EffectorParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.brooklyn.entity.zookeeper.AbstractZooKeeperImpl; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka zookeeper instance. + */ +public class KafkaZooKeeperImpl extends AbstractZooKeeperImpl implements KafkaZooKeeper { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(KafkaZooKeeperImpl.class); + + public KafkaZooKeeperImpl() { + } + + @Override + public Class<?> getDriverInterface() { + return KafkaZooKeeperDriver.class; + } + + @Override + public void createTopic(String topic) { + ((KafkaZooKeeperDriver)getDriver()).createTopic(topic); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java new file mode 100644 index 0000000..85ab649 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.kafka; + +import java.util.Map; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.Attributes; +import org.apache.brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; + +import static brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash; + +public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements KafkaZooKeeperDriver { + + public KafkaZooKeeperSshDriver(KafkaZooKeeperImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + protected Map<String, Integer> getPortMap() { + return MutableMap.of("zookeeperPort", getZookeeperPort()); + } + + @Override + protected ConfigKey<String> getConfigTemplateKey() { + return KafkaZooKeeper.KAFKA_ZOOKEEPER_CONFIG_TEMPLATE; + } + + @Override + protected String getConfigFileName() { + return "zookeeper.properties"; + } + + @Override + protected String getLaunchScriptName() { + return "zookeeper-server-start.sh"; + } + + @Override + protected String getTopicsScriptName() { + return "kafka-topics.sh"; + } + + @Override + protected String getProcessIdentifier() { + return "quorum\\.QuorumPeerMain"; + } + + @Override + public Integer getZookeeperPort() { + return getEntity().getAttribute(KafkaZooKeeper.ZOOKEEPER_PORT); + } + + @Override + public void createTopic(String topic) { + String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort(); + newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append(String.format("./bin/%s --create --zookeeper \"%s\" --replication-factor 1 --partitions 1 --topic \"%s\"", + getTopicsScriptName(), + escapeLiteralForDoubleQuotedBash(zookeeperUrl), + escapeLiteralForDoubleQuotedBash(topic))) + .execute(); + } +}
