http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java new file mode 100644 index 0000000..1f108a0 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java @@ -0,0 +1,272 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.messaging.storm;

import static java.lang.String.format;

import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.basic.EntityLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.SoftwareProcess;
import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
import brooklyn.event.basic.DependentConfiguration;
import org.apache.brooklyn.location.basic.Machines;
import org.apache.brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.net.Networking;
import brooklyn.util.os.Os;
import brooklyn.util.ssh.BashCommands;
import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;

import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/**
 * SSH driver for a Storm process. The role of the process is read from
 * {@link Storm#ROLE}; its lower-cased name is used as the {@code bin/storm}
 * sub-command, the pid-file name and the log-file name.
 */
public class StormSshDriver extends JavaSoftwareProcessSshDriver implements StormDriver {

    private static final Logger log = LoggerFactory.getLogger(StormSshDriver.class);

    public StormSshDriver(EntityLocal entity, SshMachineLocation machine) {
        super(entity, machine);
    }

    /**
     * Returns the lower-cased name of the configured {@link Storm#ROLE}.
     * <p>
     * A fixed locale is used so that the result does not depend on the JVM's
     * default locale (for example, a Turkish default locale would lower-case
     * {@code "UI"} to a dotless-i form that no longer equals {@code "ui"}).
     */
    public String getRoleName() {
        return entity.getConfig(Storm.ROLE).name().toLowerCase(Locale.ENGLISH);
    }

    /** Returns the configured {@link Storm#ZEROMQ_VERSION}. */
    public String getZeromqVersion() {
        return entity.getConfig(Storm.ZEROMQ_VERSION);
    }

    /** Returns {@link Storm#LOCAL_DIR} if configured, otherwise {@code <runDir>/storm}. */
    public String getLocalDir() {
        return Optional.fromNullable(entity.getConfig(Storm.LOCAL_DIR)).or(Os.mergePathsUnix(getRunDir(), "storm"));
    }

    /**
     * Resolves the nimbus hostname: the explicit {@link Storm#NIMBUS_HOSTNAME} config if set,
     * otherwise the {@link Attributes#HOSTNAME} of {@link Storm#NIMBUS_ENTITY} (this call waits
     * for the attribute to be ready), otherwise {@code "localhost"} with a warning.
     */
    public String getNimbusHostname() {
        String result = entity.getConfig(Storm.NIMBUS_HOSTNAME);
        if (result != null) return result;

        Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
        if (nimbus == null) {
            log.warn("No nimbus hostname available; using 'localhost'");
            return "localhost";
        }
        return Entities.submit(entity, DependentConfiguration.attributeWhenReady(nimbus, Attributes.HOSTNAME)).getUnchecked();
    }

    /** Returns the value of the {@link Storm#UI_PORT} attribute (may be null if not yet set). */
    public Integer getUiPort() {
        return entity.getAttribute(Storm.UI_PORT);
    }

    /** Returns the ports validated by {@link #customize()}, keyed by a descriptive name. */
    public Map<String, Integer> getPortMap() {
        return MutableMap.of("uiPort", getUiPort());
    }

    /**
     * Adds role-specific JVM flags to the options supplied by the superclass:
     * GC logging for nimbus and supervisor, a heap cap for the UI.
     */
    @Override
    protected List<String> getCustomJavaConfigOptions() {
        List<String> result = super.getCustomJavaConfigOptions();
        String role = getRoleName();
        if ("nimbus".equals(role) || "supervisor".equals(role)) {
            result.add("-verbose:gc");
            result.add("-XX:+PrintGCTimeStamps");
            result.add("-XX:+PrintGCDetails");
        }

        if ("ui".equals(role)) {
            result.add("-Xmx768m");
        }

        return result;
    }

    /** Returns the {@code JAVA_OPTS} entry of the shell environment, or the empty string if absent. */
    public String getJvmOptsLine() {
        return Optional.fromNullable(getShellEnvironment().get("JAVA_OPTS")).or("");
    }

