http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBroker.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBroker.java new file mode 100644 index 0000000..3e4a92a --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBroker.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import java.util.Map; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.java.UsesJmx; +import org.apache.brooklyn.entity.messaging.MessageBroker; +import org.apache.brooklyn.entity.messaging.amqp.AmqpServer; +import org.apache.brooklyn.entity.messaging.jms.JMSBroker; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Qpid broker instance, using AMQP 0-10. + */ +@Catalog(name="Qpid Broker", description="Apache Qpid is an open-source messaging system, implementing the Advanced Message Queuing Protocol (AMQP)", iconUrl="classpath:///qpid-logo.jpeg") +@ImplementedBy(QpidBrokerImpl.class) +public interface QpidBroker extends SoftwareProcess, MessageBroker, UsesJmx, AmqpServer, JMSBroker<QpidQueue, QpidTopic> { + + /* Qpid runtime file locations for convenience. */ + + public static final String CONFIG_XML = "etc/config.xml"; + public static final String VIRTUALHOSTS_XML = "etc/virtualhosts.xml"; + public static final String PASSWD = "etc/passwd"; + + @SetFromFlag("version") + public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.20"); + + @SetFromFlag("downloadUrl") + public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + Attributes.DOWNLOAD_URL, "http://download.nextag.com/apache/qpid/${version}/qpid-java-broker-${version}.tar.gz"); + + @SetFromFlag("amqpPort") + public static final PortAttributeSensorAndConfigKey AMQP_PORT = AmqpServer.AMQP_PORT; + + @SetFromFlag("virtualHost") + public static final BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME; + + @SetFromFlag("amqpVersion") + public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = new BasicAttributeSensorAndConfigKey<String>( + AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_10); + + @SetFromFlag("httpManagementPort") + public static final PortAttributeSensorAndConfigKey HTTP_MANAGEMENT_PORT = new PortAttributeSensorAndConfigKey("qpid.http-management.port", "Qpid HTTP management plugin port"); + + @SetFromFlag("jmxUser") + public static final BasicAttributeSensorAndConfigKey<String> JMX_USER = new BasicAttributeSensorAndConfigKey<String>( + UsesJmx.JMX_USER, "admin"); + + @SetFromFlag("jmxPassword") + public static final BasicAttributeSensorAndConfigKey<String> JMX_PASSWORD = new BasicAttributeSensorAndConfigKey<String>( + UsesJmx.JMX_PASSWORD, "admin"); +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java new file mode 100644 index 0000000..66ff73d --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import static java.lang.String.format; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.java.JmxSupport; +import org.apache.brooklyn.entity.messaging.jms.JMSBrokerImpl; +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.event.feed.jmx.JmxHelper; +import brooklyn.util.exceptions.Exceptions; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Objects.ToStringHelper; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Qpid broker instance, using AMQP 0-10. + */ +public class QpidBrokerImpl extends JMSBrokerImpl<QpidQueue, QpidTopic> implements QpidBroker { + private static final Logger log = LoggerFactory.getLogger(QpidBrokerImpl.class); + + private volatile JmxFeed jmxFeed; + + public QpidBrokerImpl() { + super(); + } + + public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME); } + public String getAmqpVersion() { return getAttribute(AMQP_VERSION); } + public Integer getAmqpPort() { return getAttribute(AMQP_PORT); } + + public void setBrokerUrl() { + String urlFormat = "amqp://guest:guest@/%s?brokerlist='tcp://%s:%d'"; + setAttribute(BROKER_URL, format(urlFormat, getAttribute(VIRTUAL_HOST_NAME), getAttribute(HOSTNAME), getAttribute(AMQP_PORT))); + } + + @Override + public void init() { + super.init(); + new JmxSupport(this, null).recommendJmxRmiCustomAgent(); + } + + public void waitForServiceUp(long duration, TimeUnit units) { + super.waitForServiceUp(duration, units); + + // Also wait for the MBean to exist (as used when creating queue/topic) + JmxHelper helper = new JmxHelper(this); + try { + String virtualHost = getConfig(QpidBroker.VIRTUAL_HOST_NAME); + ObjectName virtualHostManager = new ObjectName(format("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"%s\"", virtualHost)); + helper.connect(); + helper.assertMBeanExistsEventually(virtualHostManager, units.toMillis(duration)); + } catch (MalformedObjectNameException e) { + throw Exceptions.propagate(e); + } catch (IOException e) { + throw Exceptions.propagate(e); + } finally { + if (helper != null) helper.terminate(); + } + } + + public QpidQueue createQueue(Map properties) { + QpidQueue result = addChild(EntitySpec.create(QpidQueue.class).configure(properties)); + Entities.manage(result); + result.create(); + return result; + } + + public QpidTopic createTopic(Map properties) { + QpidTopic result = addChild(EntitySpec.create(QpidTopic.class).configure(properties)); + Entities.manage(result); + result.create(); + return result; + } + + @Override + public Class getDriverInterface() { + return QpidDriver.class; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + String serverInfoMBeanName = "org.apache.qpid:type=ServerInformation,name=ServerInformation"; + + jmxFeed = JmxFeed.builder() + .entity(this) + .period(500, TimeUnit.MILLISECONDS) + .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP) + .objectName(serverInfoMBeanName) + .attributeName("ProductVersion") + .onSuccess(new Function<Object,Boolean>() { + private boolean hasWarnedOfVersionMismatch; + @Override public Boolean apply(Object input) { + if (input == null) return false; + if (!hasWarnedOfVersionMismatch && !getConfig(QpidBroker.SUGGESTED_VERSION).equals(input)) { + log.warn("Qpid version mismatch: ProductVersion is {}, requested version is {}", input, getConfig(QpidBroker.SUGGESTED_VERSION)); + hasWarnedOfVersionMismatch = true; + } + return true; + }}) + .onException(Functions.constant(false)) + .suppressDuplicates(true)) + .build(); + } + + @Override + public void disconnectSensors() { + super.disconnectSensors(); + if (jmxFeed != null) jmxFeed.stop(); + } + + @Override + protected ToStringHelper toStringHelper() { + return super.toStringHelper().add("amqpPort", getAmqpPort()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestination.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestination.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestination.java new file mode 100644 index 0000000..a73d079 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestination.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange; +import org.apache.brooklyn.entity.messaging.jms.JMSDestination; + +public interface QpidDestination extends JMSDestination, AmqpExchange { + + public void create(); + + /** + * Return the AMQP name for the queue. + */ + public String getQueueName(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java new file mode 100644 index 0000000..718ce88 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import static java.lang.String.format; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.core.util.flags.SetFromFlag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.java.UsesJmx; +import org.apache.brooklyn.entity.messaging.amqp.AmqpServer; +import org.apache.brooklyn.entity.messaging.jms.JMSDestinationImpl; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.event.feed.jmx.JmxHelper; +import brooklyn.util.exceptions.Exceptions; + +public abstract class QpidDestinationImpl extends JMSDestinationImpl implements QpidDestination { + public static final Logger log = LoggerFactory.getLogger(QpidDestination.class); + + @SetFromFlag + String virtualHost; + + protected ObjectName virtualHostManager; + protected ObjectName exchange; + protected transient JmxHelper jmxHelper; + protected volatile JmxFeed jmxFeed; + + public QpidDestinationImpl() { + } + + @Override + public QpidBroker getParent() { + return (QpidBroker) super.getParent(); + } + + @Override + public void onManagementStarting() { + super.onManagementStarting(); + + // TODO Would be nice to share the JmxHelper for all destinations, so just one connection. + // But tricky for if brooklyn were distributed + try { + if (virtualHost == null) virtualHost = getConfig(QpidBroker.VIRTUAL_HOST_NAME); + setAttribute(QpidBroker.VIRTUAL_HOST_NAME, virtualHost); + virtualHostManager = new ObjectName(format("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"%s\"", virtualHost)); + jmxHelper = new JmxHelper((EntityLocal)getParent()); + } catch (MalformedObjectNameException e) { + throw Exceptions.propagate(e); + } + } + + @Override + protected void disconnectSensors() { + if (jmxFeed != null) jmxFeed.stop(); + } + + @Override + public void create() { + jmxHelper.operation(virtualHostManager, "createNewQueue", getName(), getParent().getAttribute(UsesJmx.JMX_USER), true); + jmxHelper.operation(exchange, "createNewBinding", getName(), getName()); + connectSensors(); + } + + @Override + public void delete() { + jmxHelper.operation(exchange, "removeBinding", getName(), getName()); + jmxHelper.operation(virtualHostManager, "deleteQueue", getName()); + disconnectSensors(); + } + + @Override + public String getQueueName() { + + if (AmqpServer.AMQP_0_10.equals(getParent().getAmqpVersion())) { + return String.format("'%s'/'%s'; { assert: never }", getExchangeName(), getName()); + } else { + return getName(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDriver.java new file mode 100644 index 0000000..46aeb09 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidDriver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import brooklyn.entity.java.JavaSoftwareProcessDriver; + +public interface QpidDriver extends JavaSoftwareProcessDriver { + + Integer getAmqpPort(); + + String getAmqpVersion(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueue.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueue.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueue.java new file mode 100644 index 0000000..b2710eb --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueue.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; + +import org.apache.brooklyn.entity.messaging.Queue; + +@ImplementedBy(QpidQueueImpl.class) +public interface QpidQueue extends QpidDestination, Queue { + @Override + public String getExchangeName(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueueImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueueImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueueImpl.java new file mode 100644 index 0000000..cec93a9 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidQueueImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import static java.lang.String.format; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange; +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.util.exceptions.Exceptions; + +public class QpidQueueImpl extends QpidDestinationImpl implements QpidQueue { + public QpidQueueImpl() { + } + + @Override + public void onManagementStarting() { + super.onManagementStarting(); + setAttribute(QUEUE_NAME, getName()); + try { + exchange = new ObjectName(format("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"%s\",name=\"%s\",ExchangeType=direct", virtualHost, getExchangeName())); + } catch (MalformedObjectNameException e) { + throw Exceptions.propagate(e); + } + } + + @Override + protected void connectSensors() { + String queue = format("org.apache.qpid:type=VirtualHost.Queue,VirtualHost=\"%s\",name=\"%s\"", virtualHost, getName()); + + jmxFeed = JmxFeed.builder() + .entity(this) + .helper(jmxHelper) + .pollAttribute(new JmxAttributePollConfig<Integer>(QUEUE_DEPTH_BYTES) + .objectName(queue) + .attributeName("QueueDepth")) + .pollAttribute(new JmxAttributePollConfig<Integer>(QUEUE_DEPTH_MESSAGES) + .objectName(queue) + .attributeName("MessageCount")) + .build(); + } + + @Override + public String getExchangeName() { + return AmqpExchange.DIRECT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidSshDriver.java new file mode 100644 index 0000000..3c6daf5 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidSshDriver.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import static java.lang.String.format; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import org.apache.brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.net.Networking; +import brooklyn.util.os.Os; +import brooklyn.util.ssh.BashCommands; + +import com.google.common.collect.ImmutableMap; + +public class QpidSshDriver extends JavaSoftwareProcessSshDriver implements QpidDriver{ + + private static final Logger log = LoggerFactory.getLogger(QpidSshDriver.class); + + public QpidSshDriver(QpidBrokerImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + protected String getLogFileLocation() { return Os.mergePaths(getRunDir(), "log", "qpid.log"); } + + @Override + public Integer getAmqpPort() { return entity.getAttribute(QpidBroker.AMQP_PORT); } + + @Override + public String getAmqpVersion() { return entity.getAttribute(QpidBroker.AMQP_VERSION); } + + public Integer getHttpManagementPort() { return entity.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT); } + + @Override + public void preInstall() { + resolver = Entities.newDownloader(this); + setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("qpid-broker-%s", getVersion())))); + } + + @Override + public void install() { + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + List<String> commands = new LinkedList<String>(); + commands.addAll( BashCommands.commandsToDownloadUrlsAs(urls, saveAs)); + commands.add(BashCommands.INSTALL_TAR); + commands.add("tar xzfv "+saveAs); + + newScript(INSTALLING) + .body.append(commands) + .execute(); + } + + @Override + public void customize() { + Networking.checkPortsValid(MutableMap.of("jmxPort", getJmxPort(), "amqpPort", getAmqpPort())); + newScript(CUSTOMIZING) + .body.append( + format("cp -R %s/{bin,etc,lib} .", getExpandedInstallDir()), + "mkdir lib/opt" + ) + .execute(); + } + + @Override + public void launch() { + newScript(ImmutableMap.of(USE_PID_FILE, false), LAUNCHING) + .body.append("nohup ./bin/qpid-server -b '*' > qpid-server-launch.log 2>&1 &") + .execute(); + } + + public String getPidFile() { return "qpid-server.pid"; } + + @Override + public boolean isRunning() { + return newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; + } + + @Override + public void stop() { + newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); + } + + @Override + public void kill() { + newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), KILLING).execute(); + } + + @Override + public Map<String, Object> getCustomJavaSystemProperties() { + return MutableMap.<String, Object>builder() + .putAll(super.getCustomJavaSystemProperties()) + .put("connector.port", getAmqpPort()) + .put("management.enabled", "true") + .put("management.jmxport.registryServer", getRmiRegistryPort()) + .put("management.jmxport.connectorServer", getJmxPort()) + .put("management.http.enabled", Boolean.toString(getHttpManagementPort() != null)) + .putIfNotNull("management.http.port", getHttpManagementPort()) + .build(); + } + + @Override + public Map<String, String> getShellEnvironment() { + return MutableMap.<String, String>builder() + .putAll(super.getShellEnvironment()) + .put("QPID_HOME", getRunDir()) + .put("QPID_WORK", getRunDir()) + .renameKey("JAVA_OPTS", "QPID_OPTS") + .build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopic.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopic.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopic.java new file mode 100644 index 0000000..bee9519 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopic.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; + +import org.apache.brooklyn.entity.messaging.Topic; + +@ImplementedBy(QpidTopicImpl.class) +public interface QpidTopic extends QpidDestination, Topic { +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopicImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopicImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopicImpl.java new file mode 100644 index 0000000..8de02d9 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/qpid/QpidTopicImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.qpid; + +import static java.lang.String.format; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange; +import brooklyn.util.exceptions.Exceptions; + +public class QpidTopicImpl extends QpidDestinationImpl implements QpidTopic { + + public QpidTopicImpl() { + } + + @Override + public void onManagementStarting() { + super.onManagementStarting(); + setAttribute(TOPIC_NAME, getName()); + try { + String virtualHost = getParent().getVirtualHost(); + exchange = new ObjectName(format("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"%s\",name=\"%s\",ExchangeType=topic", virtualHost, getExchangeName())); + } catch (MalformedObjectNameException e) { + throw Exceptions.propagate(e); + } + } + + // TODO sensors + @Override + public void connectSensors() { + } + + @Override + public String getExchangeName() { return AmqpExchange.TOPIC; } + + @Override + public String getTopicName() { return getQueueName(); } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBroker.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBroker.java new file mode 100644 index 0000000..7e0f54e --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBroker.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.rabbit; + +import java.util.Map; + +import com.google.common.annotations.Beta; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import org.apache.brooklyn.entity.messaging.MessageBroker; +import org.apache.brooklyn.entity.messaging.amqp.AmqpServer; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Rabbit MQ broker instance, using AMQP 0-9-1. + */ +@Catalog(name="RabbitMQ Broker", description="RabbitMQ is an open source message broker software (i.e. message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP) standard", iconUrl="classpath:///RabbitMQLogo.png") +@ImplementedBy(RabbitBrokerImpl.class) +public interface RabbitBroker extends SoftwareProcess, MessageBroker, AmqpServer { + + @SetFromFlag("version") + public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.8.7"); + + @SetFromFlag("downloadUrl") + public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/rabbitmq-server-generic-unix-${version}.tar.gz"); + + @SetFromFlag("erlangVersion") + public static final BasicConfigKey<String> ERLANG_VERSION = new BasicConfigKey<String>(String.class, "erlang.version", "Erlang runtime version", "R15B"); + + @SetFromFlag("rabbitmqConfigTemplateUrl") + ConfigKey<String> CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( + "rabbitmq.templateUrl", "Template file (in freemarker format) for the rabbitmq.config config file", + "classpath://org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config"); + + @SetFromFlag("amqpPort") + public static final PortAttributeSensorAndConfigKey AMQP_PORT = AmqpServer.AMQP_PORT; + + @SetFromFlag("virtualHost") + public static final BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME; + + @SetFromFlag("amqpVersion") + public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = new BasicAttributeSensorAndConfigKey<String>( + AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_9_1); + + @SetFromFlag("managmentPort") + public static final PortAttributeSensorAndConfigKey MANAGEMENT_PORT = new PortAttributeSensorAndConfigKey( + "rabbitmq.management.port", "Port on which management interface will be available", "15672+"); + + public static AttributeSensor<String> MANAGEMENT_URL = Sensors.newStringSensor( + "rabbitmq.management.url", "Management URL is only available if management plugin flag is true"); + + @SetFromFlag("enableManagementPlugin") + public static final ConfigKey<Boolean> ENABLE_MANAGEMENT_PLUGIN = ConfigKeys.newBooleanConfigKey( + "rabbitmq.management.plugin", "Management plugin will be enabled", false); + + RabbitQueue createQueue(Map properties); + + // TODO required by RabbitDestination due to close-coupling between that and RabbitBroker; how best to improve? + @Beta + Map<String, String> getShellEnvironment(); + + @Beta + String getRunDir(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java new file mode 100644 index 0000000..82347bf --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.rabbit; + +import static java.lang.String.format; + +import java.util.Map; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects.ToStringHelper; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.SoftwareProcessImpl; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Rabbit MQ broker instance, using AMQP 0-9-1. + */ +public class RabbitBrokerImpl extends SoftwareProcessImpl implements RabbitBroker { + private static final Logger log = LoggerFactory.getLogger(RabbitBrokerImpl.class); + + public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME); } + public String getAmqpVersion() { return getAttribute(AMQP_VERSION); } + public Integer getAmqpPort() { return getAttribute(AMQP_PORT); } + + public RabbitBrokerImpl() { + super(); + } + + @Override + public RabbitDriver getDriver() { + return (RabbitDriver) super.getDriver(); + } + + @Override + public Map<String, String> getShellEnvironment() { + return getDriver().getShellEnvironment(); + } + + @Override + public String getRunDir() { + return getDriver().getRunDir(); + } + + @Override + protected void postStart() { + super.postStart(); + + getDriver().configure(); + + // TODO implement this using AMQP connection, no external mechanism available + // queueNames.each { String name -> addQueue(name) } + } + + public void setBrokerUrl() { + String urlFormat = "amqp://guest:guest@%s:%d/%s"; + setAttribute(BROKER_URL, format(urlFormat, getAttribute(HOSTNAME), getAttribute(AMQP_PORT), getAttribute(VIRTUAL_HOST_NAME))); + } + + public RabbitQueue createQueue(Map properties) { + RabbitQueue result = addChild(EntitySpec.create(RabbitQueue.class).configure(properties)); + Entities.manage(result); + result.create(); + return result; + } + + @Override + public Class<? extends RabbitDriver> getDriverInterface() { + return RabbitDriver.class; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + + connectServiceUpIsRunning(); + + setBrokerUrl(); + + if (getEnableManagementPlugin()) { + setAttribute(MANAGEMENT_URL, format("http://%s:%s/", getAttribute(HOSTNAME), getAttribute(MANAGEMENT_PORT))); + } + } + + @Override + public void disconnectSensors() { + super.disconnectSensors(); + disconnectServiceUpIsRunning(); + } + + public boolean getEnableManagementPlugin() { + return Boolean.TRUE.equals(getConfig(ENABLE_MANAGEMENT_PLUGIN)); + } + + public Integer getManagementPort() { + return getAttribute(MANAGEMENT_PORT); + } + + @Override + protected ToStringHelper toStringHelper() { + return super.toStringHelper().add("amqpPort", getAmqpPort()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDestination.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDestination.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDestination.java new file mode 100644 index 0000000..0f2a7e4 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDestination.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.rabbit; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.AbstractEntity; +import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import com.google.common.base.Objects.ToStringHelper; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; + +public abstract class RabbitDestination extends AbstractEntity implements AmqpExchange { + public static final Logger log = LoggerFactory.getLogger(RabbitDestination.class); + + private String virtualHost; + private String exchange; + protected SshMachineLocation machine; + protected Map<String,String> shellEnvironment; + + public RabbitDestination() { + } + + @Override + public void onManagementStarting() { + super.onManagementStarting(); + + exchange = (getConfig(EXCHANGE_NAME) != null) ? getConfig(EXCHANGE_NAME) : getDefaultExchangeName(); + virtualHost = getConfig(RabbitBroker.VIRTUAL_HOST_NAME); + setAttribute(RabbitBroker.VIRTUAL_HOST_NAME, virtualHost); + + machine = (SshMachineLocation) Iterables.find(getParent().getLocations(), Predicates.instanceOf(SshMachineLocation.class)); + shellEnvironment = getParent().getShellEnvironment(); + } + + // FIXME Should return RabbitBroker; won't work if gets a proxy rather than "real" entity + @Override + public RabbitBroker getParent() { + return (RabbitBroker) super.getParent(); + } + + public void create() { + connectSensors(); + } + + public void delete() { + disconnectSensors(); + } + + protected void connectSensors() { } + + protected void disconnectSensors() { } + + public String getVirtualHost() { + return virtualHost; + } + + @Override + public String getExchangeName() { + return exchange; + } + + public String getDefaultExchangeName() { + return AmqpExchange.DIRECT; + } + + @Override + protected ToStringHelper toStringHelper() { + return super.toStringHelper().add("virtualHost", getParent().getVirtualHost()).add("exchange", getExchangeName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDriver.java new file mode 100644 index 0000000..2c989d5 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitDriver.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.rabbit; + +import java.util.Map; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface RabbitDriver extends SoftwareProcessDriver { + + public void configure(); + + public Map<String, String> getShellEnvironment(); + + public String getRunDir(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitQueue.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitQueue.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitQueue.java new file mode 100644 index 0000000..cb12b4f --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitQueue.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.rabbit; + +import org.apache.brooklyn.entity.messaging.Queue; +import brooklyn.event.feed.ssh.SshFeed; +import brooklyn.event.feed.ssh.SshPollConfig; +import brooklyn.event.feed.ssh.SshPollValue; + +import com.google.common.base.Function; +import com.google.common.base.Functions; + +public class RabbitQueue extends RabbitDestination implements Queue { + + private SshFeed sshFeed; + + public RabbitQueue() { + } + + public String getName() { + return getDisplayName(); + } + + @Override + public void create() { + setAttribute(QUEUE_NAME, getName()); + super.create(); + } + + @Override + protected void connectSensors() { + String runDir = getParent().getRunDir(); + String cmd = String.format("%s/sbin/rabbitmqctl list_queues -p /%s | grep '%s'", runDir, getVirtualHost(), getQueueName()); + + sshFeed = SshFeed.builder() + .entity(this) + .machine(machine) + .poll(new SshPollConfig<Integer>(QUEUE_DEPTH_BYTES) + .env(shellEnvironment) + .command(cmd) + .onFailure(Functions.constant(-1)) + .onSuccess(new Function<SshPollValue, Integer>() { + @Override public Integer apply(SshPollValue input) { + return 0; // TODO parse out queue depth from output + }})) + .poll(new SshPollConfig<Integer>(QUEUE_DEPTH_MESSAGES) + .env(shellEnvironment) + .command(cmd) + .onFailure(Functions.constant(-1)) + .onSuccess(new Function<SshPollValue, Integer>() { + @Override public Integer apply(SshPollValue input) { + return 0; // TODO parse out queue depth from output + }})) + .build(); + } + + @Override + protected void disconnectSensors() { + if (sshFeed != null) sshFeed.stop(); + super.disconnectSensors(); + } + + /** + * Return the AMQP name for the queue. + */ + public String getQueueName() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java new file mode 100644 index 0000000..56e248f --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.rabbit; + +import static brooklyn.util.ssh.BashCommands.*; +import static java.lang.String.format; + +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.lifecycle.ScriptHelper; +import org.apache.brooklyn.entity.messaging.amqp.AmqpServer; +import org.apache.brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.net.Networking; +import brooklyn.util.os.Os; + +/** + * TODO javadoc + */ +public class RabbitSshDriver extends AbstractSoftwareProcessSshDriver implements RabbitDriver { + + private static final Logger log = LoggerFactory.getLogger(RabbitSshDriver.class); + + // See http://fedoraproject.org/wiki/EPEL/FAQ#howtouse + private static final Map<String, String> CENTOS_VERSION_TO_EPEL_VERSION = ImmutableMap.of( + "5", "5-4", + "6", "6-8", + "7", "7-5" + ); + + public RabbitSshDriver(RabbitBrokerImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + protected String getLogFileLocation() { return getRunDir()+"/"+entity.getId()+".log"; } + + public Integer getAmqpPort() { return entity.getAttribute(AmqpServer.AMQP_PORT); } + + public String getVirtualHost() { return entity.getAttribute(AmqpServer.VIRTUAL_HOST_NAME); } + + public String getErlangVersion() { return entity.getConfig(RabbitBroker.ERLANG_VERSION); } + + @Override + public RabbitBrokerImpl getEntity() { + return (RabbitBrokerImpl) super.getEntity(); + } + + @Override + public void preInstall() { + resolver = Entities.newDownloader(this); + setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("rabbitmq_server-%s", getVersion())))); + } + + @Override + public void install() { + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + // Version and architecture are only required for download of epel package on RHEL/Centos systems so pick sensible + // defaults if unavailable + String osMajorVersion = getMachine().getOsDetails().getVersion(); + if (Strings.isNullOrEmpty(osMajorVersion)) { + osMajorVersion = "7"; + } else { + osMajorVersion = osMajorVersion.indexOf(".") > 0 ? osMajorVersion.substring(0, osMajorVersion.indexOf('.')) : osMajorVersion; + if (!CENTOS_VERSION_TO_EPEL_VERSION.keySet().contains(osMajorVersion)) { + osMajorVersion = "7"; + } + } + String epelVersion = CENTOS_VERSION_TO_EPEL_VERSION.get(osMajorVersion); + String osArchitecture = getMachine().getOsDetails().getArch(); + if (Strings.isNullOrEmpty(osArchitecture)) { + osArchitecture = "x86_64"; + } + + List<String> commands = ImmutableList.<String>builder() + // EPEL repository for erlang install required on some Centos distributions + .add(chainGroup("which yum", sudo("yum -y update ca-certificates"), sudo("rpm -Uvh " + + format("http://download.fedoraproject.org/pub/epel/%s/%s/epel-release-%s.noarch.rpm", osMajorVersion, osArchitecture, epelVersion)))) + .add(ifExecutableElse0("zypper", chainGroup( + ok(sudo("zypper --non-interactive addrepo http://download.opensuse.org/repositories/devel:/languages:/erlang/SLE_11_SP3 erlang_sles_11")), + ok(sudo("zypper --non-interactive addrepo http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_11.4 erlang_suse_11")), + ok(sudo("zypper --non-interactive addrepo http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_12.3 erlang_suse_12")), + ok(sudo("zypper --non-interactive addrepo http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_13.1 erlang_suse_13"))))) + .add(installPackage( // NOTE only 'port' states the version of Erlang used, maybe remove this constraint? + ImmutableMap.of( + "apt", "erlang-nox erlang-dev", + "port", "erlang@"+getErlangVersion()+"+ssl"), + "erlang")) + .addAll(commandsToDownloadUrlsAs(urls, saveAs)) + .add(installExecutable("tar")) + .add(format("tar xvzf %s",saveAs)) + .build(); + + newScript(INSTALLING). + failOnNonZeroResultCode(). + body.append(commands).execute(); + } + + @Override + public void customize() { + Networking.checkPortsValid(MutableMap.of("amqpPort", getAmqpPort())); + ScriptHelper scriptHelper = newScript(CUSTOMIZING); + + scriptHelper.body.append( + format("cp -R %s/* .", getExpandedInstallDir()) + ); + + if (Boolean.TRUE.equals(entity.getConfig(RabbitBroker.ENABLE_MANAGEMENT_PLUGIN))) { + scriptHelper.body.append( + "./sbin/rabbitmq-plugins enable rabbitmq_management" + ); + } + scriptHelper.failOnNonZeroResultCode(); + scriptHelper.execute(); + + copyTemplate(entity.getConfig(RabbitBroker.CONFIG_TEMPLATE_URL), getConfigPath() + ".config"); + } + + @Override + public void launch() { + newScript(MutableMap.of("usePidFile", false), LAUNCHING) + .body.append( + "nohup ./sbin/rabbitmq-server > console-out.log 2> console-err.log &", + "for i in {1..10}\n" + + "do\n" + + " grep 'broker running' console-out.log && exit\n" + + " sleep 1\n" + + "done", + "echo \"Couldn't determine if rabbitmq-server is running\"", + "exit 1" + ).execute(); + } + + @Override + public void configure() { + newScript(CUSTOMIZING) + .body.append( + "./sbin/rabbitmqctl add_vhost "+getEntity().getVirtualHost(), + "./sbin/rabbitmqctl set_permissions -p "+getEntity().getVirtualHost()+" guest \".*\" \".*\" \".*\"" + ).execute(); + } + + + public String getPidFile() { return "rabbitmq.pid"; } + + @Override + public boolean isRunning() { + return newScript(MutableMap.of("usePidFile", false), CHECK_RUNNING) + .body.append("./sbin/rabbitmqctl -q status") + .execute() == 0; + } + + @Override + public void stop() { + newScript(MutableMap.of("usePidFile", false), STOPPING) + .body.append("./sbin/rabbitmqctl stop") + .execute(); + } + + + @Override + public void kill() { + stop(); // TODO No pid file to easily do `kill -9` + } + + @Override + public Map<String, String> getShellEnvironment() { + return MutableMap.<String, String>builder() + .putAll(super.getShellEnvironment()) + .put("RABBITMQ_HOME", getRunDir()) + .put("RABBITMQ_LOG_BASE", getRunDir()) + .put("RABBITMQ_NODENAME", getEntity().getId()) + .put("RABBITMQ_NODE_PORT", getAmqpPort().toString()) + .put("RABBITMQ_PID_FILE", getRunDir()+"/"+getPidFile()) + .put("RABBITMQ_CONFIG_FILE", getConfigPath()) + .build(); + } + + private String getConfigPath() { + return getRunDir() + "/rabbitmq"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/Storm.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/Storm.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/Storm.java new file mode 100644 index 0000000..8c4a4bf --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/Storm.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.storm; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.config.render.RendererHints; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.java.UsesJmx; +import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a Storm node (UI, Nimbus or Supervisor). + */ +@Catalog(name="Storm Node", description="Apache Storm is a distributed realtime computation system. " + + "Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing " + + "what Hadoop did for batch processing") +@ImplementedBy(StormImpl.class) +public interface Storm extends SoftwareProcess, UsesJmx { + + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.8.2"); + + @SetFromFlag("nimbusHostname") + ConfigKey<String> NIMBUS_HOSTNAME = ConfigKeys.newStringConfigKey("storm.nimbus.hostname"); + + @SetFromFlag("nimbusEntity") + ConfigKey<Entity> NIMBUS_ENTITY = ConfigKeys.newConfigKey(Entity.class, "storm.nimbus.entity"); + + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "https://dl.dropboxusercontent.com/s/fl4kr7w0oc8ihdw/storm-${version}.zip"); + + ConfigKey<Object> START_MUTEX = ConfigKeys.newConfigKey(Object.class, "storm.start.mutex"); + + @SetFromFlag("role") + ConfigKey<Role> ROLE = ConfigKeys.newConfigKey(Role.class, "storm.role", "The Storm server role"); + + @SetFromFlag("localDir") + ConfigKey<String> LOCAL_DIR = ConfigKeys.newStringConfigKey("storm.local.dir", "Setting for Storm local dir"); + + @SetFromFlag("uiPort") + PortAttributeSensorAndConfigKey UI_PORT = new PortAttributeSensorAndConfigKey("storm.ui.port", "Storm UI port", "8080+"); + + @SetFromFlag("thriftPort") + PortAttributeSensorAndConfigKey THRIFT_PORT = new PortAttributeSensorAndConfigKey("storm.thrift.port", "Storm Thrift port", "6627"); + + @SetFromFlag("zookeeperEnsemble") + ConfigKey<ZooKeeperEnsemble> ZOOKEEPER_ENSEMBLE = ConfigKeys.newConfigKey(ZooKeeperEnsemble.class, + "storm.zookeeper.ensemble", "Zookeeper ensemble entity"); + + @SetFromFlag("stormConfigTemplateUrl") + ConfigKey<String> STORM_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey("storm.config.templateUrl", + "Template file (in freemarker format) for the storm.yaml config file", + "classpath://org/apache/brooklyn/entity/messaging/storm/storm.yaml"); + + @SetFromFlag("zeromqVersion") + ConfigKey<String> ZEROMQ_VERSION = ConfigKeys.newStringConfigKey("storm.zeromq.version", "zeromq version", "2.1.7"); + + AttributeSensor<Boolean> SERVICE_UP_JMX = Sensors.newBooleanSensor("storm.service.jmx.up", "Whether JMX is up for this service"); + + String getStormConfigTemplateUrl(); + + String getHostname(); + + Role getRole(); + + enum Role { NIMBUS, SUPERVISOR, UI } + + AttributeSensor<String> STORM_UI_URL = StormUiUrl.STORM_UI_URL; + + class StormUiUrl { + public static final AttributeSensor<String> STORM_UI_URL = Sensors.newStringSensor("storm.ui.url", "URL"); + + static { + RendererHints.register(STORM_UI_URL, RendererHints.namedActionWithUrl()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeployment.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeployment.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeployment.java new file mode 100644 index 0000000..8f505a6 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeployment.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.storm; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.trait.Startable; + +@Catalog(name="Storm Deployment", description="A Storm cluster. Apache Storm is a distributed realtime computation system. " + + "Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing " + + "what Hadoop did for batch processing") +@ImplementedBy(StormDeploymentImpl.class) +public interface StormDeployment extends Entity, Startable { + + @SetFromFlag("supervisors.count") + ConfigKey<Integer> SUPERVISORS_COUNT = ConfigKeys.newConfigKey("storm.supervisors.count", "Number of supervisor nodes", 3); + + @SetFromFlag("zookeepers.count") + ConfigKey<Integer> ZOOKEEPERS_COUNT = ConfigKeys.newConfigKey("storm.zookeepers.count", "Number of zookeeper nodes", 1); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeploymentImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeploymentImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeploymentImpl.java new file mode 100644 index 0000000..0e5d006 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDeploymentImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.storm; + +import static org.apache.brooklyn.entity.messaging.storm.Storm.ROLE; +import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.NIMBUS; +import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.SUPERVISOR; +import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.UI; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.core.util.ResourceUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.enricher.Enrichers; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.BasicStartableImpl; +import brooklyn.entity.group.DynamicCluster; +import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble; + +public class StormDeploymentImpl extends BasicStartableImpl implements StormDeployment { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(StormDeploymentImpl.class); + + @Override + public void init() { + super.init(); + new ResourceUtils(this).checkUrlExists(Storm.STORM_CONFIG_TEMPLATE_URL.getDefaultValue()); + + setDefaultDisplayName("Storm Deployment"); + + ZooKeeperEnsemble zooKeeperEnsemble = addChild(EntitySpec.create( + ZooKeeperEnsemble.class).configure( + ZooKeeperEnsemble.INITIAL_SIZE, getConfig(ZOOKEEPERS_COUNT))); + + setConfig(Storm.ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble); + + Storm nimbus = addChild(EntitySpec.create(Storm.class).configure(ROLE, NIMBUS)); + + setConfig(Storm.NIMBUS_ENTITY, nimbus); + setConfig(Storm.START_MUTEX, new Object()); + + addChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.MEMBER_SPEC, + EntitySpec.create(Storm.class).configure(ROLE, SUPERVISOR)) + .configure(DynamicCluster.INITIAL_SIZE, getConfig(SUPERVISORS_COUNT)) + .displayName("Storm Supervisor Cluster")); + + Storm ui = addChild(EntitySpec.create(Storm.class).configure(ROLE, UI)); + + addEnricher(Enrichers.builder() + .propagating(Storm.STORM_UI_URL) + .from(ui) + .build()); + addEnricher(Enrichers.builder() + .propagating(Attributes.HOSTNAME) + .from(nimbus) + .build()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDriver.java new file mode 100644 index 0000000..f91429e --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormDriver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.storm; + +import brooklyn.entity.java.JavaSoftwareProcessDriver; + +public interface StormDriver extends JavaSoftwareProcessDriver { + + String getJvmOptsLine(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java new file mode 100644 index 0000000..b2e5436 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.storm; + +import javax.management.ObjectName; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.entity.java.JavaAppUtils; +import brooklyn.entity.java.JavaSoftwareProcessDriver; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.event.feed.jmx.JmxHelper; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +public class StormImpl extends SoftwareProcessImpl implements Storm { + + private static final Logger log = LoggerFactory.getLogger(StormImpl.class); + + public static final ObjectName STORM_MBEAN = JmxHelper.createObjectName("backtype.storm.daemon.nimbus:type=*"); + + private JmxHelper jmxHelper; + private volatile JmxFeed jmxFeed; + + public StormImpl() {} + + @Override + public String getHostname() { return getAttribute(HOSTNAME); } + + @Override + public Role getRole() { return getConfig(ROLE); } + + @Override + public String getStormConfigTemplateUrl() { return getConfig(STORM_CONFIG_TEMPLATE_URL); } + + @Override + public Class<?> getDriverInterface() { + return StormDriver.class; + } + + public String getRoleName() { return getRole().name().toLowerCase(); } + + @Override + protected void preStart() { + setDefaultDisplayName("Storm Node ("+ getRoleName()+")"); + super.preStart(); + } + + @Override + protected void connectSensors() { + super.connectSensors(); + + // give it plenty of time to start before we advertise ourselves + Time.sleep(Duration.TEN_SECONDS); + + if (getRole() == Role.UI) { + setAttribute(STORM_UI_URL, "http://"+getAttribute(Attributes.HOSTNAME)+":"+getAttribute(UI_PORT)+"/"); + } + + if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) { + jmxHelper = new JmxHelper(this); +// jmxFeed = JmxFeed.builder() +// .entity(this) +// .period(3000, TimeUnit.MILLISECONDS) +// .helper(jmxHelper) +// .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX) +// .objectName(STORM_MBEAN) +// .attributeName("Initialized") +// .onSuccess(Functions.forPredicate(Predicates.notNull())) +// .onException(Functions.constant(false))) +// // TODO SERVICE_UP should really be a combo of JMX plus is running +// .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP) +// .objectName(STORM_MBEAN) +// .attributeName("Initialized") +// .onSuccess(Functions.forPredicate(Predicates.notNull())) +// .onException(Functions.constant(false))) +// .build(); + jmxFeed = JavaAppUtils.connectMXBeanSensors(this); + + // FIXME for now we do service up based on pid check -- we get a warning that: + // JMX object backtype.storm.daemon.nimbus:type=* not found at service:jmx:jmxmp://108.59.82.105:31001 + // (JMX is up fine, but no such object there) + connectServiceUpIsRunning(); + } else { + // if not using JMX + log.warn("Storm running without JMX monitoring; limited visibility of service available"); + connectServiceUpIsRunning(); + } + } + + @Override + public void disconnectSensors() { + super.disconnectSensors(); + disconnectServiceUpIsRunning(); + if (jmxFeed != null) jmxFeed.stop(); + if (jmxHelper !=null) jmxHelper.terminate(); + } + +}
