http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java deleted file mode 100644 index 470221e..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitSshDriver.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.rabbit; - -import static brooklyn.util.ssh.BashCommands.*; -import static java.lang.String.format; - -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.lifecycle.ScriptHelper; -import brooklyn.entity.messaging.amqp.AmqpServer; -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.net.Networking; -import brooklyn.util.os.Os; - -/** - * TODO javadoc - */ -public class RabbitSshDriver extends AbstractSoftwareProcessSshDriver implements RabbitDriver { - - private static final Logger log = LoggerFactory.getLogger(RabbitSshDriver.class); - - // See http://fedoraproject.org/wiki/EPEL/FAQ#howtouse - private static final Map<String, String> CENTOS_VERSION_TO_EPEL_VERSION = ImmutableMap.of( - "5", "5-4", - "6", "6-8", - "7", "7-5" - ); - - public RabbitSshDriver(RabbitBrokerImpl entity, SshMachineLocation machine) { - super(entity, machine); - } - - protected String getLogFileLocation() { return getRunDir()+"/"+entity.getId()+".log"; } - - public Integer getAmqpPort() { return entity.getAttribute(AmqpServer.AMQP_PORT); } - - public String getVirtualHost() { return entity.getAttribute(AmqpServer.VIRTUAL_HOST_NAME); } - - public String getErlangVersion() { return entity.getConfig(RabbitBroker.ERLANG_VERSION); } - - @Override - public RabbitBrokerImpl getEntity() { - return (RabbitBrokerImpl) super.getEntity(); - } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("rabbitmq_server-%s", getVersion())))); - } 
- - @Override - public void install() { - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - // Version and architecture are only required for download of epel package on RHEL/Centos systems so pick sensible - // defaults if unavailable - String osMajorVersion = getMachine().getOsDetails().getVersion(); - if (Strings.isNullOrEmpty(osMajorVersion)) { - osMajorVersion = "7"; - } else { - osMajorVersion = osMajorVersion.indexOf(".") > 0 ? osMajorVersion.substring(0, osMajorVersion.indexOf('.')) : osMajorVersion; - if (!CENTOS_VERSION_TO_EPEL_VERSION.keySet().contains(osMajorVersion)) { - osMajorVersion = "7"; - } - } - String epelVersion = CENTOS_VERSION_TO_EPEL_VERSION.get(osMajorVersion); - String osArchitecture = getMachine().getOsDetails().getArch(); - if (Strings.isNullOrEmpty(osArchitecture)) { - osArchitecture = "x86_64"; - } - - List<String> commands = ImmutableList.<String>builder() - // EPEL repository for erlang install required on some Centos distributions - .add(chainGroup("which yum", sudo("yum -y update ca-certificates"), sudo("rpm -Uvh " + - format("http://download.fedoraproject.org/pub/epel/%s/%s/epel-release-%s.noarch.rpm", osMajorVersion, osArchitecture, epelVersion)))) - .add(ifExecutableElse0("zypper", chainGroup( - ok(sudo("zypper --non-interactive addrepo http://download.opensuse.org/repositories/devel:/languages:/erlang/SLE_11_SP3 erlang_sles_11")), - ok(sudo("zypper --non-interactive addrepo http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_11.4 erlang_suse_11")), - ok(sudo("zypper --non-interactive addrepo http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_12.3 erlang_suse_12")), - ok(sudo("zypper --non-interactive addrepo http://download.opensuse.org/repositories/devel:/languages:/erlang/openSUSE_13.1 erlang_suse_13"))))) - .add(installPackage( // NOTE only 'port' states the version of Erlang used, maybe remove this constraint? 
- ImmutableMap.of( - "apt", "erlang-nox erlang-dev", - "port", "erlang@"+getErlangVersion()+"+ssl"), - "erlang")) - .addAll(commandsToDownloadUrlsAs(urls, saveAs)) - .add(installExecutable("tar")) - .add(format("tar xvzf %s",saveAs)) - .build(); - - newScript(INSTALLING). - failOnNonZeroResultCode(). - body.append(commands).execute(); - } - - @Override - public void customize() { - Networking.checkPortsValid(MutableMap.of("amqpPort", getAmqpPort())); - ScriptHelper scriptHelper = newScript(CUSTOMIZING); - - scriptHelper.body.append( - format("cp -R %s/* .", getExpandedInstallDir()) - ); - - if (Boolean.TRUE.equals(entity.getConfig(RabbitBroker.ENABLE_MANAGEMENT_PLUGIN))) { - scriptHelper.body.append( - "./sbin/rabbitmq-plugins enable rabbitmq_management" - ); - } - scriptHelper.failOnNonZeroResultCode(); - scriptHelper.execute(); - - copyTemplate(entity.getConfig(RabbitBroker.CONFIG_TEMPLATE_URL), getConfigPath() + ".config"); - } - - @Override - public void launch() { - newScript(MutableMap.of("usePidFile", false), LAUNCHING) - .body.append( - "nohup ./sbin/rabbitmq-server > console-out.log 2> console-err.log &", - "for i in {1..10}\n" + - "do\n" + - " grep 'broker running' console-out.log && exit\n" + - " sleep 1\n" + - "done", - "echo \"Couldn't determine if rabbitmq-server is running\"", - "exit 1" - ).execute(); - } - - @Override - public void configure() { - newScript(CUSTOMIZING) - .body.append( - "./sbin/rabbitmqctl add_vhost "+getEntity().getVirtualHost(), - "./sbin/rabbitmqctl set_permissions -p "+getEntity().getVirtualHost()+" guest \".*\" \".*\" \".*\"" - ).execute(); - } - - - public String getPidFile() { return "rabbitmq.pid"; } - - @Override - public boolean isRunning() { - return newScript(MutableMap.of("usePidFile", false), CHECK_RUNNING) - .body.append("./sbin/rabbitmqctl -q status") - .execute() == 0; - } - - @Override - public void stop() { - newScript(MutableMap.of("usePidFile", false), STOPPING) - .body.append("./sbin/rabbitmqctl stop") - 
.execute(); - } - - - @Override - public void kill() { - stop(); // TODO No pid file to easily do `kill -9` - } - - @Override - public Map<String, String> getShellEnvironment() { - return MutableMap.<String, String>builder() - .putAll(super.getShellEnvironment()) - .put("RABBITMQ_HOME", getRunDir()) - .put("RABBITMQ_LOG_BASE", getRunDir()) - .put("RABBITMQ_NODENAME", getEntity().getId()) - .put("RABBITMQ_NODE_PORT", getAmqpPort().toString()) - .put("RABBITMQ_PID_FILE", getRunDir()+"/"+getPidFile()) - .put("RABBITMQ_CONFIG_FILE", getConfigPath()) - .build(); - } - - private String getConfigPath() { - return getRunDir() + "/rabbitmq"; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/storm/Storm.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/Storm.java b/software/messaging/src/main/java/brooklyn/entity/messaging/storm/Storm.java deleted file mode 100644 index ced7bef..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/Storm.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.storm; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; -import org.apache.brooklyn.api.event.AttributeSensor; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.config.render.RendererHints; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.java.UsesJmx; -import brooklyn.entity.zookeeper.ZooKeeperEnsemble; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; -import brooklyn.event.basic.Sensors; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a Storm node (UI, Nimbus or Supervisor). - */ -@Catalog(name="Storm Node", description="Apache Storm is a distributed realtime computation system. " - + "Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing " - + "what Hadoop did for batch processing") -@ImplementedBy(StormImpl.class) -public interface Storm extends SoftwareProcess, UsesJmx { - - @SetFromFlag("version") - ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.8.2"); - - @SetFromFlag("nimbusHostname") - ConfigKey<String> NIMBUS_HOSTNAME = ConfigKeys.newStringConfigKey("storm.nimbus.hostname"); - - @SetFromFlag("nimbusEntity") - ConfigKey<Entity> NIMBUS_ENTITY = ConfigKeys.newConfigKey(Entity.class, "storm.nimbus.entity"); - - @SetFromFlag("downloadUrl") - BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - SoftwareProcess.DOWNLOAD_URL, "https://dl.dropboxusercontent.com/s/fl4kr7w0oc8ihdw/storm-${version}.zip"); - - ConfigKey<Object> START_MUTEX = ConfigKeys.newConfigKey(Object.class, "storm.start.mutex"); - - @SetFromFlag("role") - 
ConfigKey<Role> ROLE = ConfigKeys.newConfigKey(Role.class, "storm.role", "The Storm server role"); - - @SetFromFlag("localDir") - ConfigKey<String> LOCAL_DIR = ConfigKeys.newStringConfigKey("storm.local.dir", "Setting for Storm local dir"); - - @SetFromFlag("uiPort") - PortAttributeSensorAndConfigKey UI_PORT = new PortAttributeSensorAndConfigKey("storm.ui.port", "Storm UI port", "8080+"); - - @SetFromFlag("thriftPort") - PortAttributeSensorAndConfigKey THRIFT_PORT = new PortAttributeSensorAndConfigKey("storm.thrift.port", "Storm Thrift port", "6627"); - - @SetFromFlag("zookeeperEnsemble") - ConfigKey<ZooKeeperEnsemble> ZOOKEEPER_ENSEMBLE = ConfigKeys.newConfigKey(ZooKeeperEnsemble.class, - "storm.zookeeper.ensemble", "Zookeeper ensemble entity"); - - @SetFromFlag("stormConfigTemplateUrl") - ConfigKey<String> STORM_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey("storm.config.templateUrl", - "Template file (in freemarker format) for the storm.yaml config file", - "classpath://brooklyn/entity/messaging/storm/storm.yaml"); - - @SetFromFlag("zeromqVersion") - ConfigKey<String> ZEROMQ_VERSION = ConfigKeys.newStringConfigKey("storm.zeromq.version", "zeromq version", "2.1.7"); - - AttributeSensor<Boolean> SERVICE_UP_JMX = Sensors.newBooleanSensor("storm.service.jmx.up", "Whether JMX is up for this service"); - - String getStormConfigTemplateUrl(); - - String getHostname(); - - Role getRole(); - - enum Role { NIMBUS, SUPERVISOR, UI } - - AttributeSensor<String> STORM_UI_URL = StormUiUrl.STORM_UI_URL; - - class StormUiUrl { - public static final AttributeSensor<String> STORM_UI_URL = Sensors.newStringSensor("storm.ui.url", "URL"); - - static { - RendererHints.register(STORM_UI_URL, RendererHints.namedActionWithUrl()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDeployment.java ---------------------------------------------------------------------- diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDeployment.java b/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDeployment.java deleted file mode 100644 index fda29fd..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDeployment.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.storm; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.trait.Startable; - -@Catalog(name="Storm Deployment", description="A Storm cluster. Apache Storm is a distributed realtime computation system. 
" - + "Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing " - + "what Hadoop did for batch processing") -@ImplementedBy(StormDeploymentImpl.class) -public interface StormDeployment extends Entity, Startable { - - @SetFromFlag("supervisors.count") - ConfigKey<Integer> SUPERVISORS_COUNT = ConfigKeys.newConfigKey("storm.supervisors.count", "Number of supervisor nodes", 3); - - @SetFromFlag("zookeepers.count") - ConfigKey<Integer> ZOOKEEPERS_COUNT = ConfigKeys.newConfigKey("storm.zookeepers.count", "Number of zookeeper nodes", 1); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDeploymentImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDeploymentImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDeploymentImpl.java deleted file mode 100644 index cf90db8..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDeploymentImpl.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.storm; - -import static brooklyn.entity.messaging.storm.Storm.ROLE; -import static brooklyn.entity.messaging.storm.Storm.Role.NIMBUS; -import static brooklyn.entity.messaging.storm.Storm.Role.SUPERVISOR; -import static brooklyn.entity.messaging.storm.Storm.Role.UI; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.core.util.ResourceUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.enricher.Enrichers; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.BasicStartableImpl; -import brooklyn.entity.group.DynamicCluster; -import brooklyn.entity.zookeeper.ZooKeeperEnsemble; - -public class StormDeploymentImpl extends BasicStartableImpl implements StormDeployment { - - @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(StormDeploymentImpl.class); - - @Override - public void init() { - super.init(); - new ResourceUtils(this).checkUrlExists(Storm.STORM_CONFIG_TEMPLATE_URL.getDefaultValue()); - - setDefaultDisplayName("Storm Deployment"); - - ZooKeeperEnsemble zooKeeperEnsemble = addChild(EntitySpec.create( - ZooKeeperEnsemble.class).configure( - ZooKeeperEnsemble.INITIAL_SIZE, getConfig(ZOOKEEPERS_COUNT))); - - setConfig(Storm.ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble); - - Storm nimbus = addChild(EntitySpec.create(Storm.class).configure(ROLE, NIMBUS)); - - setConfig(Storm.NIMBUS_ENTITY, nimbus); - setConfig(Storm.START_MUTEX, new Object()); - - addChild(EntitySpec.create(DynamicCluster.class) - .configure(DynamicCluster.MEMBER_SPEC, - EntitySpec.create(Storm.class).configure(ROLE, SUPERVISOR)) - .configure(DynamicCluster.INITIAL_SIZE, getConfig(SUPERVISORS_COUNT)) - .displayName("Storm Supervisor Cluster")); - - Storm ui = addChild(EntitySpec.create(Storm.class).configure(ROLE, UI)); - - 
addEnricher(Enrichers.builder() - .propagating(Storm.STORM_UI_URL) - .from(ui) - .build()); - addEnricher(Enrichers.builder() - .propagating(Attributes.HOSTNAME) - .from(nimbus) - .build()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDriver.java deleted file mode 100644 index ddf7481..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormDriver.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.storm; - -import brooklyn.entity.java.JavaSoftwareProcessDriver; - -public interface StormDriver extends JavaSoftwareProcessDriver { - - String getJvmOptsLine(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormImpl.java deleted file mode 100644 index 57c09e6..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormImpl.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.storm; - -import javax.management.ObjectName; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.entity.java.JavaAppUtils; -import brooklyn.entity.java.JavaSoftwareProcessDriver; -import brooklyn.event.feed.jmx.JmxFeed; -import brooklyn.event.feed.jmx.JmxHelper; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -public class StormImpl extends SoftwareProcessImpl implements Storm { - - private static final Logger log = LoggerFactory.getLogger(StormImpl.class); - - public static final ObjectName STORM_MBEAN = JmxHelper.createObjectName("backtype.storm.daemon.nimbus:type=*"); - - private JmxHelper jmxHelper; - private volatile JmxFeed jmxFeed; - - public StormImpl() {} - - @Override - public String getHostname() { return getAttribute(HOSTNAME); } - - @Override - public Role getRole() { return getConfig(ROLE); } - - @Override - public String getStormConfigTemplateUrl() { return getConfig(STORM_CONFIG_TEMPLATE_URL); } - - @Override - public Class<?> getDriverInterface() { - return StormDriver.class; - } - - public String getRoleName() { return getRole().name().toLowerCase(); } - - @Override - protected void preStart() { - setDefaultDisplayName("Storm Node ("+ getRoleName()+")"); - super.preStart(); - } - - @Override - protected void connectSensors() { - super.connectSensors(); - - // give it plenty of time to start before we advertise ourselves - Time.sleep(Duration.TEN_SECONDS); - - if (getRole() == Role.UI) { - setAttribute(STORM_UI_URL, "http://"+getAttribute(Attributes.HOSTNAME)+":"+getAttribute(UI_PORT)+"/"); - } - - if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) { - jmxHelper = new JmxHelper(this); -// jmxFeed = JmxFeed.builder() -// .entity(this) -// .period(3000, TimeUnit.MILLISECONDS) -// .helper(jmxHelper) -// .pollAttribute(new 
JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX) -// .objectName(STORM_MBEAN) -// .attributeName("Initialized") -// .onSuccess(Functions.forPredicate(Predicates.notNull())) -// .onException(Functions.constant(false))) -// // TODO SERVICE_UP should really be a combo of JMX plus is running -// .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP) -// .objectName(STORM_MBEAN) -// .attributeName("Initialized") -// .onSuccess(Functions.forPredicate(Predicates.notNull())) -// .onException(Functions.constant(false))) -// .build(); - jmxFeed = JavaAppUtils.connectMXBeanSensors(this); - - // FIXME for now we do service up based on pid check -- we get a warning that: - // JMX object backtype.storm.daemon.nimbus:type=* not found at service:jmx:jmxmp://108.59.82.105:31001 - // (JMX is up fine, but no such object there) - connectServiceUpIsRunning(); - } else { - // if not using JMX - log.warn("Storm running without JMX monitoring; limited visibility of service available"); - connectServiceUpIsRunning(); - } - } - - @Override - public void disconnectSensors() { - super.disconnectSensors(); - disconnectServiceUpIsRunning(); - if (jmxFeed != null) jmxFeed.stop(); - if (jmxHelper !=null) jmxHelper.terminate(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormSshDriver.java deleted file mode 100644 index 76d5bfd..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/storm/StormSshDriver.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.storm; - -import static java.lang.String.format; - -import java.util.List; -import java.util.Map; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.basic.EntityLocal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; -import brooklyn.entity.zookeeper.ZooKeeperEnsemble; -import brooklyn.event.basic.DependentConfiguration; -import org.apache.brooklyn.location.basic.Machines; -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.net.Networking; -import brooklyn.util.os.Os; -import brooklyn.util.ssh.BashCommands; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -public class StormSshDriver extends JavaSoftwareProcessSshDriver implements StormDriver { - - private static final Logger log = 
LoggerFactory.getLogger(StormSshDriver.class); - - public StormSshDriver(EntityLocal entity, SshMachineLocation machine) { - super(entity, machine); - } - - public String getRoleName() { - return entity.getConfig(Storm.ROLE).name().toLowerCase(); - } - - public String getZeromqVersion() { - return entity.getConfig(Storm.ZEROMQ_VERSION); - } - - public String getLocalDir() { - return Optional.fromNullable(entity.getConfig(Storm.LOCAL_DIR)).or(Os.mergePathsUnix(getRunDir(), "storm")); - } - - public String getNimbusHostname() { - String result = entity.getConfig(Storm.NIMBUS_HOSTNAME); - if (result != null) return result; - - Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY); - if (nimbus == null) { - log.warn("No nimbus hostname available; using 'localhost'"); - return "localhost"; - } - return Entities.submit(entity, DependentConfiguration.attributeWhenReady(nimbus, Attributes.HOSTNAME)).getUnchecked(); - } - - public Integer getUiPort() { - return entity.getAttribute(Storm.UI_PORT); - } - - public Map<String, Integer> getPortMap() { - return MutableMap.of("uiPort", getUiPort()); - } - - @Override - protected List<String> getCustomJavaConfigOptions() { - List<String> result = super.getCustomJavaConfigOptions(); - if ("nimbus".equals(getRoleName()) || "supervisor".equals(getRoleName())) { - result.add("-verbose:gc"); - result.add("-XX:+PrintGCTimeStamps"); - result.add("-XX:+PrintGCDetails"); - } - - if ("ui".equals(getRoleName())) { - result.add("-Xmx768m"); - } - - return result; - } - - public String getJvmOptsLine() { - return Optional.fromNullable(getShellEnvironment().get("JAVA_OPTS")).or(""); - } - - public List<String> getZookeeperServers() { - ZooKeeperEnsemble zooKeeperEnsemble = entity.getConfig(Storm.ZOOKEEPER_ENSEMBLE); - Supplier<List<String>> supplier = Entities.attributeSupplierWhenReady(zooKeeperEnsemble, ZooKeeperEnsemble.ZOOKEEPER_SERVERS); - return supplier.get(); - } - - public String getStormConfigTemplateUrl() { - return 
entity.getConfig(Storm.STORM_CONFIG_TEMPLATE_URL); - } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("storm-%s", getVersion())))); - } - - @Override - public void install() { - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - - ImmutableList.Builder<String> commands= ImmutableList.<String> builder(); - if (!getLocation().getOsDetails().isMac()) { - commands.add(BashCommands.installPackage(ImmutableMap.of( - "yum", "libuuid-devel", - "apt", "build-essential uuid-dev pkg-config libtool automake"), - "libuuid-devel")); - commands.add(BashCommands.ifExecutableElse0("yum", BashCommands.sudo("yum -y groupinstall 'Development Tools'"))); - } - commands.add(BashCommands.installPackage(ImmutableMap.of("yum", "git"), "git")) - .add(BashCommands.INSTALL_UNZIP) - .addAll(installNativeDependencies()) - .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) - .add("unzip " + saveAs) - .add("mkdir -p " + getLocalDir()) - .add("chmod 777 " + getLocalDir()); // FIXME - newScript(INSTALLING) - .body.append(commands.build()) - .gatherOutput() - .execute(); - } - - public String getPidFile() { - return Os.mergePathsUnix(getRunDir(), format("%s.pid", getRoleName())); - } - - @Override - protected String getLogFileLocation() { - return Os.mergePathsUnix(getRunDir(), "logs", format("%s.log", getRoleName())); - } - - @Override - public void launch() { - boolean needsSleep = false; - if (getRoleName().equals("supervisor")) { - Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY); - if (nimbus == null) { - log.warn("No nimbus entity available; not blocking before starting supervisors"); - } else { - Entities.waitForServiceUp(nimbus, entity.getConfig(SoftwareProcess.START_TIMEOUT)); - needsSleep = true; - } - } - - String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get(); - log.info("Launching " + 
entity + " with role " + getRoleName() + " and " + "hostname (public) " - + getEntity().getAttribute(Attributes.HOSTNAME) + ", " + "hostname (subnet) " + subnetHostname + ")"); - - // ensure only one node at a time tries to start - // attempting to eliminate the causes of: - // 2013-12-12 09:21:45 supervisor [ERROR] Error on initialization of server mk-supervisor - // org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /assignments - // TODO use SoftwareProcess#START_LATCH instead here? - - Object startMutex = Optional.fromNullable(entity.getConfig(Storm.START_MUTEX)).or(new Object()); - synchronized (startMutex) { - if (needsSleep) { - // give 10s extra to make sure nimbus is ready; we see weird zookeeper no /assignments node error otherwise - // (this could be optimized by recording nimbus service_up time) - Time.sleep(Duration.TEN_SECONDS); - } - newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING) - .body.append(format("nohup ./bin/storm %s > %s 2>&1 &", getRoleName(), getLogFileLocation())) - .execute(); - } - } - - @Override - public boolean isRunning() { - return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; - } - - @Override - public void stop() { - newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); - } - - @Override - public void customize() { - log.debug("Customizing {}", entity); - Networking.checkPortsValid(getPortMap()); - - newScript(CUSTOMIZING) - .body.append(format("cp -R %s/* .", getExpandedInstallDir())) - .execute(); - - String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/storm.yaml"); - copyTemplate(getStormConfigTemplateUrl(), destinationConfigFile); - } - - protected List<String> installNativeDependencies() { - String zeromqUrl = format("http://download.zeromq.org/zeromq-%s.tar.gz", getZeromqVersion()); - String targz = format("zeromq-%s.tar.gz", getZeromqVersion()); - String jzmq = "https://github.com/nathanmarz/jzmq.git"; - 
- ImmutableList.Builder<String> commands = ImmutableList.<String>builder(); - if (getLocation().getOsDetails().isMac()) { - commands.add("export PATH=$PATH:/usr/local/bin") - .add("export JAVA_HOME=$(/usr/libexec/java_home)") - .add("cd " + getInstallDir()) - .add(BashCommands.installPackage(ImmutableMap.of("brew", "automake"), "make")) - .add(BashCommands.installPackage(ImmutableMap.of("brew", "libtool"), "libtool")) - .add(BashCommands.installPackage(ImmutableMap.of("brew", "pkg-config"), "pkg-config")) - .add(BashCommands.installPackage(ImmutableMap.of("brew", "zeromq"), "zeromq")) - .add("git clone https://github.com/asmaier/jzmq") - .add("cd jzmq") - .add("./autogen.sh") - .add("./configure") - .add("make") - .add((BashCommands.sudo("make install"))) - .add("cd " + getInstallDir()); - } else { - commands.add("export JAVA_HOME=$(dirname $(readlink -m `which java`))/../../ || export JAVA_HOME=/usr/lib/jvm/java") - .add("cd " + getInstallDir()) - .add(BashCommands.commandToDownloadUrlAs(zeromqUrl, targz)) - .add("tar xzf " + targz) - .add(format("cd zeromq-%s", getZeromqVersion())) - .add("./configure") - .add("make") - .add((BashCommands.sudo("make install"))) - // install jzmq - .add("cd " + getInstallDir()) - .add("git clone " + jzmq) - .add("cd jzmq") - .add("./autogen.sh") - .add("./configure") - - // hack needed on ubuntu 12.04; ignore if it fails - // see https://github.com/zeromq/jzmq/issues/114 - .add(BashCommands.ok( - "pushd src ; touch classdist_noinst.stamp ; CLASSPATH=.:./.:$CLASSPATH " - + "javac -d . 
org/zeromq/ZMQ.java org/zeromq/App.java org/zeromq/ZMQForwarder.java org/zeromq/EmbeddedLibraryTools.java org/zeromq/ZMQQueue.java org/zeromq/ZMQStreamer.java org/zeromq/ZMQException.java")) - .add(BashCommands.ok("popd")) - - .add("make") - .add((BashCommands.sudo("make install"))) - .add("cd " + getInstallDir()); - } - return commands.build(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java deleted file mode 100644 index 6e14a42..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.zookeeper; - -import java.util.concurrent.TimeUnit; - -import javax.management.ObjectName; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.entity.java.JavaSoftwareProcessDriver; -import brooklyn.event.feed.jmx.JmxAttributePollConfig; -import brooklyn.event.feed.jmx.JmxFeed; -import brooklyn.event.feed.jmx.JmxHelper; - -import com.google.common.base.Functions; -import com.google.common.base.Objects.ToStringHelper; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance. - */ -public abstract class AbstractZooKeeperImpl extends SoftwareProcessImpl implements ZooKeeperNode { - - @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(AbstractZooKeeperImpl.class); - private static final ObjectName ZOOKEEPER_MBEAN = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1"); - - private volatile JmxFeed jmxFeed; - - public AbstractZooKeeperImpl() { - } - - @Override - public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); } - - @Override - public String getHostname() { return getAttribute(HOSTNAME); } - - @Override - public void waitForServiceUp(long duration, TimeUnit units) { - super.waitForServiceUp(duration, units); - - if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) { - // Wait for the MBean to exist - JmxHelper helper = new JmxHelper(this); - try { - helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration)); - } finally { - helper.terminate(); - } - } - } - - @Override - protected void connectSensors() { - connectServiceUpIsRunning(); - - if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) { - jmxFeed = JmxFeed.builder() - .entity(this) - .period(500, TimeUnit.MILLISECONDS) - .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS) - .objectName(ZOOKEEPER_MBEAN) - 
.attributeName("OutstandingRequests") - .onFailureOrException(Functions.constant(-1l))) - .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED) - .objectName(ZOOKEEPER_MBEAN) - .attributeName("PacketsReceived") - .onFailureOrException(Functions.constant(-1l))) - .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT) - .objectName(ZOOKEEPER_MBEAN) - .attributeName("PacketsSent") - .onFailureOrException(Functions.constant(-1l))) - .build(); - } - } - - @Override - public void disconnectSensors() { - super.disconnectSensors(); - disconnectServiceUpIsRunning(); - if (jmxFeed != null) jmxFeed.stop(); - } - - @Override - protected ToStringHelper toStringHelper() { - return super.toStringHelper() - .add("zookeeperPort", getZookeeperPort()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperDriver.java deleted file mode 100644 index 7dca734..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperDriver.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.zookeeper; - -import brooklyn.entity.java.JavaSoftwareProcessDriver; - -public interface ZooKeeperDriver extends JavaSoftwareProcessDriver { - - Integer getZooKeeperPort(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java deleted file mode 100644 index e5869c5..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.zookeeper; - -import java.util.List; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; -import org.apache.brooklyn.api.event.AttributeSensor; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.group.DynamicCluster; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.Sensors; - -import com.google.common.reflect.TypeToken; - -@Catalog(name="ZooKeeper ensemble", description="A cluster of ZooKeeper servers. " - + "Apache ZooKeeper enables highly reliable distributed coordination.") -@ImplementedBy(ZooKeeperEnsembleImpl.class) -public interface ZooKeeperEnsemble extends DynamicCluster { - - @SetFromFlag("clusterName") - BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String - .class, "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster"); - - @SetFromFlag("initialSize") - public static final ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE, 3); - - @SuppressWarnings("serial") - AttributeSensor<List<String>> ZOOKEEPER_SERVERS = Sensors.newSensor(new TypeToken<List<String>>() { }, - "zookeeper.servers", "Hostnames to connect to cluster with"); - - String getClusterName(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java deleted file 
mode 100644 index eedcfe8..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.zookeeper; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.policy.PolicySpec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.group.AbstractMembershipTrackingPolicy; -import brooklyn.entity.group.DynamicClusterImpl; - -import com.google.common.collect.Lists; - -public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeeperEnsemble { - - private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleImpl.class); - private static final AtomicInteger myId = new AtomicInteger(); - - private MemberTrackingPolicy policy; - - public ZooKeeperEnsembleImpl() {} - - /** - * Sets the default 
{@link #MEMBER_SPEC} to describe the ZooKeeper nodes. - */ - @Override - protected EntitySpec<?> getMemberSpec() { - return getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class)); - } - - @Override - public String getClusterName() { - return getAttribute(CLUSTER_NAME); - } - - @Override - public void init() { - log.info("Initializing the ZooKeeper Ensemble"); - super.init(); - - policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class) - .displayName("Members tracker") - .configure("group", this)); - } - - public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { - @Override - protected void onEntityChange(Entity member) { - } - - @Override - protected void onEntityAdded(Entity member) { - if (member.getAttribute(ZooKeeperNode.MY_ID) == null) { - ((EntityInternal) member).setAttribute(ZooKeeperNode.MY_ID, myId.incrementAndGet()); - } - } - - @Override - protected void onEntityRemoved(Entity member) { - } - }; - - @Override - protected void initEnrichers() { - super.initEnrichers(); - - } - - @Override - public void start(Collection<? 
extends Location> locations) { - super.start(locations); - - List<String> zookeeperServers = Lists.newArrayList(); - for (Entity zookeeper : getMembers()) { - zookeeperServers.add(zookeeper.getAttribute(Attributes.HOSTNAME)); - } - setAttribute(ZOOKEEPER_SERVERS, zookeeperServers); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperNode.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperNode.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperNode.java deleted file mode 100644 index 504a894..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperNode.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.zookeeper; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; -import org.apache.brooklyn.api.event.AttributeSensor; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.event.basic.BasicAttributeSensor; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance. - */ -@Catalog(name="ZooKeeper Node", description="Apache ZooKeeper is a server which enables " - + "highly reliable distributed coordination.") -@ImplementedBy(ZooKeeperNodeImpl.class) -public interface ZooKeeperNode extends SoftwareProcess { - - @SetFromFlag("version") - ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "3.4.5"); - @SetFromFlag("zookeeperPort") - PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+"); - @SetFromFlag("zookeeperLeaderPort") - PortAttributeSensorAndConfigKey ZOOKEEPER_LEADER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.leader.port", "Zookeeper leader ports", "2888+"); - @SetFromFlag("zookeeperElectionPort") - PortAttributeSensorAndConfigKey ZOOKEEPER_ELECTION_PORT = new PortAttributeSensorAndConfigKey("zookeeper.election.port", "Zookeeper election ports", "3888+"); - @SetFromFlag("downloadUrl") - BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - SoftwareProcess.DOWNLOAD_URL, "http://apache.fastbull.org/zookeeper/zookeeper-${version}/zookeeper-${version}.tar.gz"); - /** - * Location of the ZK configuration file template to be copied to the server. 
- */ - @SetFromFlag("zookeeperConfig") - ConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = ConfigKeys.newStringConfigKey( - "zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)", - "classpath://brooklyn/entity/messaging/zookeeper/zoo.cfg"); - AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "zookeeper.outstandingRequests", "Outstanding request count"); - AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.received", "Total packets received"); - AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.sent", "Total packets sent"); - AttributeSensor<Integer> MY_ID = new BasicAttributeSensor<Integer>(Integer.class, "zookeeper.myid", "ZooKeeper node's myId"); - - Integer getZookeeperPort(); - - String getHostname(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java deleted file mode 100644 index 0346d02..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.zookeeper; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single standalone zookeeper instance. - */ -public class ZooKeeperNodeImpl extends AbstractZooKeeperImpl implements ZooKeeperNode { - - public ZooKeeperNodeImpl() {} - - @Override - public Class<?> getDriverInterface() { - return ZooKeeperDriver.class; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java deleted file mode 100644 index 2683682..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.zookeeper; - -import static java.lang.String.format; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import org.apache.brooklyn.api.entity.Entity; - -import brooklyn.entity.basic.Entities; -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.net.Networking; -import brooklyn.util.os.Os; -import brooklyn.util.ssh.BashCommands; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements ZooKeeperDriver { - - public ZooKeeperSshDriver(ZooKeeperNodeImpl entity, SshMachineLocation machine) { - super(entity, machine); - } - - @Override - protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(), "console.out"); } - - protected Map<String, Integer> getPortMap() { - return MutableMap.of("zookeeperPort", getZooKeeperPort()); - } - - protected String getConfigFileName() { - return entity.getConfig(ZooKeeperNode.ZOOKEEPER_CONFIG_TEMPLATE); - } - - protected int getMyId() { - return entity.getAttribute(ZooKeeperNode.MY_ID); - } - - // FIXME All for one, and one for all! If any node fails then we're stuck waiting for its hostname/port forever. - // Need a way to terminate the wait based on the entity going on-fire etc. - // FIXME Race in getMemebers. 
Should we change DynamicCluster.grow to create the members and only then call start on them all? - public List<ZooKeeperServerConfig> getZookeeperServers() throws ExecutionException, InterruptedException { - ZooKeeperEnsemble ensemble = (ZooKeeperEnsemble) entity.getParent(); - List<ZooKeeperServerConfig> result = Lists.newArrayList(); - - for (Entity member : ensemble.getMembers()) { - Integer myid = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.MY_ID).get(); - String hostname = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.HOSTNAME).get(); - Integer port = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_PORT).get(); - Integer leaderPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_LEADER_PORT).get(); - Integer electionPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_ELECTION_PORT).get(); - result.add(new ZooKeeperServerConfig(myid, hostname, port, leaderPort, electionPort)); - } - return result; - } - - @Override - public Integer getZooKeeperPort() { - return getEntity().getAttribute(ZooKeeperNode.ZOOKEEPER_PORT); - } - - @Override - public boolean isRunning() { - return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; - } - - @Override - public void stop() { - newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); - } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("zookeeper-%s", getVersion())))); - } - - @Override - public void install() { - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - - List<String> commands = ImmutableList.<String> builder() - .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) - .add(BashCommands.INSTALL_TAR) - .add("tar xzfv " + saveAs) - .build(); - - newScript(INSTALLING) - .body.append(commands) - 
.execute(); - } - - @Override - public void customize() { - log.debug("Customizing {}", entity); - Networking.checkPortsValid(getPortMap()); - newScript(CUSTOMIZING) - .body.append( - format("cp -R %s/* .", getExpandedInstallDir()), - format("mkdir %s/zookeeper", getRunDir()), - format("echo %d > %s/zookeeper/myid", getMyId(), getRunDir()) - ) - .execute(); - - String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/zoo.cfg"); - copyTemplate(getConfigFileName(), destinationConfigFile); - } - - public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "zookeeper.pid"); } - - @Override - public void launch() { - newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING) - .body.append(format("nohup java $JAVA_OPTS -cp zookeeper-%s.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo.cfg > %s 2>&1 &", getVersion(), getLogFileLocation())) - .execute(); - } - - public static class ZooKeeperServerConfig { - private final Integer myid; - private final String hostname; - private final Integer port; - private final Integer leaderPort; - private final Integer electionPort; - - public ZooKeeperServerConfig(Integer myid, String hostname, Integer port, Integer leaderPort, Integer electionPort) { - this.myid = myid; - this.hostname = hostname; - this.port = port; - this.leaderPort = leaderPort; - this.electionPort = electionPort; - } - - public Integer getMyid() { return myid; } - public String getHostname() { return hostname; } - public Integer getPort() { return port; } - public Integer getLeaderPort() { return leaderPort; } - public Integer getElectionPort() { return electionPort; } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/MessageBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/MessageBroker.java 
b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/MessageBroker.java new file mode 100644 index 0000000..2d22e03 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/MessageBroker.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.event.AttributeSensor; + +import brooklyn.event.basic.Sensors; + +/** + * Marker interface identifying message brokers. + */ +public interface MessageBroker extends Entity { + AttributeSensor<String> BROKER_URL = Sensors.newStringSensor("broker.url", "Broker Connection URL"); + + /** Setup the URL for external connections to the broker. 
*/ + void setBrokerUrl(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/Queue.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/Queue.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/Queue.java new file mode 100644 index 0000000..9c1aca5 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/Queue.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging; + +import org.apache.brooklyn.api.event.AttributeSensor; + +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; + +/** + * An interface that describes a messaging queue. 
+ */ +public interface Queue { + BasicAttributeSensorAndConfigKey<String> QUEUE_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, "queue.name", "Queue name"); + + AttributeSensor<Integer> QUEUE_DEPTH_BYTES = Sensors.newIntegerSensor("queue.depth.bytes", "Queue depth in bytes"); + AttributeSensor<Integer> QUEUE_DEPTH_MESSAGES = Sensors.newIntegerSensor("queue.depth.messages", "Queue depth in messages"); + + /** + * Create the queue. + * + * TODO make this an effector + */ + abstract void create(); + + /** + * Delete the queue. + * + * TODO make this an effector + */ + abstract void delete(); + + String getQueueName(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/Topic.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/Topic.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/Topic.java new file mode 100644 index 0000000..ac71226 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/Topic.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging; + +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; + +/** + * An interface that describes a messaging topic: a name key, create/delete operations and a name accessor. + */ +public interface Topic { + /** Key "topic.name" (description "Topic name"), declared with the combined attribute-sensor/config-key type. */ BasicAttributeSensorAndConfigKey<String> TOPIC_NAME = new BasicAttributeSensorAndConfigKey<String>( + String.class, "topic.name", "Topic name"); + + /** + * Create the topic. Takes no arguments and returns nothing; what creation involves is left to implementations. + * + * TODO make this an effector + */ + public abstract void create(); + + /** + * Delete the topic. Takes no arguments and returns nothing; what deletion involves is left to implementations. + * + * TODO make this an effector + */ + public abstract void delete(); + /** Returns the topic name; presumably the value held under {@link #TOPIC_NAME} (confirm in implementations). */ + String getTopicName(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQBroker.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQBroker.java new file mode 100644 index 0000000..9ec63f6 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQBroker.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.messaging.activemq; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.java.UsesJmx; +import org.apache.brooklyn.entity.messaging.MessageBroker; +import org.apache.brooklyn.entity.messaging.jms.JMSBroker; +import brooklyn.event.basic.AttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.util.time.Duration; +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single ActiveMQ broker instance. 
+ */ +@Catalog(name="ActiveMQ Broker", description="ActiveMQ is an open source message broker which fully implements the Java Message Service 1.1 (JMS)", iconUrl="classpath:///activemq-logo.png") +@ImplementedBy(ActiveMQBrokerImpl.class) +public interface ActiveMQBroker extends SoftwareProcess, MessageBroker, UsesJmx, JMSBroker<ActiveMQQueue, ActiveMQTopic> { + /** Re-declares {@link SoftwareProcess#START_TIMEOUT} on this interface under the flag name "startTimeout". */ + @SetFromFlag("startTimeout") + ConfigKey<Duration> START_TIMEOUT = SoftwareProcess.START_TIMEOUT; + /** {@link SoftwareProcess#SUGGESTED_VERSION} with "5.10.2" supplied as the default; flag name "version". */ + @SetFromFlag("version") + public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "5.10.2"); + /** Download URL key built on {@link Attributes#DOWNLOAD_URL}; the template string refers to ${driver.mirrorUrl} and ${version}. */ + @SetFromFlag("downloadUrl") + public static final AttributeSensorAndConfigKey<String,String> DOWNLOAD_URL = new StringAttributeSensorAndConfigKey( + Attributes.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/apache-activemq-${version}-bin.tar.gz"); + + /** Download mirror base URL, if desired; key "activemq.install.mirror.url", flag name "mirrorUrl". */ + @SetFromFlag("mirrorUrl") + public static final BasicConfigKey<String> MIRROR_URL = new BasicConfigKey<String>(String.class, "activemq.install.mirror.url", "URL of mirror", + "http://www.mirrorservice.org/sites/ftp.apache.org/activemq"); + /** Key "activemq.brokerName"; "localhost" is passed as the final argument, presumably the default value (confirm against ConfigKeys). */ + @SetFromFlag("brokerName") + public static final AttributeSensorAndConfigKey<String,String> BROKER_NAME = + ConfigKeys.newStringSensorAndConfigKey("activemq.brokerName", "ActiveMQ Broker Name", "localhost"); + /** Key "openwire.port"; the "61616+" argument presumably means 61616 or the next free port above it (confirm against PortAttributeSensorAndConfigKey). */ + @SetFromFlag("openWirePort") + public static final PortAttributeSensorAndConfigKey OPEN_WIRE_PORT = new PortAttributeSensorAndConfigKey("openwire.port", "OpenWire port", "61616+"); + /** Key "activemq.jetty.port"; the "8161+" argument presumably means 8161 or the next free port above it (confirm against PortAttributeSensorAndConfigKey). */ + @SetFromFlag("jettyPort") + public static final PortAttributeSensorAndConfigKey AMQ_JETTY_PORT = new PortAttributeSensorAndConfigKey("activemq.jetty.port", "jetty port", "8161+"); + /** Built from {@link UsesJmx#JMX_USER} with "admin" as the second constructor argument, presumably the default (confirm). */ + @SetFromFlag("jmxUser") + public static final BasicAttributeSensorAndConfigKey<String> JMX_USER = new BasicAttributeSensorAndConfigKey<String>(UsesJmx.JMX_USER, "admin"); + /** Built from {@link UsesJmx#JMX_PASSWORD} with "admin" as the second constructor argument, presumably the default. NOTE(review): a well-known value; confirm that deployments override it. */ + @SetFromFlag("jmxPassword") + public static final
BasicAttributeSensorAndConfigKey<String> JMX_PASSWORD = new BasicAttributeSensorAndConfigKey<String>(UsesJmx.JMX_PASSWORD, "admin"); + /** Key "activemq.templateConfigurationUrl": template file (in freemarker format) for conf/activemq.xml; the last argument is a classpath URL. */ + @SetFromFlag("templateConfigurationUrl") + public static final BasicAttributeSensorAndConfigKey<String> TEMPLATE_CONFIGURATION_URL = new BasicAttributeSensorAndConfigKey<String>( + String.class, "activemq.templateConfigurationUrl", "Template file (in freemarker format) for the conf/activemq.xml file", + "classpath://org/apache/brooklyn/entity/messaging/activemq/activemq.xml"); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQBrokerImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQBrokerImpl.java new file mode 100644 index 0000000..3907d76 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQBrokerImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.brooklyn.entity.messaging.activemq; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.java.UsesJmx; +import org.apache.brooklyn.entity.messaging.jms.JMSBrokerImpl; +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; + +import com.google.common.base.Functions; +import com.google.common.base.Objects.ToStringHelper; +import com.google.common.base.Predicates; +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single ActiveMQ broker instance. It publishes BROKER_URL, creates queue/topic child entities, and sets SERVICE_UP from a JMX poll of the broker MBean. + */ +public class ActiveMQBrokerImpl extends JMSBrokerImpl<ActiveMQQueue, ActiveMQTopic> implements ActiveMQBroker { + private static final Logger log = LoggerFactory.getLogger(ActiveMQBrokerImpl.class); + /** JMX feed built in connectSensors() and stopped in disconnectSensors(); null until connectSensors() has run. */ + private volatile JmxFeed jmxFeed; + /** No-argument constructor; only delegates to the superclass constructor. */ + public ActiveMQBrokerImpl() { + super(); + } + /** Runs super.init(), then calls Entities.getRequiredUrlConfig for TEMPLATE_CONFIGURATION_URL and discards the result; presumably a fail-fast check that the template URL is set (confirm). */ + @Override + public void init() { + super.init(); + Entities.getRequiredUrlConfig(this, TEMPLATE_CONFIGURATION_URL); + } + /** Sets BROKER_URL to "tcp://HOSTNAME:OPEN_WIRE_PORT", formatted from the current values of those two attributes. */ + public void setBrokerUrl() { + setAttribute(BROKER_URL, String.format("tcp://%s:%d", getAttribute(HOSTNAME), getAttribute(OPEN_WIRE_PORT))); + } + /** Returns -1 when JMX is disabled, otherwise the JMX_PORT attribute. The -1 is boxed explicitly so both branches are Integer and the attribute value is returned as-is, never auto-unboxed. */ + public Integer getJmxPort() { + return !isJmxEnabled() ?
Integer.valueOf(-1) : getAttribute(UsesJmx.JMX_PORT); + } + /** Returns the current value of the BROKER_NAME attribute. */ + public String getBrokerName() { + return getAttribute(BROKER_NAME); + } + /** Returns the current value of the OPEN_WIRE_PORT attribute. */ + public Integer getOpenWirePort() { + return getAttribute(OPEN_WIRE_PORT); + } + /** Returns true only when the USE_JMX config value equals Boolean.TRUE; a null value counts as disabled. */ + public boolean isJmxEnabled() { + return Boolean.TRUE.equals(getConfig(USE_JMX)); + } + /** Adds an ActiveMQQueue child configured from {@code properties}, passes it to Entities.manage, calls create() on it and returns it. */ + @Override + public ActiveMQQueue createQueue(Map properties) { + ActiveMQQueue result = addChild(EntitySpec.create(ActiveMQQueue.class).configure(properties)); + Entities.manage(result); + result.create(); + return result; + } + /** Adds an ActiveMQTopic child configured from {@code properties}, passes it to Entities.manage, calls create() on it and returns it. */ + @Override + public ActiveMQTopic createTopic(Map properties) { + ActiveMQTopic result = addChild(EntitySpec.create(ActiveMQTopic.class).configure(properties)); + Entities.manage(result); + result.create(); + return result; + } + /** Publishes BROKER_URL (same format as setBrokerUrl()) and builds the JMX feed that sets SERVICE_UP. NOTE(review): super.connectSensors() is not called here; confirm that is intended. */ + @Override + protected void connectSensors() { + setAttribute(BROKER_URL, String.format("tcp://%s:%d", getAttribute(HOSTNAME), getAttribute(OPEN_WIRE_PORT))); + /* Same ObjectName pattern that ActiveMQDestinationImpl builds for the broker MBean. */ + String brokerMbeanName = "org.apache.activemq:type=Broker,brokerName=" + getBrokerName(); + /* Poll the MBean's BrokerName attribute every 500 ms: a non-null value maps to SERVICE_UP=true, a failure or exception maps to false. */ + jmxFeed = JmxFeed.builder() + .entity(this) + .period(500, TimeUnit.MILLISECONDS) + .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP) + .objectName(brokerMbeanName) + .attributeName("BrokerName") + .onSuccess(Functions.forPredicate(Predicates.notNull())) + .onFailureOrException(Functions.constant(false)) + .suppressDuplicates(true)) + .build(); + } + /** Calls super.disconnectSensors(), then stops the JMX feed if connectSensors() created one. */ + @Override + public void disconnectSensors() { + super.disconnectSensors(); + if (jmxFeed != null) jmxFeed.stop(); + } + /** Extends the superclass ToStringHelper with an "openWirePort" entry taken from the OPEN_WIRE_PORT attribute. */ + @Override + protected ToStringHelper toStringHelper() { + return super.toStringHelper().add("openWirePort", getAttribute(OPEN_WIRE_PORT)); + } + /** Returns ActiveMQDriver.class as the driver interface for this entity. */ + @Override + public Class getDriverInterface() { + return ActiveMQDriver.class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDestination.java ----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDestination.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDestination.java new file mode 100644 index 0000000..c941ac1 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDestination.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.messaging.activemq; + +import org.apache.brooklyn.entity.messaging.jms.JMSDestination; +/** ActiveMQ-specific sub-interface of {@link JMSDestination}; it declares no members of its own. */ +public interface ActiveMQDestination extends JMSDestination { +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDestinationImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDestinationImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDestinationImpl.java new file mode 100644 index 0000000..becd7d0 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQDestinationImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.brooklyn.entity.messaging.activemq; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.basic.EntityLocal; + +import com.google.common.base.Preconditions; + +import org.apache.brooklyn.entity.messaging.jms.JMSDestinationImpl; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.event.feed.jmx.JmxHelper; +import brooklyn.util.exceptions.Exceptions; +/** Base class for ActiveMQ destinations. When management starts it reads the broker name from the parent entity, builds the broker MBean ObjectName from it and creates a JmxHelper for the parent. */ +public abstract class ActiveMQDestinationImpl extends JMSDestinationImpl implements ActiveMQDestination { + /** "org.apache.activemq:type=Broker,brokerName=NAME"; assigned in onManagementStarting(). */ protected ObjectName brokerMBeanName; + /** JMX helper created for the parent entity in onManagementStarting(). */ protected transient JmxHelper jmxHelper; + /** Never assigned in this class; presumably set by subclasses (confirm). Stopped in disconnectSensors() when non-null. */ protected volatile JmxFeed jmxFeed; + /** No-argument constructor with an empty body. */ + public ActiveMQDestinationImpl() { + } + /** Runs super.onManagementStarting(), then initialises brokerMBeanName and jmxHelper. Preconditions.checkArgument rejects a null or empty broker name; a MalformedObjectNameException is rethrown via Exceptions.propagate. */ + @Override + public void onManagementStarting() { + super.onManagementStarting(); + /* The broker name comes from the parent entity; see getBrokerName(). */ + String brokerName = getBrokerName(); + Preconditions.checkArgument(brokerName != null && !brokerName.isEmpty(), "ActiveMQ brokerName attribute must be specified"); + /* Same ObjectName pattern that ActiveMQBrokerImpl.connectSensors() polls. */ + try { + brokerMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName); + jmxHelper = new JmxHelper((EntityLocal) getParent()); + } catch (MalformedObjectNameException e) { + throw Exceptions.propagate(e); + } + } + /** Stops jmxFeed if it is set. NOTE(review): this override does not call super.disconnectSensors() and does not touch jmxHelper; confirm both are intended. */ + @Override + protected void disconnectSensors() { + if (jmxFeed != null) jmxFeed.stop(); + } + /** Returns the parent's ActiveMQBroker.BROKER_NAME attribute; Preconditions.checkNotNull rejects a missing parent first. */ + protected String getBrokerName() { + Preconditions.checkNotNull(getParent(), "JMS Destination must have a broker parent"); + return getParent().getAttribute(ActiveMQBroker.BROKER_NAME); + } +}