    /**
     * Returns the {@link ZooKeeperEnsemble#ZOOKEEPER_SERVERS} attribute of the configured
     * {@link Storm#ZOOKEEPER_ENSEMBLE}, obtained through a "when ready" supplier.
     */
    public List<String> getZookeeperServers() {
        ZooKeeperEnsemble zooKeeperEnsemble = entity.getConfig(Storm.ZOOKEEPER_ENSEMBLE);
        Supplier<List<String>> supplier = Entities.attributeSupplierWhenReady(zooKeeperEnsemble, ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
        return supplier.get();
    }

    /** Returns the configured {@link Storm#STORM_CONFIG_TEMPLATE_URL}. */
    public String getStormConfigTemplateUrl() {
        return entity.getConfig(Storm.STORM_CONFIG_TEMPLATE_URL);
    }

    @Override
    public void preInstall() {
        resolver = Entities.newDownloader(this);
        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("storm-%s", getVersion()))));
    }

    /**
     * Installs build prerequisites, git and unzip, builds the native dependencies
     * (see {@link #installNativeDependencies()}), then downloads and unpacks Storm
     * and creates the local directory.
     */
    @Override
    public void install() {
        List<String> urls = resolver.getTargets();
        String saveAs = resolver.getFilename();

        ImmutableList.Builder<String> commands = ImmutableList.<String>builder();
        if (!getLocation().getOsDetails().isMac()) {
            commands.add(BashCommands.installPackage(ImmutableMap.of(
                    "yum", "libuuid-devel",
                    "apt", "build-essential uuid-dev pkg-config libtool automake"),
                    "libuuid-devel"));
            commands.add(BashCommands.ifExecutableElse0("yum", BashCommands.sudo("yum -y groupinstall 'Development Tools'")));
        }
        commands.add(BashCommands.installPackage(ImmutableMap.of("yum", "git"), "git"))
                .add(BashCommands.INSTALL_UNZIP)
                .addAll(installNativeDependencies())
                .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
                .add("unzip " + saveAs)
                .add("mkdir -p " + getLocalDir())
                .add("chmod 777 " + getLocalDir()); // FIXME
        newScript(INSTALLING)
                .body.append(commands.build())
                .gatherOutput()
                .execute();
    }

    /** Returns {@code <runDir>/<role>.pid}. */
    public String getPidFile() {
        return Os.mergePathsUnix(getRunDir(), format("%s.pid", getRoleName()));
    }

    /** Returns {@code <runDir>/logs/<role>.log}. */
    @Override
    protected String getLogFileLocation() {
        return Os.mergePathsUnix(getRunDir(), "logs", format("%s.log", getRoleName()));
    }

    /**
     * Starts the Storm process for this entity's role in the background.
     * <p>
     * A supervisor first waits for the configured nimbus entity (if any) to report
     * service-up, and then sleeps a further ten seconds inside the start mutex.
     */
    @Override
    public void launch() {
        boolean needsSleep = false;
        if (getRoleName().equals("supervisor")) {
            Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
            if (nimbus == null) {
                log.warn("No nimbus entity available; not blocking before starting supervisors");
            } else {
                Entities.waitForServiceUp(nimbus, entity.getConfig(SoftwareProcess.START_TIMEOUT));
                needsSleep = true;
            }
        }

        String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
        // Object[] form keeps this compatible with SLF4J versions that lack the varargs overload.
        log.info("Launching {} with role {} and hostname (public) {}, hostname (subnet) {}",
                new Object[] {entity, getRoleName(), getEntity().getAttribute(Attributes.HOSTNAME), subnetHostname});

        // ensure only one node at a time tries to start
        // attempting to eliminate the causes of:
        // 2013-12-12 09:21:45 supervisor [ERROR] Error on initialization of server mk-supervisor
        // org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /assignments
        // TODO use SoftwareProcess#START_LATCH instead here?

        // When START_MUTEX is not configured a fresh Object is used, so the block below
        // then provides no exclusion between entities.
        Object startMutex = Optional.fromNullable(entity.getConfig(Storm.START_MUTEX)).or(new Object());
        synchronized (startMutex) {
            if (needsSleep) {
                // give 10s extra to make sure nimbus is ready; we see weird zookeeper no /assignments node error otherwise
                // (this could be optimized by recording nimbus service_up time)
                Time.sleep(Duration.TEN_SECONDS);
            }
            newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
                    .body.append(format("nohup ./bin/storm %s > %s 2>&1 &", getRoleName(), getLogFileLocation()))
                    .execute();
        }
    }

    /** Returns true when the pid-file based check script exits with status 0. */
    @Override
    public boolean isRunning() {
        return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
    }

    @Override
    public void stop() {
        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
    }

    /**
     * Validates the ports, copies the expanded install directory into the current
     * directory of the customize script, and writes {@code conf/storm.yaml} under
     * the run directory from the configured template.
     */
    @Override
    public void customize() {
        log.debug("Customizing {}", entity);
        Networking.checkPortsValid(getPortMap());

        newScript(CUSTOMIZING)
                .body.append(format("cp -R %s/* .", getExpandedInstallDir()))
                .execute();

        String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/storm.yaml");
        copyTemplate(getStormConfigTemplateUrl(), destinationConfigFile);
    }

    /**
     * Builds the shell commands that install ZeroMQ and the jzmq Java binding.
     * On Mac the packages come from brew and jzmq is built from a git clone; elsewhere
     * ZeroMQ is built from the downloaded tarball and jzmq from a git clone.
     *
     * @return the commands, in execution order; both branches end by changing back
     *         to the install directory
     */
    protected List<String> installNativeDependencies() {
        String zeromqUrl = format("http://download.zeromq.org/zeromq-%s.tar.gz", getZeromqVersion());
        String targz = format("zeromq-%s.tar.gz", getZeromqVersion());
        String jzmq = "https://github.com/nathanmarz/jzmq.git";

        ImmutableList.Builder<String> commands = ImmutableList.<String>builder();
        if (getLocation().getOsDetails().isMac()) {
            commands.add("export PATH=$PATH:/usr/local/bin")
                    .add("export JAVA_HOME=$(/usr/libexec/java_home)")
                    .add("cd " + getInstallDir())
                    // NOTE(review): the default package name here is "make" while the brew name is
                    // "automake" - confirm this is intentional.
                    .add(BashCommands.installPackage(ImmutableMap.of("brew", "automake"), "make"))
                    .add(BashCommands.installPackage(ImmutableMap.of("brew", "libtool"), "libtool"))
                    .add(BashCommands.installPackage(ImmutableMap.of("brew", "pkg-config"), "pkg-config"))
                    .add(BashCommands.installPackage(ImmutableMap.of("brew", "zeromq"), "zeromq"))
                    // NOTE(review): a different jzmq repository is cloned on Mac than on other platforms.
                    .add("git clone https://github.com/asmaier/jzmq")
                    .add("cd jzmq")
                    .add("./autogen.sh")
                    .add("./configure")
                    .add("make")
                    .add((BashCommands.sudo("make install")))
                    .add("cd " + getInstallDir());
        } else {
            commands.add("export JAVA_HOME=$(dirname $(readlink -m `which java`))/../../ || export JAVA_HOME=/usr/lib/jvm/java")
                    .add("cd " + getInstallDir())
                    .add(BashCommands.commandToDownloadUrlAs(zeromqUrl, targz))
                    .add("tar xzf " + targz)
                    .add(format("cd zeromq-%s", getZeromqVersion()))
                    .add("./configure")
                    .add("make")
                    .add((BashCommands.sudo("make install")))
                    // install jzmq
                    .add("cd " + getInstallDir())
                    .add("git clone " + jzmq)
                    .add("cd jzmq")
                    .add("./autogen.sh")
                    .add("./configure")

                    // hack needed on ubuntu 12.04; ignore if it fails
                    // see https://github.com/zeromq/jzmq/issues/114
                    .add(BashCommands.ok(
                            "pushd src ; touch classdist_noinst.stamp ; CLASSPATH=.:./.:$CLASSPATH "
                            + "javac -d . org/zeromq/ZMQ.java org/zeromq/App.java org/zeromq/ZMQForwarder.java org/zeromq/EmbeddedLibraryTools.java org/zeromq/ZMQQueue.java org/zeromq/ZMQStreamer.java org/zeromq/ZMQException.java"))
                    .add(BashCommands.ok("popd"))

                    .add("make")
                    .add((BashCommands.sudo("make install")))
                    .add("cd " + getInstallDir());
        }
        return commands.build();
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java new file mode 100644 index 0000000..6cb5ab0 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java @@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.zookeeper;

import java.util.concurrent.TimeUnit;

import javax.management.ObjectName;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.entity.basic.SoftwareProcessImpl;
import brooklyn.entity.java.JavaSoftwareProcessDriver;
import brooklyn.event.feed.jmx.JmxAttributePollConfig;
import brooklyn.event.feed.jmx.JmxFeed;
import brooklyn.event.feed.jmx.JmxHelper;

import com.google.common.base.Functions;
import com.google.common.base.Objects.ToStringHelper;

/**
 * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance.
 * <p>
 * When the driver reports JMX as enabled, three counters are polled from the
 * standalone-server MBean every 500ms; each falls back to {@code -1} on failure.
 */
public abstract class AbstractZooKeeperImpl extends SoftwareProcessImpl implements ZooKeeperNode {

    @SuppressWarnings("unused")
    private static final Logger log = LoggerFactory.getLogger(AbstractZooKeeperImpl.class);
    private static final ObjectName ZOOKEEPER_MBEAN = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1");

    /** Feed created in {@link #connectSensors()}; remains null when JMX is not enabled. */
    private volatile JmxFeed jmxFeed;

    public AbstractZooKeeperImpl() {
    }

    @Override
    public Integer getZookeeperPort() {
        return getAttribute(ZOOKEEPER_PORT);
    }

    @Override
    public String getHostname() {
        return getAttribute(HOSTNAME);
    }

    /**
     * Waits as the superclass does and then, if JMX is enabled, additionally waits
     * (for up to the same duration again) until the ZooKeeper MBean is registered.
     */
    @Override
    public void waitForServiceUp(long duration, TimeUnit units) {
        super.waitForServiceUp(duration, units);

        if (!jmxEnabled()) {
            return;
        }

        // Wait for the MBean to exist
        JmxHelper jmx = new JmxHelper(this);
        try {
            jmx.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
        } finally {
            jmx.terminate();
        }
    }

    /** Wires up service-up detection and, when JMX is enabled, the request/packet counters. */
    @Override
    protected void connectSensors() {
        connectServiceUpIsRunning();

        if (!jmxEnabled()) {
            return;
        }

        jmxFeed = JmxFeed.builder()
                .entity(this)
                .period(500, TimeUnit.MILLISECONDS)
                .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
                        .objectName(ZOOKEEPER_MBEAN)
                        .attributeName("OutstandingRequests")
                        .onFailureOrException(Functions.constant(-1L)))
                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
                        .objectName(ZOOKEEPER_MBEAN)
                        .attributeName("PacketsReceived")
                        .onFailureOrException(Functions.constant(-1L)))
                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
                        .objectName(ZOOKEEPER_MBEAN)
                        .attributeName("PacketsSent")
                        .onFailureOrException(Functions.constant(-1L)))
                .build();
    }

    /** Undoes {@link #connectSensors()}, stopping the JMX feed if one was created. */
    @Override
    public void disconnectSensors() {
        super.disconnectSensors();
        disconnectServiceUpIsRunning();
        JmxFeed feed = jmxFeed;
        if (feed != null) {
            feed.stop();
        }
    }

    @Override
    protected ToStringHelper toStringHelper() {
        return super.toStringHelper()
                .add("zookeeperPort", getZookeeperPort());
    }

    /** Asks the driver (cast to {@link JavaSoftwareProcessDriver}) whether JMX is enabled. */
    private boolean jmxEnabled() {
        return ((JavaSoftwareProcessDriver) getDriver()).isJmxEnabled();
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java new file mode 100644 index 0000000..36388f0 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.zookeeper;

import brooklyn.entity.java.JavaSoftwareProcessDriver;

/**
 * Driver contract for a ZooKeeper node: a {@link JavaSoftwareProcessDriver}
 * that can additionally report the ZooKeeper port.
 */
public interface ZooKeeperDriver extends JavaSoftwareProcessDriver {

    /**
     * @return the ZooKeeper port of the managed node
     */
    Integer getZooKeeperPort();

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java new file mode 100644 index 0000000..d29bc91 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.zookeeper;

import java.util.List;

import org.apache.brooklyn.api.catalog.Catalog;
import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
import org.apache.brooklyn.api.event.AttributeSensor;
import org.apache.brooklyn.core.util.flags.SetFromFlag;

import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.ConfigKeys;
import brooklyn.entity.group.DynamicCluster;
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
import brooklyn.event.basic.Sensors;

import com.google.common.reflect.TypeToken;

/**
 * A {@link DynamicCluster} of ZooKeeper servers.
 */
@Catalog(name="ZooKeeper ensemble", description="A cluster of ZooKeeper servers. "
        + "Apache ZooKeeper enables highly reliable distributed coordination.")
@ImplementedBy(ZooKeeperEnsembleImpl.class)
public interface ZooKeeperEnsemble extends DynamicCluster {

    /** Name of the cluster; defaults to {@code "BrooklynZookeeperCluster"}. */
    @SetFromFlag("clusterName")
    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(
            String.class, "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster");

    /** Re-declares {@link DynamicCluster#INITIAL_SIZE} with a default of 3. */
    @SetFromFlag("initialSize")
    ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE, 3);

    /** Hostnames that clients can use to connect to the cluster. */
    @SuppressWarnings("serial")
    AttributeSensor<List<String>> ZOOKEEPER_SERVERS = Sensors.newSensor(new TypeToken<List<String>>() { },
            "zookeeper.servers", "Hostnames to connect to cluster with");

    /**
     * @return the name of this cluster
     */
    String getClusterName();
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java new file mode 100644 index 0000000..f9ce930 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java @@ -0,0 +1,105 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.zookeeper;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.proxying.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.policy.PolicySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.EntityInternal;
import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
import brooklyn.entity.group.DynamicClusterImpl;

import com.google.common.collect.Lists;

/**
 * Cluster implementation that hands each new member a {@link ZooKeeperNode#MY_ID}
 * and, once started, publishes the members' hostnames as {@link #ZOOKEEPER_SERVERS}.
 */
public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeeperEnsemble {

    private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleImpl.class);

    // NOTE(review): this counter is static, so it is shared by every ensemble in the JVM
    // and ids keep increasing across ensembles - confirm that is intended.
    private static final AtomicInteger myId = new AtomicInteger();

    private MemberTrackingPolicy policy;

    public ZooKeeperEnsembleImpl() {}

    /**
     * Sets the default {@link #MEMBER_SPEC} to describe the ZooKeeper nodes.
     */
    @Override
    protected EntitySpec<?> getMemberSpec() {
        return getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class));
    }

    @Override
    public String getClusterName() {
        return getAttribute(CLUSTER_NAME);
    }

    /** Initialises the cluster and attaches the policy that assigns ids to new members. */
    @Override
    public void init() {
        log.info("Initializing the ZooKeeper Ensemble");
        super.init();

        PolicySpec<MemberTrackingPolicy> trackerSpec = PolicySpec.create(MemberTrackingPolicy.class)
                .displayName("Members tracker")
                .configure("group", this);
        policy = addPolicy(trackerSpec);
    }

    /** Assigns the next id to every added member that does not yet have one. */
    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
        @Override
        protected void onEntityChange(Entity member) {
        }

        @Override
        protected void onEntityAdded(Entity member) {
            if (member.getAttribute(ZooKeeperNode.MY_ID) != null) {
                return;
            }
            ((EntityInternal) member).setAttribute(ZooKeeperNode.MY_ID, myId.incrementAndGet());
        }

        @Override
        protected void onEntityRemoved(Entity member) {
        }
    }

    @Override
    protected void initEnrichers() {
        super.initEnrichers();
    }

    /**
     * Starts the cluster, then records the {@link Attributes#HOSTNAME} of every member
     * (including any null values) as {@link #ZOOKEEPER_SERVERS}.
     */
    @Override
    public void start(Collection<? extends Location> locations) {
        super.start(locations);

        List<String> hostnames = Lists.newArrayList();
        for (Entity node : getMembers()) {
            hostnames.add(node.getAttribute(Attributes.HOSTNAME));
        }
        setAttribute(ZOOKEEPER_SERVERS, hostnames);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java new file mode 100644 index 0000000..6a67394 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.brooklyn.entity.zookeeper; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; + +/** + * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance. + */ +@Catalog(name="ZooKeeper Node", description="Apache ZooKeeper is a server which enables " + + "highly reliable distributed coordination.") +@ImplementedBy(ZooKeeperNodeImpl.class) +public interface ZooKeeperNode extends SoftwareProcess { + + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "3.4.5"); + @SetFromFlag("zookeeperPort") + PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+"); + @SetFromFlag("zookeeperLeaderPort") + PortAttributeSensorAndConfigKey ZOOKEEPER_LEADER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.leader.port", "Zookeeper leader ports", "2888+"); + @SetFromFlag("zookeeperElectionPort") + PortAttributeSensorAndConfigKey ZOOKEEPER_ELECTION_PORT = new PortAttributeSensorAndConfigKey("zookeeper.election.port", "Zookeeper election ports", "3888+"); + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "http://apache.fastbull.org/zookeeper/zookeeper-${version}/zookeeper-${version}.tar.gz"); + /** + * Location of the ZK configuration file template to be copied to the server. 
+ */ + @SetFromFlag("zookeeperConfig") + ConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = ConfigKeys.newStringConfigKey( + "zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)", + "classpath://org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg"); + AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "zookeeper.outstandingRequests", "Outstanding request count"); + AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.received", "Total packets received"); + AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.sent", "Total packets sent"); + AttributeSensor<Integer> MY_ID = new BasicAttributeSensor<Integer>(Integer.class, "zookeeper.myid", "ZooKeeper node's myId"); + + Integer getZookeeperPort(); + + String getHostname(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java new file mode 100644 index 0000000..275e101 --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.zookeeper;

/**
 * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single standalone zookeeper instance.
 * <p>
 * All behaviour comes from {@link AbstractZooKeeperImpl}; this class only names
 * {@link ZooKeeperDriver} as its driver interface.
 */
public class ZooKeeperNodeImpl extends AbstractZooKeeperImpl implements ZooKeeperNode {

    public ZooKeeperNodeImpl() {
    }

    /**
     * @return {@link ZooKeeperDriver}
     */
    @Override
    public Class<?> getDriverInterface() {
        return ZooKeeperDriver.class;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java new file mode 100644 index 0000000..709e44c --- /dev/null +++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.zookeeper;

import static java.lang.String.format;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.brooklyn.api.entity.Entity;

import brooklyn.entity.basic.Entities;
import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
import org.apache.brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.net.Networking;
import brooklyn.util.os.Os;
import brooklyn.util.ssh.BashCommands;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

/**
 * SSH driver that downloads, configures and launches a ZooKeeper node.
 */
public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements ZooKeeperDriver {

    public ZooKeeperSshDriver(ZooKeeperNodeImpl entity, SshMachineLocation machine) {
        super(entity, machine);
    }

    /** Returns {@code <runDir>/console.out}. */
    @Override
    protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(), "console.out"); }

    /** Returns the ports validated by {@link #customize()}, keyed by a descriptive name. */
    protected Map<String, Integer> getPortMap() {
        return MutableMap.of("zookeeperPort", getZooKeeperPort());
    }

    /** Returns the configured {@link ZooKeeperNode#ZOOKEEPER_CONFIG_TEMPLATE} location. */
    protected String getConfigFileName() {
        return entity.getConfig(ZooKeeperNode.ZOOKEEPER_CONFIG_TEMPLATE);
    }

    /**
     * Returns this node's {@link ZooKeeperNode#MY_ID}.
     *
     * @throws NullPointerException if the attribute has not been set on the entity
     */
    protected int getMyId() {
        Integer myId = entity.getAttribute(ZooKeeperNode.MY_ID);
        // Fail with a descriptive message rather than an anonymous NPE from auto-unboxing.
        return Preconditions.checkNotNull(myId, "ZooKeeperNode.MY_ID has not been set on %s", entity);
    }

    // FIXME All for one, and one for all! If any node fails then we're stuck waiting for its hostname/port forever.
    // Need a way to terminate the wait based on the entity going on-fire etc.
    // FIXME Race in getMembers. Should we change DynamicCluster.grow to create the members and only then call start on them all?
    /**
     * Collects id, hostname and ports for every member of the parent ensemble, waiting
     * for each attribute to become available.
     * <p>
     * The entity's parent is cast to {@link ZooKeeperEnsemble} without a check.
     */
    public List<ZooKeeperServerConfig> getZookeeperServers() throws ExecutionException, InterruptedException {
        ZooKeeperEnsemble ensemble = (ZooKeeperEnsemble) entity.getParent();
        List<ZooKeeperServerConfig> result = Lists.newArrayList();

        for (Entity member : ensemble.getMembers()) {
            Integer myid = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.MY_ID).get();
            String hostname = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.HOSTNAME).get();
            Integer port = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_PORT).get();
            Integer leaderPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_LEADER_PORT).get();
            Integer electionPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_ELECTION_PORT).get();
            result.add(new ZooKeeperServerConfig(myid, hostname, port, leaderPort, electionPort));
        }
        return result;
    }

    @Override
    public Integer getZooKeeperPort() {
        return getEntity().getAttribute(ZooKeeperNode.ZOOKEEPER_PORT);
    }

    /** Returns true when the pid-file based check script exits with status 0. */
    @Override
    public boolean isRunning() {
        return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
    }

    @Override
    public void stop() {
        newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
    }

    @Override
    public void preInstall() {
        resolver = Entities.newDownloader(this);
        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("zookeeper-%s", getVersion()))));
    }

    /** Downloads the ZooKeeper tarball, makes sure tar is installed, and unpacks it. */
    @Override
    public void install() {
        List<String> urls = resolver.getTargets();
        String saveAs = resolver.getFilename();

        List<String> commands = ImmutableList.<String> builder()
                .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
                .add(BashCommands.INSTALL_TAR)
                .add("tar xzfv " + saveAs)
                .build();

        newScript(INSTALLING)
                .body.append(commands)
                .execute();
    }

    /**
     * Validates the ports, copies the expanded install directory into place, writes the
     * {@code zookeeper/myid} file under the run directory, and writes {@code conf/zoo.cfg}
     * from the configured template.
     */
    @Override
    public void customize() {
        log.debug("Customizing {}", entity);
        Networking.checkPortsValid(getPortMap());
        newScript(CUSTOMIZING)
                .body.append(
                        format("cp -R %s/* .", getExpandedInstallDir()),
                        // -p so that re-running customize does not error when the directory already exists
                        format("mkdir -p %s/zookeeper", getRunDir()),
                        format("echo %d > %s/zookeeper/myid", getMyId(), getRunDir())
                    )
                .execute();

        String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/zoo.cfg");
        copyTemplate(getConfigFileName(), destinationConfigFile);
    }

    /** Returns {@code <runDir>/zookeeper.pid}. */
    public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "zookeeper.pid"); }

    /** Starts {@code QuorumPeerMain} in the background, redirecting output to the log file. */
    @Override
    public void launch() {
        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
                .body.append(format("nohup java $JAVA_OPTS -cp zookeeper-%s.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo.cfg > %s 2>&1 &", getVersion(), getLogFileLocation()))
                .execute();
    }

    /** Immutable holder for the id, hostname and ports of one ensemble member. */
    public static class ZooKeeperServerConfig {
        private final Integer myid;
        private final String hostname;
        private final Integer port;
        private final Integer leaderPort;
        private final Integer electionPort;

        public ZooKeeperServerConfig(Integer myid, String hostname, Integer port, Integer leaderPort, Integer electionPort) {
            this.myid = myid;
            this.hostname = hostname;
            this.port = port;
            this.leaderPort = leaderPort;
            this.electionPort = electionPort;
        }

        public Integer getMyid() { return myid; }
        public String getHostname() { return hostname; }
        public Integer getPort() { return port; }
        public Integer getLeaderPort() { return leaderPort; }
        public Integer getElectionPort() { return electionPort; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml
b/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml deleted file mode 100644 index 52114d1..0000000 --- a/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml +++ /dev/null @@ -1,154 +0,0 @@ -[#ftl] -<?xml version="1.0" encoding="UTF-8"?> -<!-- Based on standard file from ActiveMQ Version 5.7.0 --> -<!-- START SNIPPET: example --> -<beans - xmlns="http://www.springframework.org/schema/beans" - xmlns:amq="http://activemq.apache.org/schema/core" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd - http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> - - <!-- Allows us to use system properties as variables in this configuration file --> - <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> - <property name="locations"> - <value>file:[#noparse]${activemq.conf}[/#noparse]/credentials.properties</value> - </property> - </bean> - - <!-- - The <broker> element is used to configure the ActiveMQ broker. - --> - <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" brokerName="${entity.brokerName}" dataDirectory="[#noparse]${activemq.data}[/#noparse]"> - - <!-- - For better performances use VM cursor and small memory limit. - For more information, see: - - http://activemq.apache.org/message-cursors.html - - Also, if your producer is "hanging", it's probably due to producer flow control. 
- For more information, see: - http://activemq.apache.org/producer-flow-control.html - --> - - <destinationPolicy> - <policyMap> - <policyEntries> - <policyEntry topic=">" producerFlowControl="true"> - <!-- The constantPendingMessageLimitStrategy is used to prevent - slow topic consumers to block producers and affect other consumers - by limiting the number of messages that are retained - For more information, see: - - http://activemq.apache.org/slow-consumer-handling.html - - --> - <pendingMessageLimitStrategy> - <constantPendingMessageLimitStrategy limit="1000"/> - </pendingMessageLimitStrategy> - </policyEntry> - <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> - <!-- Use VM cursor for better latency - For more information, see: - - http://activemq.apache.org/message-cursors.html - - <pendingQueuePolicy> - <vmQueueCursor/> - </pendingQueuePolicy> - --> - </policyEntry> - </policyEntries> - </policyMap> - </destinationPolicy> - - - <!-- - The managementContext is used to configure how ActiveMQ is exposed in - JMX. By default, ActiveMQ uses the MBean server that is started by - the JVM. For more information, see: - - http://activemq.apache.org/jmx.html - --> - <managementContext> - [#if entity.jmxPort > 0] - <managementContext connectorPort="${entity.jmxPort?c}"/> - [#else] - <managementContext createConnector="false"/> - [/#if] - </managementContext> - - <!-- - Configure message persistence for the broker. The default persistence - mechanism is the KahaDB store (identified by the kahaDB tag). - For more information, see: - - http://activemq.apache.org/persistence.html - --> - <persistenceAdapter> - <kahaDB directory="[#noparse]${activemq.data}[/#noparse]/kahadb"/> - </persistenceAdapter> - - - <!-- - The systemUsage controls the maximum amount of space the broker will - use before slowing down producers. 
For more information, see: - http://activemq.apache.org/producer-flow-control.html - If using ActiveMQ embedded - the following limits could safely be used: - - <systemUsage> - <systemUsage> - <memoryUsage> - <memoryUsage limit="20 mb"/> - </memoryUsage> - <storeUsage> - <storeUsage limit="1 gb"/> - </storeUsage> - <tempUsage> - <tempUsage limit="100 mb"/> - </tempUsage> - </systemUsage> - </systemUsage> - --> - <systemUsage> - <systemUsage> - <memoryUsage> - <memoryUsage limit="64 mb"/> - </memoryUsage> - <storeUsage> - <storeUsage limit="100 gb"/> - </storeUsage> - <tempUsage> - <tempUsage limit="50 gb"/> - </tempUsage> - </systemUsage> - </systemUsage> - - <!-- - The transport connectors expose ActiveMQ over a given protocol to - clients and other brokers. For more information, see: - - http://activemq.apache.org/configuring-transports.html - --> - <transportConnectors> - <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> - <transportConnector name="openwire" uri="tcp://0.0.0.0:${entity.openWirePort?c}?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> - </transportConnectors> - - <!-- destroy the spring context on shutdown to stop jetty --> - <shutdownHooks> - <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> - </shutdownHooks> - - </broker> - - <!-- - Enable web consoles, REST and Ajax APIs and demos - - Take a look at [#noparse]${ACTIVEMQ_HOME}[/#noparse]/conf/jetty.xml for more details - --> - <import resource="jetty.xml"/> - -</beans> -<!-- END SNIPPET: example --> http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg 
b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg deleted file mode 100644 index d600ef5..0000000 Binary files a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties deleted file mode 100644 index feb871f..0000000 --- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties +++ /dev/null @@ -1,112 +0,0 @@ -[#ftl] -# -## -# KafkaBroker configuration template for Brooklyn -# -# see kafka.server.KafkaConfig for additional details and defaults -## - -############################# Server Basics ############################# -# The id of the broker. This must be set to a unique integer for each broker. -broker.id=${entity.brokerId?c} - -############################# Socket Server Settings ############################# - -# The port the socket server listens on -port=${entity.kafkaPort?c} - -# Hostname the broker will bind to. If not set, the server will bind to all interfaces -host.name=${driver.hostname} - -# Hostname the broker will advertise to producers and consumers. If not set, it uses the -# value for "host.name" if configured. Otherwise, it will use the value returned from -# java.net.InetAddress.getCanonicalHostName(). -#advertised.host.name=<hostname routable by clients> - -# The port to publish to ZooKeeper for clients to use. If this is not set, -# it will publish the same port that the broker binds to. 
-#advertised.port=<port accessible by clients> - -# The number of threads handling network requests -num.network.threads=3 - -# The number of threads doing disk I/O -num.io.threads=8 - -# The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer.bytes=102400 - -# The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer.bytes=102400 - -# The maximum size of a request that the socket server will accept (protection against OOM) -max.socket.request.bytes=104857600 - - -############################# Log Basics ############################# - -# The directory under which to store log files -log.dir=${driver.runDir}/kafka-logs - -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. -num.partitions=1 - -# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. -# This value is recommended to be increased for installations with data dirs located in RAID array. -num.recovery.threads.per.data.dir=1 - -############################# Log Flush Policy ############################# - -# Messages are immediately written to the filesystem but by default we only fsync() to sync -# the OS cache lazily. The following configurations control the flush of data to disk. -# There are a few important trade-offs here: -# 1. Durability: Unflushed data may be lost if you are not using replication. -# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. -# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. -# The settings below allow one to configure the flush policy to flush data after a period of time or -# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
- -# The number of messages to accept before forcing a flush of data to disk -log.flush.interval.messages=10000 - -# The maximum amount of time a message can sit in a log before we force a flush -log.flush.interval.ms=1000 - -############################# Log Retention Policy ############################# - -# The following configurations control the disposal of log segments. The policy can -# be set to delete segments after a period of time, or after a given size has accumulated. -# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens -# from the end of the log. - -# The minimum age of a log file to be eligible for deletion -log.retention.hours=168 - -# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.bytes. -#log.retention.bytes=1073741824 - -# The maximum size of a log segment file. When this size is reached a new log segment will be created. -log.segment.bytes=1073741824 - -# The interval at which log segments are checked to see if they can be deleted according -# to the retention policies -log.retention.check.interval.ms=300000 - -# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. -# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. -log.cleaner.enable=false - -############################# Zookeeper ############################# - -# Zookeeper connection string (see zookeeper docs for details). -# This is a comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". -# You can also append an optional chroot string to the urls to specify the -# root directory for all kafka znodes. 
-zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c} - -# Timeout in ms for connecting to zookeeper -zookeeper.connection.timeout.ms=1000000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties deleted file mode 100644 index 646d2f1..0000000 --- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties +++ /dev/null @@ -1,13 +0,0 @@ -[#ftl] -# - -## -# KafkaZookeeper configuration template for Brooklyn -## - -# the directory where the snapshot is stored. -dataDir=${driver.runDir}/zookeeper -# the port at which the clients will connect -clientPort=${entity.zookeeperPort?c} -# disable the per-ip limit on the number of connections since this is a non-production config -maxClientCnxns=0 http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config b/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config deleted file mode 100644 index b4428f0..0000000 --- a/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config +++ /dev/null @@ -1,5 +0,0 @@ -[ -<#if entity.enableManagementPlugin> - {rabbitmq_mochiweb, [{listeners, [{mgmt, [{port, ${entity.managementPort?c}}]}]}]} -</#if> -]. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml b/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml deleted file mode 100644 index 99f0c71..0000000 --- a/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml +++ /dev/null @@ -1,39 +0,0 @@ -[#ftl] -# -# Storm Configuration -[#if driver.zookeeperServers?has_content] - storm.zookeeper.servers: -[#list driver.zookeeperServers as zkServer] - - "${zkServer}" -[/#list] -[/#if] - - storm.local.dir: "${driver.localDir}" - -### ui.* configs are for the master - ui.port: ${driver.uiPort?c} - ui.childopts: "-Xmx768m" - -[#if driver.roleName == "ui"] - nimbus.host: "${driver.nimbusHostname}" -[/#if] - - nimbus.childopts: " ${driver.jvmOptsLine}" - worker.childopts: " ${driver.jvmOptsLine}" - supervisor.childopts: " ${driver.jvmOptsLine}" - -# ##### These may optionally be filled in: -# -## List of custom serializations -# topology.kryo.register: -# - org.mycompany.MyType -# - org.mycompany.MyType2: org.mycompany.MyType2Serializer -# -## List of custom kryo decorators -# topology.kryo.decorators: -# - org.mycompany.MyDecorator -# -## Locations of the drpc servers -# drpc.servers: -# - "server1" -# - "server2" http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg b/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg deleted file mode 100644 index 79721a6..0000000 --- 
a/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg +++ /dev/null @@ -1,42 +0,0 @@ -[#ftl] -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -## -# ZooKeeper configuration template for Brooklyn -## - -# The number of milliseconds of each tick -tickTime=2000 -# The number of ticks that the initial -# synchronization phase can take -initLimit=10 -# The number of ticks that can pass between -# sending a request and getting an acknowledgement -syncLimit=5 -# the directory where the snapshot is stored. 
-dataDir=${driver.runDir}/zookeeper -# the port at which the clients will connect -clientPort=${entity.zookeeperPort?c} -# disable the per-ip limit on the number of connections since this is a non-production config -maxClientCnxns=0 - -[#list driver.zookeeperServers as zkServer] -server.${zkServer.myid?c}=${zkServer.hostname}:${zkServer.leaderPort?c}:${zkServer.electionPort?c} -[/#list] http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml new file mode 100644 index 0000000..52114d1 --- /dev/null +++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml @@ -0,0 +1,154 @@ +[#ftl] +<?xml version="1.0" encoding="UTF-8"?> +<!-- Based on standard file from ActiveMQ Version 5.7.0 --> +<!-- START SNIPPET: example --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <!-- Allows us to use system properties as variables in this configuration file --> + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> + <property name="locations"> + <value>file:[#noparse]${activemq.conf}[/#noparse]/credentials.properties</value> + </property> + </bean> + + <!-- + The <broker> element is used to configure the ActiveMQ broker. 
+ --> + <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" brokerName="${entity.brokerName}" dataDirectory="[#noparse]${activemq.data}[/#noparse]"> + + <!-- + For better performances use VM cursor and small memory limit. + For more information, see: + + http://activemq.apache.org/message-cursors.html + + Also, if your producer is "hanging", it's probably due to producer flow control. + For more information, see: + http://activemq.apache.org/producer-flow-control.html + --> + + <destinationPolicy> + <policyMap> + <policyEntries> + <policyEntry topic=">" producerFlowControl="true"> + <!-- The constantPendingMessageLimitStrategy is used to prevent + slow topic consumers to block producers and affect other consumers + by limiting the number of messages that are retained + For more information, see: + + http://activemq.apache.org/slow-consumer-handling.html + + --> + <pendingMessageLimitStrategy> + <constantPendingMessageLimitStrategy limit="1000"/> + </pendingMessageLimitStrategy> + </policyEntry> + <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> + <!-- Use VM cursor for better latency + For more information, see: + + http://activemq.apache.org/message-cursors.html + + <pendingQueuePolicy> + <vmQueueCursor/> + </pendingQueuePolicy> + --> + </policyEntry> + </policyEntries> + </policyMap> + </destinationPolicy> + + + <!-- + The managementContext is used to configure how ActiveMQ is exposed in + JMX. By default, ActiveMQ uses the MBean server that is started by + the JVM. For more information, see: + + http://activemq.apache.org/jmx.html + --> + <managementContext> + [#if entity.jmxPort > 0] + <managementContext connectorPort="${entity.jmxPort?c}"/> + [#else] + <managementContext createConnector="false"/> + [/#if] + </managementContext> + + <!-- + Configure message persistence for the broker. The default persistence + mechanism is the KahaDB store (identified by the kahaDB tag). 
+ For more information, see: + + http://activemq.apache.org/persistence.html + --> + <persistenceAdapter> + <kahaDB directory="[#noparse]${activemq.data}[/#noparse]/kahadb"/> + </persistenceAdapter> + + + <!-- + The systemUsage controls the maximum amount of space the broker will + use before slowing down producers. For more information, see: + http://activemq.apache.org/producer-flow-control.html + If using ActiveMQ embedded - the following limits could safely be used: + + <systemUsage> + <systemUsage> + <memoryUsage> + <memoryUsage limit="20 mb"/> + </memoryUsage> + <storeUsage> + <storeUsage limit="1 gb"/> + </storeUsage> + <tempUsage> + <tempUsage limit="100 mb"/> + </tempUsage> + </systemUsage> + </systemUsage> + --> + <systemUsage> + <systemUsage> + <memoryUsage> + <memoryUsage limit="64 mb"/> + </memoryUsage> + <storeUsage> + <storeUsage limit="100 gb"/> + </storeUsage> + <tempUsage> + <tempUsage limit="50 gb"/> + </tempUsage> + </systemUsage> + </systemUsage> + + <!-- + The transport connectors expose ActiveMQ over a given protocol to + clients and other brokers. 
For more information, see: + + http://activemq.apache.org/configuring-transports.html + --> + <transportConnectors> + <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> + <transportConnector name="openwire" uri="tcp://0.0.0.0:${entity.openWirePort?c}?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> + </transportConnectors> + + <!-- destroy the spring context on shutdown to stop jetty --> + <shutdownHooks> + <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> + </shutdownHooks> + + </broker> + + <!-- + Enable web consoles, REST and Ajax APIs and demos + + Take a look at [#noparse]${ACTIVEMQ_HOME}[/#noparse]/conf/jetty.xml for more details + --> + <import resource="jetty.xml"/> + +</beans> +<!-- END SNIPPET: example --> http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg new file mode 100644 index 0000000..d600ef5 Binary files /dev/null and b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg differ http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties new file mode 100644 index 0000000..feb871f 
--- /dev/null +++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties @@ -0,0 +1,112 @@ +[#ftl] +# +## +# KafkaBroker configuration template for Brooklyn +# +# see kafka.server.KafkaConfig for additional details and defaults +## + +############################# Server Basics ############################# +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=${entity.brokerId?c} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=${entity.kafkaPort?c} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name=${driver.hostname} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=${driver.runDir}/kafka-logs + +# The default number of log partitions per topic. 
More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. 
Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties new file mode 100644 index 0000000..646d2f1 --- /dev/null +++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties @@ -0,0 +1,13 @@ +[#ftl] +# + +## +# KafkaZookeeper configuration template for Brooklyn +## + +# the directory where the snapshot is stored. +dataDir=${driver.runDir}/zookeeper +# the port at which the clients will connect +clientPort=${entity.zookeeperPort?c} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config new file mode 100644 index 0000000..b4428f0 --- /dev/null +++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config @@ -0,0 +1,5 @@ +[ +<#if entity.enableManagementPlugin> + {rabbitmq_mochiweb, [{listeners, [{mgmt, [{port, ${entity.managementPort?c}}]}]}]} +</#if> +]. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml new file mode 100644 index 0000000..99f0c71 --- /dev/null +++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml @@ -0,0 +1,39 @@ +[#ftl] +# +# Storm Configuration +[#if driver.zookeeperServers?has_content] + storm.zookeeper.servers: +[#list driver.zookeeperServers as zkServer] + - "${zkServer}" +[/#list] +[/#if] + + storm.local.dir: "${driver.localDir}" + +### ui.* configs are for the master + ui.port: ${driver.uiPort?c} + ui.childopts: "-Xmx768m" + +[#if driver.roleName == "ui"] + nimbus.host: "${driver.nimbusHostname}" +[/#if] + + nimbus.childopts: " ${driver.jvmOptsLine}" + worker.childopts: " ${driver.jvmOptsLine}" + supervisor.childopts: " ${driver.jvmOptsLine}" + +# ##### These may optionally be filled in: +# +## List of custom serializations +# topology.kryo.register: +# - org.mycompany.MyType +# - org.mycompany.MyType2: org.mycompany.MyType2Serializer +# +## List of custom kryo decorators +# topology.kryo.decorators: +# - org.mycompany.MyDecorator +# +## Locations of the drpc servers +# drpc.servers: +# - "server1" +# - "server2" http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg 
new file mode 100644 index 0000000..79721a6 --- /dev/null +++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg @@ -0,0 +1,42 @@ +[#ftl] +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +## +# ZooKeeper configuration template for Brooklyn +## + +# The number of milliseconds of each tick +tickTime=2000 +# The number of ticks that the initial +# synchronization phase can take +initLimit=10 +# The number of ticks that can pass between +# sending a request and getting an acknowledgement +syncLimit=5 +# the directory where the snapshot is stored. 
+dataDir=${driver.runDir}/zookeeper +# the port at which the clients will connect +clientPort=${entity.zookeeperPort?c} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 + +[#list driver.zookeeperServers as zkServer] +server.${zkServer.myid?c}=${zkServer.hostname}:${zkServer.leaderPort?c}:${zkServer.electionPort?c} +[/#list] http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java deleted file mode 100644 index aaffda8..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.activemq; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.testng.annotations.Test; - -import brooklyn.entity.AbstractEc2LiveTest; -import brooklyn.entity.trait.Startable; - -import com.google.common.collect.ImmutableList; - -public class ActiveMQEc2LiveTest extends AbstractEc2LiveTest { - - /** - * Test that can install+start, and use, ActiveMQ. - */ - @Override - protected void doTest(Location loc) throws Exception { - String queueName = "testQueue"; - int number = 10; - String content = "01234567890123456789012345678901"; - - // Start broker with a configured queue - ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName)); - - app.start(ImmutableList.of(loc)); - - EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true); - - // Check queue created - assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName)); - assertEquals(activeMQ.getChildren().size(), 1); - assertEquals(activeMQ.getQueues().size(), 1); - - // Get the named queue entity - ActiveMQQueue queue = activeMQ.getQueues().get(queueName); - assertNotNull(queue); - - // Connect to broker using JMS and send messages - Connection connection = getActiveMQConnection(activeMQ); - clearQueue(connection, queueName); - EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0); - sendMessages(connection, number, queueName, content); - - // Check messages arrived - 
EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number); - - connection.close(); - } - - private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception { - int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT); - String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port)); - Connection connection = factory.createConnection("admin", "activemq"); - connection.start(); - return connection; - } - - private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName); - MessageProducer messageProducer = session.createProducer(destination); - - for (int i = 0; i < count; i++) { - TextMessage message = session.createTextMessage(content); - messageProducer.send(message); - } - - session.close(); - } - - private int clearQueue(Connection connection, String queueName) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName); - MessageConsumer messageConsumer = session.createConsumer(destination); - - int received = 0; - while (messageConsumer.receive(500) != null) received++; - - session.close(); - - return received; - } - - @Test(enabled=false) - public void testDummy() {} // Convince testng IDE integration that this really does have test methods -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java 
---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java deleted file mode 100644 index e26dc2d..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.activemq; - -import brooklyn.entity.AbstractGoogleComputeLiveTest; -import brooklyn.entity.trait.Startable; - -import com.google.common.collect.ImmutableList; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.test.EntityTestUtils; -import org.testng.annotations.Test; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - -public class ActiveMQGoogleComputeLiveTest extends AbstractGoogleComputeLiveTest { - - /** - * Test that can install+start, and use, ActiveMQ. - */ - @Override - protected void doTest(Location loc) throws Exception { - String queueName = "testQueue"; - int number = 10; - String content = "01234567890123456789012345678901"; - - // Start broker with a configured queue - ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName)); - - app.start(ImmutableList.of(loc)); - - EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true); - - // Check queue created - assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName)); - assertEquals(activeMQ.getChildren().size(), 1); - assertEquals(activeMQ.getQueues().size(), 1); - - // Get the named queue entity - ActiveMQQueue queue = activeMQ.getQueues().get(queueName); - assertNotNull(queue); - - // Connect to broker using JMS and send messages - Connection connection = getActiveMQConnection(activeMQ); - clearQueue(connection, queueName); - EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0); - sendMessages(connection, number, queueName, content); - - // Check messages 
arrived - EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number); - - connection.close(); - } - - private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception { - int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT); - String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port)); - Connection connection = factory.createConnection("admin", "activemq"); - connection.start(); - return connection; - } - - private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName); - MessageProducer messageProducer = session.createProducer(destination); - - for (int i = 0; i < count; i++) { - TextMessage message = session.createTextMessage(content); - messageProducer.send(message); - } - - session.close(); - } - - private int clearQueue(Connection connection, String queueName) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName); - MessageConsumer messageConsumer = session.createConsumer(destination); - - int received = 0; - while (messageConsumer.receive(500) != null) received++; - - session.close(); - - return received; - } - - @Test(enabled=false) - public void testDummy() {} // Convince testng IDE integration that this really does have test methods -}
