// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
// ----------------------------------------------------------------------
// diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
// deleted file mode 100644
// index 7b810ae..0000000
// --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
// +++ /dev/null
// @@ -1,170 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.entity.messaging.kafka;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.management.ObjectName;

import org.apache.brooklyn.api.entity.proxying.EntitySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.SoftwareProcessImpl;
import brooklyn.entity.messaging.MessageBroker;
import brooklyn.entity.zookeeper.ZooKeeperNode;
import brooklyn.event.feed.jmx.JmxAttributePollConfig;
import brooklyn.event.feed.jmx.JmxFeed;
import brooklyn.event.feed.jmx.JmxHelper;

import com.google.common.base.Functions;
import com.google.common.base.Objects.ToStringHelper;

/**
 * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka broker instance.
 */
public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker {

    @SuppressWarnings("unused")
    private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class);

    /** The MBean that every usage-metric poll in {@link #connectSensors()} reads from. */
    private static final ObjectName SOCKET_SERVER_STATS_MBEAN = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats");

    /** Feed created by {@link #connectSensors()} when JMX is enabled; stays {@code null} otherwise. */
    private volatile JmxFeed jmxFeed;

    public KafkaBrokerImpl() {
        super();
    }

    /**
     * Initialises the entity and derives its {@code BROKER_ID} from {@link #hashCode()}.
     */
    @Override
    public void init() {
        super.init();
        // Must be positive for partitioning to work
        setAttribute(BROKER_ID, toNonNegative(hashCode()));
    }

    /**
     * Maps an arbitrary {@code int} to a non-negative one.
     * <p>
     * {@link Math#abs(int)} returns {@link Integer#MIN_VALUE} (still negative) for
     * {@link Integer#MIN_VALUE}, so that single value is mapped to {@link Integer#MAX_VALUE}.
     * Every other input yields exactly {@code Math.abs(value)}.
     */
    private static int toNonNegative(int value) {
        return (value == Integer.MIN_VALUE) ? Integer.MAX_VALUE : Math.abs(value);
    }

    @Override
    public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); }

    @Override
    public Integer getBrokerId() { return getAttribute(BROKER_ID); }

    @Override
    public ZooKeeperNode getZookeeper() { return getConfig(ZOOKEEPER); }

    @Override
    public Class<?> getDriverInterface() {
        return KafkaBrokerDriver.class;
    }

    /**
     * Delegates to the superclass wait and then, when the driver reports JMX as enabled,
     * additionally waits for {@link #SOCKET_SERVER_STATS_MBEAN} to exist.
     * <p>
     * The MBean wait is given the full {@code duration} again; time already spent in the
     * superclass wait is not subtracted.
     *
     * @param duration the timeout applied to each of the two waits
     * @param units    the unit of {@code duration}
     */
    @Override
    public void waitForServiceUp(long duration, TimeUnit units) {
        super.waitForServiceUp(duration, units);

        if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
            // Wait for the MBean to exist
            JmxHelper helper = new JmxHelper(this);
            try {
                helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration));
            } finally {
                // Always release the helper, even if the assertion above throws
                helper.terminate();
            }
        }
    }

    /**
     * Connects the service-up check and, when the driver reports JMX as enabled, a
     * {@link JmxFeed} that polls {@link #SOCKET_SERVER_STATS_MBEAN} every 500ms.
     * <p>
     * Each poll is switched on or off by the {@code RETRIEVE_USAGE_METRICS} config value and
     * publishes {@code -1} when the poll fails. The broker URL is set last, whether or not
     * JMX is enabled.
     */
    @Override
    protected void connectSensors() {
        connectServiceUpIsRunning();
        boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);

        if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
            jmxFeed = JmxFeed.builder()
                    .entity(this)
                    .period(500, TimeUnit.MILLISECONDS)
                    .pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT)
                            .objectName(SOCKET_SERVER_STATS_MBEAN)
                            .attributeName("NumFetchRequests")
                            .onException(Functions.constant(-1L))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME)
                            .objectName(SOCKET_SERVER_STATS_MBEAN)
                            .attributeName("TotalFetchRequestMs")
                            .onException(Functions.constant(-1L))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Double>(MAX_FETCH_TIME)
                            .objectName(SOCKET_SERVER_STATS_MBEAN)
                            .attributeName("MaxFetchRequestMs")
                            .onException(Functions.constant(-1.0d))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT)
                            .objectName(SOCKET_SERVER_STATS_MBEAN)
                            .attributeName("NumProduceRequests")
                            .onException(Functions.constant(-1L))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME)
                            .objectName(SOCKET_SERVER_STATS_MBEAN)
                            .attributeName("TotalProduceRequestMs")
                            .onException(Functions.constant(-1L))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME)
                            .objectName(SOCKET_SERVER_STATS_MBEAN)
                            .attributeName("MaxProduceRequestMs")
                            .onException(Functions.constant(-1.0d))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED)
                            .objectName(SOCKET_SERVER_STATS_MBEAN)
                            .attributeName("TotalBytesRead")
                            .onException(Functions.constant(-1L))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT)
                            .objectName(SOCKET_SERVER_STATS_MBEAN)
                            .attributeName("TotalBytesWritten")
                            .onException(Functions.constant(-1L))
                            .enabled(retrieveUsageMetrics))
                    .build();
        }

        setBrokerUrl();
    }

    /**
     * Disconnects the superclass sensors and the service-up check, then stops the JMX feed
     * if one was created.
     */
    @Override
    public void disconnectSensors() {
        super.disconnectSensors();
        disconnectServiceUpIsRunning();
        if (jmxFeed != null) jmxFeed.stop();
    }

    @Override
    protected ToStringHelper toStringHelper() {
        return super.toStringHelper()
                .add("kafkaPort", getKafkaPort());
    }

    /** Use the {@link #getZookeeper() zookeeper} details if available, otherwise use our own host and port. */
    @Override
    public void setBrokerUrl() {
        ZooKeeperNode zookeeper = getZookeeper();
        if (zookeeper != null) {
            setAttribute(BROKER_URL, String.format("zookeeper://%s:%d", zookeeper.getAttribute(HOSTNAME), zookeeper.getZookeeperPort()));
        } else {
            setAttribute(BROKER_URL, String.format("kafka://%s:%d", getAttribute(HOSTNAME), getKafkaPort()));
        }
    }

}
// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
// ----------------------------------------------------------------------
// diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
// deleted file mode 100644
// index 7892ac5..0000000
// --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
// +++ /dev/null
// @@ -1,97 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.entity.messaging.kafka;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.config.ConfigKey;
import brooklyn.entity.java.UsesJmx;
import brooklyn.entity.java.UsesJmx.JmxAgentModes;
import org.apache.brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.collections.MutableMap;

/**
 * SSH driver for a {@link KafkaBroker}: supplies the broker-specific file names, script names
 * and port to the shared Kafka SSH driver base class.
 */
public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements KafkaBrokerDriver {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerSshDriver.class);

    public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation machine) {
        super(entity, machine);
    }

    @Override
    protected Map<String, Integer> getPortMap() {
        return MutableMap.of("kafkaPort", getKafkaPort());
    }

    @Override
    protected ConfigKey<String> getConfigTemplateKey() {
        return KafkaBroker.KAFKA_BROKER_CONFIG_TEMPLATE;
    }

    @Override
    protected String getConfigFileName() {
        return "server.properties";
    }

    @Override
    protected String getLaunchScriptName() {
        return "kafka-server-start.sh";
    }

    @Override
    public String getTopicsScriptName() {
        return "kafka-topics.sh";
    }

    @Override
    protected String getProcessIdentifier() {
        return "kafka\\.Kafka";
    }

    @Override
    public Integer getKafkaPort() {
        return getEntity().getAttribute(KafkaBroker.KAFKA_PORT);
    }

    /**
     * Returns the superclass shell environment with {@code JMX_PORT} added.
     * <p>
     * The value depends on the entity's {@code JMX_AGENT_MODE}: the RMI registry port when the
     * mode is {@link JmxAgentModes#NONE}, otherwise the broker's {@code INTERNAL_JMX_PORT}.
     */
    @Override
    public Map<String, String> getShellEnvironment() {
        JmxAgentModes jmxAgentMode = getEntity().getConfig(KafkaBroker.JMX_AGENT_MODE);
        String jmxPort;
        if (jmxAgentMode == JmxAgentModes.NONE) {
            // seems odd to pass RMI port here, as it gets assigned to com.sun.mgmt.jmx.port in kafka-run-class.sh
            // but RMI server/registry port works, whereas JMX port does not
            jmxPort = String.valueOf(entity.getAttribute(UsesJmx.RMI_REGISTRY_PORT));
        } else {
            /*
             * See ./bin/kafka-server-start.sh and ./bin/kafka-run-class.sh
             * Really hard to turn off jmxremote on kafka! And can't use default because
             * uses 9999, which means could only run one kafka broker per server.
             */
            jmxPort = String.valueOf(entity.getAttribute(KafkaBroker.INTERNAL_JMX_PORT));
        }

        return MutableMap.<String, String> builder()
                .putAll(super.getShellEnvironment())
                .put("JMX_PORT", jmxPort)
                .build();
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
// ----------------------------------------------------------------------
// diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
// deleted file mode 100644
// index 3a24377..0000000
// --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
// +++ /dev/null
// @@ -1,92 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.entity.messaging.kafka;

import org.apache.brooklyn.api.catalog.Catalog;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.entity.proxying.EntitySpec;
import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
import org.apache.brooklyn.api.event.AttributeSensor;
import org.apache.brooklyn.core.util.flags.SetFromFlag;

import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.BrooklynConfigKeys;
import brooklyn.entity.basic.ConfigKeys;
import brooklyn.entity.group.Cluster;
import brooklyn.entity.group.DynamicCluster;
import brooklyn.entity.trait.Resizable;
import brooklyn.entity.trait.Startable;
import brooklyn.entity.zookeeper.ZooKeeperNode;
import brooklyn.event.basic.BasicAttributeSensor;
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
import brooklyn.util.time.Duration;

/**
 * Provides Kafka cluster functionality through a group of {@link KafkaBroker brokers} controlled
 * by a single {@link KafkaZooKeeper zookeeper} entity.
 * <p>
 * You can customise the Kafka zookeeper and brokers by supplying {@link EntitySpec entity specifications}
 * to be used when creating them. An existing {@link ZooKeeperNode} entity may also be provided instead of the
 * Kafka zookeeper.
 * <p>
 * The contents of this entity are:
 * <ul>
 * <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s
 * <li>a {@link KafkaZooKeeper} or {@link ZooKeeperNode}
 * <li>a {@link org.apache.brooklyn.api.policy.Policy} to resize the broker cluster
 * </ul>
 * The {@link Group group} and {@link Resizable} interface methods are delegated to the broker cluster, so calling
 * {@link Resizable#resize(Integer) resize} will change the number of brokers.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Catalog(name="Kafka", description="Apache Kafka is a distributed publish-subscribe messaging system", iconUrl="classpath://brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg")
@ImplementedBy(KafkaClusterImpl.class)
public interface KafkaCluster extends Entity, Startable, Resizable, Group {

    @SetFromFlag("startTimeout")
    ConfigKey<Duration> START_TIMEOUT = BrooklynConfigKeys.START_TIMEOUT;

    @SetFromFlag("initialSize")
    ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(Cluster.INITIAL_SIZE, 1);

    /** Zookeeper for the cluster. If null a default will be created. */
    @SetFromFlag("zookeeper")
    BasicAttributeSensorAndConfigKey<ZooKeeperNode> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<ZooKeeperNode>(
            ZooKeeperNode.class, "kafka.cluster.zookeeper", "The zookeeper for the cluster; if null a default will be created");

    /** Spec for creating the default Kafka zookeeper entity. */
    @SetFromFlag("zookeeperSpec")
    BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZooKeeper>> ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey(
            EntitySpec.class, "kafka.cluster.zookeeperSpec", "Spec for creating the kafka zookeeper");

    /** Spec for Kafka broker entities to be created. */
    @SetFromFlag("brokerSpec")
    BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = new BasicAttributeSensorAndConfigKey(
            EntitySpec.class, "kafka.cluster.brokerSpec", "Spec for Kafka broker entities to be created");

    /** Underlying Kafka broker cluster. */
    AttributeSensor<DynamicCluster> CLUSTER = new BasicAttributeSensor<DynamicCluster>(
            DynamicCluster.class, "kafka.cluster.brokerCluster", "Underlying Kafka broker cluster");

    /** Returns the current value of the {@link #ZOOKEEPER} attribute. */
    ZooKeeperNode getZooKeeper();

    /** Returns the current value of the {@link #CLUSTER} attribute. */
    DynamicCluster getCluster();

}

// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
// ----------------------------------------------------------------------
// diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
// deleted file mode 100644
// index b5b8449..0000000
// --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
// +++ /dev/null
// @@ -1,206 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.entity.messaging.kafka;

import java.util.Collection;
import java.util.List;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.proxying.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.enricher.Enrichers;
import brooklyn.entity.basic.AbstractEntity;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.group.DynamicCluster;
import brooklyn.entity.trait.Startable;
import brooklyn.entity.zookeeper.ZooKeeperNode;
import brooklyn.event.feed.ConfigToAttributes;
import brooklyn.util.collections.MutableList;
import brooklyn.util.exceptions.CompoundRuntimeException;

import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

/**
 * Implementation of a Kafka cluster containing a {@link KafkaZooKeeper} node and a group of {@link KafkaBroker}s.
 */
public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {

    public static final Logger log = LoggerFactory.getLogger(KafkaClusterImpl.class);

    public KafkaClusterImpl() {
    }

    /**
     * Creates the child entities and wires up the sensors.
     * <p>
     * A zookeeper child is created from {@code ZOOKEEPER_SPEC} (or a default
     * {@link KafkaZooKeeper} spec) only when no {@code ZOOKEEPER} was supplied. A
     * {@link DynamicCluster} child is always created, using {@code BROKER_SPEC} (or a default
     * {@link KafkaBroker} spec) configured with that zookeeper as its member spec.
     */
    @Override
    public void init() {
        super.init();

        setAttribute(SERVICE_UP, false);
        ConfigToAttributes.apply(this, BROKER_SPEC);
        ConfigToAttributes.apply(this, ZOOKEEPER);
        ConfigToAttributes.apply(this, ZOOKEEPER_SPEC);

        log.debug("creating zookeeper child for {}", this);
        ZooKeeperNode zookeeper = getAttribute(ZOOKEEPER);
        if (zookeeper == null) {
            EntitySpec<KafkaZooKeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC);
            if (zookeeperSpec == null) {
                log.debug("creating zookeeper using default spec for {}", this);
                zookeeperSpec = EntitySpec.create(KafkaZooKeeper.class);
                setAttribute(ZOOKEEPER_SPEC, zookeeperSpec);
            } else {
                log.debug("creating zookeeper using custom spec for {}", this);
            }
            zookeeper = addChild(zookeeperSpec);
            if (Entities.isManaged(this)) Entities.manage(zookeeper);
            setAttribute(ZOOKEEPER, zookeeper);
        }

        log.debug("creating cluster child for {}", this);
        EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC);
        if (brokerSpec == null) {
            log.debug("creating default broker spec for {}", this);
            brokerSpec = EntitySpec.create(KafkaBroker.class);
            setAttribute(BROKER_SPEC, brokerSpec);
        }
        // Relies on initialSize being inherited by DynamicCluster, because key id is identical
        // We add the zookeeper configuration to the KafkaBroker specification here
        DynamicCluster cluster = addChild(EntitySpec.create(DynamicCluster.class)
                .configure("memberSpec", EntitySpec.create(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper)));
        if (Entities.isManaged(this)) Entities.manage(cluster);
        setAttribute(CLUSTER, cluster);

        connectSensors();
    }

    @Override
    public ZooKeeperNode getZooKeeper() {
        return getAttribute(ZOOKEEPER);
    }

    @Override
    public DynamicCluster getCluster() {
        return getAttribute(CLUSTER);
    }

    /**
     * Starts the broker cluster and, if this entity is its parent, the zookeeper.
     *
     * @param locations where to start; when empty, this entity's existing locations are used.
     *        Exactly one location is required, which {@link Iterables#getOnlyElement} enforces.
     */
    @Override
    public void start(Collection<? extends Location> locations) {
        if (isLegacyConstruction()) {
            init();
        }

        if (locations.isEmpty()) locations = getLocations();
        Iterables.getOnlyElement(locations); // Assert just one
        addLocations(locations);

        List<Entity> childrenToStart = MutableList.<Entity>of(getCluster());
        // Set the KafkaZookeeper entity as child of cluster, if it does not already have a parent
        if (getZooKeeper().getParent() == null) {
            addChild(getZooKeeper());
        } // And only start zookeeper if we are parent
        if (Objects.equal(this, getZooKeeper().getParent())) childrenToStart.add(getZooKeeper());
        Entities.invokeEffector(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).getUnchecked();
    }

    /**
     * Stops the zookeeper (only if this entity is its parent) and then the broker cluster.
     * <p>
     * A failure in one child does not prevent the other from being stopped: exceptions are
     * collected and rethrown together once locations are cleared and {@code SERVICE_UP} is
     * set to {@code false}.
     *
     * @throws CompoundRuntimeException if stopping any child failed
     */
    @Override
    public void stop() {
        List<Exception> errors = Lists.newArrayList();
        if (getZooKeeper() != null && Objects.equal(this, getZooKeeper().getParent())) {
            try {
                getZooKeeper().stop();
            } catch (Exception e) {
                errors.add(e);
            }
        }
        // Guard against a missing cluster in the same way as the zookeeper above, so that
        // stopping an entity whose CLUSTER attribute was never set does not throw a
        // NullPointerException.
        DynamicCluster cluster = getCluster();
        if (cluster != null && cluster.getCurrentSize() > 0) {
            try {
                cluster.stop();
            } catch (Exception e) {
                errors.add(e);
            }
        }

        clearLocations();
        setAttribute(SERVICE_UP, false);

        if (errors.size() != 0) {
            throw new CompoundRuntimeException("Error stopping Kafka cluster", errors);
        }
    }

    /**
     * Restarts by calling {@link #stop()} and then {@link #start(Collection)}.
     * The locations are copied first because {@code stop()} clears them.
     */
    @Override
    public void restart() {
        // TODO prod the entities themselves to restart, instead?
        Collection<Location> locations = Lists.newArrayList(getLocations());

        stop();
        start(locations);
    }

    /**
     * Propagates every sensor except {@code SERVICE_UP} from the broker cluster, and
     * {@code SERVICE_UP} from the zookeeper.
     */
    void connectSensors() {
        addEnricher(Enrichers.builder()
                .propagatingAllBut(SERVICE_UP)
                .from(getCluster())
                .build());
        addEnricher(Enrichers.builder()
                .propagating(SERVICE_UP)
                .from(getZooKeeper())
                .build());
    }

    /*
     * All Group and Resizable interface methods are delegated to the broker cluster.
     */

    /** {@inheritDoc} */
    @Override
    public Collection<Entity> getMembers() { return getCluster().getMembers(); }

    /** {@inheritDoc} */
    @Override
    public boolean hasMember(Entity member) { return getCluster().hasMember(member); }

    /** {@inheritDoc} */
    @Override
    public boolean addMember(Entity member) { return getCluster().addMember(member); }

    /** {@inheritDoc} */
    @Override
    public boolean removeMember(Entity member) { return getCluster().removeMember(member); }

    /** {@inheritDoc} */
    @Override
    public Integer getCurrentSize() { return getCluster().getCurrentSize(); }

    /** {@inheritDoc} */
    @Override
    public Integer resize(Integer desiredSize) { return getCluster().resize(desiredSize); }

    @Override
    public <T extends Entity> T addMemberChild(EntitySpec<T> spec) { return getCluster().addMemberChild(spec); }

    @Override
    public <T extends Entity> T addMemberChild(T child) { return getCluster().addMemberChild(child); }

}

// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
// ----------------------------------------------------------------------
// diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
// deleted file mode 100644
// index 106690a..0000000
// --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java
// +++ /dev/null
// @@ -1,58 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.entity.messaging.kafka;

import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
import org.apache.brooklyn.core.util.flags.SetFromFlag;

import brooklyn.config.ConfigKey;
import brooklyn.entity.annotation.Effector;
import brooklyn.entity.annotation.EffectorParam;
import brooklyn.entity.basic.SoftwareProcess;
import brooklyn.entity.zookeeper.ZooKeeperNode;
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
import brooklyn.event.basic.BasicConfigKey;
import brooklyn.util.time.Duration;

/**
 * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka zookeeper instance.
 */
@ImplementedBy(KafkaZooKeeperImpl.class)
public interface KafkaZooKeeper extends ZooKeeperNode, Kafka {

    @SetFromFlag("startTimeout")
    ConfigKey<Duration> START_TIMEOUT = SoftwareProcess.START_TIMEOUT;

    /** The Kafka version, not the Zookeeper version. */
    @SetFromFlag("version")
    ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;

    /** The download URL, shared with {@link Kafka#DOWNLOAD_URL}. */
    @SetFromFlag("downloadUrl")
    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = Kafka.DOWNLOAD_URL;

    /** Location of the kafka configuration file template to be copied to the server. */
    @SetFromFlag("kafkaZookeeperConfig")
    ConfigKey<String> KAFKA_ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class,
            "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)",
            "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");

    @Effector(description = "Create a topic with a single partition and only one replica")
    void createTopic(@EffectorParam(name = "topic") String topic);
}

// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
// ----------------------------------------------------------------------
// diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
// deleted file mode 100644
// index 97edc8b..0000000
// --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java
// +++ /dev/null
// @@ -1,28 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.entity.messaging.kafka;

import brooklyn.entity.java.JavaSoftwareProcessDriver;

/**
 * Driver contract for a {@link KafkaZooKeeper}: exposes its port and topic creation.
 */
public interface KafkaZooKeeperDriver extends JavaSoftwareProcessDriver {

    Integer getZookeeperPort();

    void createTopic(String topic);
}

// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
// ----------------------------------------------------------------------
// diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
// deleted file mode 100644
// index c9a1148..0000000
// --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java
// +++ /dev/null
// @@ -1,47 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.entity.messaging.kafka;

import brooklyn.entity.annotation.EffectorParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.entity.zookeeper.AbstractZooKeeperImpl;

/**
 * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Kafka zookeeper instance.
 */
public class KafkaZooKeeperImpl extends AbstractZooKeeperImpl implements KafkaZooKeeper {

    @SuppressWarnings("unused")
    private static final Logger log = LoggerFactory.getLogger(KafkaZooKeeperImpl.class);

    public KafkaZooKeeperImpl() {
    }

    @Override
    public Class<?> getDriverInterface() {
        return KafkaZooKeeperDriver.class;
    }

    /** Delegates topic creation to the {@link KafkaZooKeeperDriver}. */
    @Override
    public void createTopic(String topic) {
        ((KafkaZooKeeperDriver)getDriver()).createTopic(topic);
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
// ----------------------------------------------------------------------
// diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
// deleted file mode 100644
// index dc7688f..0000000
// --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
// +++ /dev/null
// @@ -1,82 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.entity.messaging.kafka;

import java.util.Map;

import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.Attributes;
import org.apache.brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.collections.MutableMap;

import static brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash;

/**
 * SSH driver for a {@link KafkaZooKeeper}: supplies the zookeeper-specific file names, script
 * names and port to the shared Kafka SSH driver base class, and implements topic creation.
 */
public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements KafkaZooKeeperDriver {

    public KafkaZooKeeperSshDriver(KafkaZooKeeperImpl entity, SshMachineLocation machine) {
        super(entity, machine);
    }

    @Override
    protected Map<String, Integer> getPortMap() {
        return MutableMap.of("zookeeperPort", getZookeeperPort());
    }

    @Override
    protected ConfigKey<String> getConfigTemplateKey() {
        return KafkaZooKeeper.KAFKA_ZOOKEEPER_CONFIG_TEMPLATE;
    }

    @Override
    protected String getConfigFileName() {
        return "zookeeper.properties";
    }

    @Override
    protected String getLaunchScriptName() {
        return "zookeeper-server-start.sh";
    }

    @Override
    protected String getTopicsScriptName() {
        return "kafka-topics.sh";
    }

    @Override
    protected String getProcessIdentifier() {
        return "quorum\\.QuorumPeerMain";
    }

    @Override
    public Integer getZookeeperPort() {
        return getEntity().getAttribute(KafkaZooKeeper.ZOOKEEPER_PORT);
    }

    /**
     * Runs the topics script to create {@code topic} with replication factor 1 and a single
     * partition, addressing this entity's own hostname and zookeeper port.
     * <p>
     * Both the zookeeper address and the topic name are escaped for use inside double-quoted
     * bash strings, and the script is set to fail on a non-zero exit code.
     *
     * @param topic name of the topic to create
     */
    @Override
    public void createTopic(String topic) {
        String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort();
        newScript(CUSTOMIZING)
                .failOnNonZeroResultCode()
                .body.append(String.format("./bin/%s --create --zookeeper \"%s\" --replication-factor 1 --partitions 1 --topic \"%s\"",
                        getTopicsScriptName(),
                        escapeLiteralForDoubleQuotedBash(zookeeperUrl),
                        escapeLiteralForDoubleQuotedBash(topic)))
                .execute();
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBroker.java
---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBroker.java deleted file mode 100644 index a2af8a4..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBroker.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.qpid; - -import java.util.Map; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.java.UsesJmx; -import brooklyn.entity.messaging.MessageBroker; -import brooklyn.entity.messaging.amqp.AmqpServer; -import brooklyn.entity.messaging.jms.JMSBroker; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.BasicConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Qpid broker instance, using AMQP 0-10. - */ -@Catalog(name="Qpid Broker", description="Apache Qpid is an open-source messaging system, implementing the Advanced Message Queuing Protocol (AMQP)", iconUrl="classpath:///qpid-logo.jpeg") -@ImplementedBy(QpidBrokerImpl.class) -public interface QpidBroker extends SoftwareProcess, MessageBroker, UsesJmx, AmqpServer, JMSBroker<QpidQueue, QpidTopic> { - - /* Qpid runtime file locations for convenience. 
*/ - - public static final String CONFIG_XML = "etc/config.xml"; - public static final String VIRTUALHOSTS_XML = "etc/virtualhosts.xml"; - public static final String PASSWD = "etc/passwd"; - - @SetFromFlag("version") - public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.20"); - - @SetFromFlag("downloadUrl") - public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - Attributes.DOWNLOAD_URL, "http://download.nextag.com/apache/qpid/${version}/qpid-java-broker-${version}.tar.gz"); - - @SetFromFlag("amqpPort") - public static final PortAttributeSensorAndConfigKey AMQP_PORT = AmqpServer.AMQP_PORT; - - @SetFromFlag("virtualHost") - public static final BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME; - - @SetFromFlag("amqpVersion") - public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = new BasicAttributeSensorAndConfigKey<String>( - AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_10); - - @SetFromFlag("httpManagementPort") - public static final PortAttributeSensorAndConfigKey HTTP_MANAGEMENT_PORT = new PortAttributeSensorAndConfigKey("qpid.http-management.port", "Qpid HTTP management plugin port"); - - @SetFromFlag("jmxUser") - public static final BasicAttributeSensorAndConfigKey<String> JMX_USER = new BasicAttributeSensorAndConfigKey<String>( - UsesJmx.JMX_USER, "admin"); - - @SetFromFlag("jmxPassword") - public static final BasicAttributeSensorAndConfigKey<String> JMX_PASSWORD = new BasicAttributeSensorAndConfigKey<String>( - UsesJmx.JMX_PASSWORD, "admin"); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java ---------------------------------------------------------------------- diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java deleted file mode 100644 index baf487d..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidBrokerImpl.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.qpid; - -import static java.lang.String.format; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.Entities; -import brooklyn.entity.java.JmxSupport; -import brooklyn.entity.messaging.jms.JMSBrokerImpl; -import brooklyn.event.feed.jmx.JmxAttributePollConfig; -import brooklyn.event.feed.jmx.JmxFeed; -import brooklyn.event.feed.jmx.JmxHelper; -import brooklyn.util.exceptions.Exceptions; - -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Objects.ToStringHelper; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Qpid broker instance, using AMQP 0-10. - */ -public class QpidBrokerImpl extends JMSBrokerImpl<QpidQueue, QpidTopic> implements QpidBroker { - private static final Logger log = LoggerFactory.getLogger(QpidBrokerImpl.class); - - private volatile JmxFeed jmxFeed; - - public QpidBrokerImpl() { - super(); - } - - public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME); } - public String getAmqpVersion() { return getAttribute(AMQP_VERSION); } - public Integer getAmqpPort() { return getAttribute(AMQP_PORT); } - - public void setBrokerUrl() { - String urlFormat = "amqp://guest:guest@/%s?brokerlist='tcp://%s:%d'"; - setAttribute(BROKER_URL, format(urlFormat, getAttribute(VIRTUAL_HOST_NAME), getAttribute(HOSTNAME), getAttribute(AMQP_PORT))); - } - - @Override - public void init() { - super.init(); - new JmxSupport(this, null).recommendJmxRmiCustomAgent(); - } - - public void waitForServiceUp(long duration, TimeUnit units) { - super.waitForServiceUp(duration, units); - - // Also wait for the MBean to exist (as used when creating 
queue/topic) - JmxHelper helper = new JmxHelper(this); - try { - String virtualHost = getConfig(QpidBroker.VIRTUAL_HOST_NAME); - ObjectName virtualHostManager = new ObjectName(format("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"%s\"", virtualHost)); - helper.connect(); - helper.assertMBeanExistsEventually(virtualHostManager, units.toMillis(duration)); - } catch (MalformedObjectNameException e) { - throw Exceptions.propagate(e); - } catch (IOException e) { - throw Exceptions.propagate(e); - } finally { - if (helper != null) helper.terminate(); - } - } - - public QpidQueue createQueue(Map properties) { - QpidQueue result = addChild(EntitySpec.create(QpidQueue.class).configure(properties)); - Entities.manage(result); - result.create(); - return result; - } - - public QpidTopic createTopic(Map properties) { - QpidTopic result = addChild(EntitySpec.create(QpidTopic.class).configure(properties)); - Entities.manage(result); - result.create(); - return result; - } - - @Override - public Class getDriverInterface() { - return QpidDriver.class; - } - - @Override - protected void connectSensors() { - super.connectSensors(); - String serverInfoMBeanName = "org.apache.qpid:type=ServerInformation,name=ServerInformation"; - - jmxFeed = JmxFeed.builder() - .entity(this) - .period(500, TimeUnit.MILLISECONDS) - .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP) - .objectName(serverInfoMBeanName) - .attributeName("ProductVersion") - .onSuccess(new Function<Object,Boolean>() { - private boolean hasWarnedOfVersionMismatch; - @Override public Boolean apply(Object input) { - if (input == null) return false; - if (!hasWarnedOfVersionMismatch && !getConfig(QpidBroker.SUGGESTED_VERSION).equals(input)) { - log.warn("Qpid version mismatch: ProductVersion is {}, requested version is {}", input, getConfig(QpidBroker.SUGGESTED_VERSION)); - hasWarnedOfVersionMismatch = true; - } - return true; - }}) - .onException(Functions.constant(false)) - 
.suppressDuplicates(true)) - .build(); - } - - @Override - public void disconnectSensors() { - super.disconnectSensors(); - if (jmxFeed != null) jmxFeed.stop(); - } - - @Override - protected ToStringHelper toStringHelper() { - return super.toStringHelper().add("amqpPort", getAmqpPort()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestination.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestination.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestination.java deleted file mode 100644 index 9250e2a..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestination.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.qpid; - -import brooklyn.entity.messaging.amqp.AmqpExchange; -import brooklyn.entity.messaging.jms.JMSDestination; - -public interface QpidDestination extends JMSDestination, AmqpExchange { - - public void create(); - - /** - * Return the AMQP name for the queue. 
- */ - public String getQueueName(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java deleted file mode 100644 index 0dc6390..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDestinationImpl.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.qpid; - -import static java.lang.String.format; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.brooklyn.api.entity.basic.EntityLocal; -import org.apache.brooklyn.core.util.flags.SetFromFlag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.java.UsesJmx; -import brooklyn.entity.messaging.amqp.AmqpServer; -import brooklyn.entity.messaging.jms.JMSDestinationImpl; -import brooklyn.event.feed.jmx.JmxFeed; -import brooklyn.event.feed.jmx.JmxHelper; -import brooklyn.util.exceptions.Exceptions; - -public abstract class QpidDestinationImpl extends JMSDestinationImpl implements QpidDestination { - public static final Logger log = LoggerFactory.getLogger(QpidDestination.class); - - @SetFromFlag - String virtualHost; - - protected ObjectName virtualHostManager; - protected ObjectName exchange; - protected transient JmxHelper jmxHelper; - protected volatile JmxFeed jmxFeed; - - public QpidDestinationImpl() { - } - - @Override - public QpidBroker getParent() { - return (QpidBroker) super.getParent(); - } - - @Override - public void onManagementStarting() { - super.onManagementStarting(); - - // TODO Would be nice to share the JmxHelper for all destinations, so just one connection. 
- // But tricky for if brooklyn were distributed - try { - if (virtualHost == null) virtualHost = getConfig(QpidBroker.VIRTUAL_HOST_NAME); - setAttribute(QpidBroker.VIRTUAL_HOST_NAME, virtualHost); - virtualHostManager = new ObjectName(format("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"%s\"", virtualHost)); - jmxHelper = new JmxHelper((EntityLocal)getParent()); - } catch (MalformedObjectNameException e) { - throw Exceptions.propagate(e); - } - } - - @Override - protected void disconnectSensors() { - if (jmxFeed != null) jmxFeed.stop(); - } - - @Override - public void create() { - jmxHelper.operation(virtualHostManager, "createNewQueue", getName(), getParent().getAttribute(UsesJmx.JMX_USER), true); - jmxHelper.operation(exchange, "createNewBinding", getName(), getName()); - connectSensors(); - } - - @Override - public void delete() { - jmxHelper.operation(exchange, "removeBinding", getName(), getName()); - jmxHelper.operation(virtualHostManager, "deleteQueue", getName()); - disconnectSensors(); - } - - @Override - public String getQueueName() { - - if (AmqpServer.AMQP_0_10.equals(getParent().getAmqpVersion())) { - return String.format("'%s'/'%s'; { assert: never }", getExchangeName(), getName()); - } else { - return getName(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDriver.java deleted file mode 100644 index 26d0e5a..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidDriver.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.qpid; - -import brooklyn.entity.java.JavaSoftwareProcessDriver; - -public interface QpidDriver extends JavaSoftwareProcessDriver { - - Integer getAmqpPort(); - - String getAmqpVersion(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueue.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueue.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueue.java deleted file mode 100644 index 42c3065..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueue.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.qpid; - -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; - -import brooklyn.entity.messaging.Queue; - -@ImplementedBy(QpidQueueImpl.class) -public interface QpidQueue extends QpidDestination, Queue { - @Override - public String getExchangeName(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueueImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueueImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueueImpl.java deleted file mode 100644 index 1774583..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidQueueImpl.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.qpid; - -import static java.lang.String.format; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import brooklyn.entity.messaging.amqp.AmqpExchange; -import brooklyn.event.feed.jmx.JmxAttributePollConfig; -import brooklyn.event.feed.jmx.JmxFeed; -import brooklyn.util.exceptions.Exceptions; - -public class QpidQueueImpl extends QpidDestinationImpl implements QpidQueue { - public QpidQueueImpl() { - } - - @Override - public void onManagementStarting() { - super.onManagementStarting(); - setAttribute(QUEUE_NAME, getName()); - try { - exchange = new ObjectName(format("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"%s\",name=\"%s\",ExchangeType=direct", virtualHost, getExchangeName())); - } catch (MalformedObjectNameException e) { - throw Exceptions.propagate(e); - } - } - - @Override - protected void connectSensors() { - String queue = format("org.apache.qpid:type=VirtualHost.Queue,VirtualHost=\"%s\",name=\"%s\"", virtualHost, getName()); - - jmxFeed = JmxFeed.builder() - .entity(this) - .helper(jmxHelper) - .pollAttribute(new JmxAttributePollConfig<Integer>(QUEUE_DEPTH_BYTES) - .objectName(queue) - .attributeName("QueueDepth")) - .pollAttribute(new JmxAttributePollConfig<Integer>(QUEUE_DEPTH_MESSAGES) - .objectName(queue) - .attributeName("MessageCount")) - .build(); - } - - @Override - public String getExchangeName() { - return AmqpExchange.DIRECT; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidSshDriver.java deleted file mode 100644 index 2b1a8ec..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidSshDriver.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.qpid; - -import static java.lang.String.format; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.Entities; -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.net.Networking; -import brooklyn.util.os.Os; -import brooklyn.util.ssh.BashCommands; - -import com.google.common.collect.ImmutableMap; - -public class QpidSshDriver extends JavaSoftwareProcessSshDriver implements QpidDriver{ - - private static final Logger log = LoggerFactory.getLogger(QpidSshDriver.class); - - public QpidSshDriver(QpidBrokerImpl entity, SshMachineLocation machine) { - super(entity, machine); - } - - @Override - protected String getLogFileLocation() { return Os.mergePaths(getRunDir(), "log", "qpid.log"); } - - @Override - public Integer getAmqpPort() { return entity.getAttribute(QpidBroker.AMQP_PORT); } - - @Override - public String getAmqpVersion() { return entity.getAttribute(QpidBroker.AMQP_VERSION); } - - public Integer getHttpManagementPort() { return entity.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT); } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("qpid-broker-%s", getVersion())))); - } - - @Override - public void install() { - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - - List<String> commands = new LinkedList<String>(); - commands.addAll( BashCommands.commandsToDownloadUrlsAs(urls, saveAs)); - commands.add(BashCommands.INSTALL_TAR); - commands.add("tar xzfv "+saveAs); - - newScript(INSTALLING) - .body.append(commands) - .execute(); - } - - @Override - public void customize() { - 
Networking.checkPortsValid(MutableMap.of("jmxPort", getJmxPort(), "amqpPort", getAmqpPort())); - newScript(CUSTOMIZING) - .body.append( - format("cp -R %s/{bin,etc,lib} .", getExpandedInstallDir()), - "mkdir lib/opt" - ) - .execute(); - } - - @Override - public void launch() { - newScript(ImmutableMap.of(USE_PID_FILE, false), LAUNCHING) - .body.append("nohup ./bin/qpid-server -b '*' > qpid-server-launch.log 2>&1 &") - .execute(); - } - - public String getPidFile() { return "qpid-server.pid"; } - - @Override - public boolean isRunning() { - return newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; - } - - @Override - public void stop() { - newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); - } - - @Override - public void kill() { - newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), KILLING).execute(); - } - - @Override - public Map<String, Object> getCustomJavaSystemProperties() { - return MutableMap.<String, Object>builder() - .putAll(super.getCustomJavaSystemProperties()) - .put("connector.port", getAmqpPort()) - .put("management.enabled", "true") - .put("management.jmxport.registryServer", getRmiRegistryPort()) - .put("management.jmxport.connectorServer", getJmxPort()) - .put("management.http.enabled", Boolean.toString(getHttpManagementPort() != null)) - .putIfNotNull("management.http.port", getHttpManagementPort()) - .build(); - } - - @Override - public Map<String, String> getShellEnvironment() { - return MutableMap.<String, String>builder() - .putAll(super.getShellEnvironment()) - .put("QPID_HOME", getRunDir()) - .put("QPID_WORK", getRunDir()) - .renameKey("JAVA_OPTS", "QPID_OPTS") - .build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopic.java ---------------------------------------------------------------------- diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopic.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopic.java deleted file mode 100644 index 3ec1c06..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopic.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.qpid; - -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; - -import brooklyn.entity.messaging.Topic; - -@ImplementedBy(QpidTopicImpl.class) -public interface QpidTopic extends QpidDestination, Topic { -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopicImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopicImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopicImpl.java deleted file mode 100644 index 1d8e3c3..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/qpid/QpidTopicImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.qpid; - -import static java.lang.String.format; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import brooklyn.entity.messaging.amqp.AmqpExchange; -import brooklyn.util.exceptions.Exceptions; - -public class QpidTopicImpl extends QpidDestinationImpl implements QpidTopic { - - public QpidTopicImpl() { - } - - @Override - public void onManagementStarting() { - super.onManagementStarting(); - setAttribute(TOPIC_NAME, getName()); - try { - String virtualHost = getParent().getVirtualHost(); - exchange = new ObjectName(format("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"%s\",name=\"%s\",ExchangeType=topic", virtualHost, getExchangeName())); - } catch (MalformedObjectNameException e) { - throw Exceptions.propagate(e); - } - } - - // TODO sensors - @Override - public void connectSensors() { - } - - @Override - public String getExchangeName() { return AmqpExchange.TOPIC; } - - @Override - public String getTopicName() { return getQueueName(); } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBroker.java deleted file mode 100644 index a70bfce..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBroker.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.rabbit; - -import java.util.Map; - -import com.google.common.annotations.Beta; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.proxying.ImplementedBy; -import org.apache.brooklyn.api.event.AttributeSensor; -import org.apache.brooklyn.core.util.flags.SetFromFlag; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.messaging.MessageBroker; -import brooklyn.entity.messaging.amqp.AmqpServer; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.BasicConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; -import brooklyn.event.basic.Sensors; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Rabbit MQ broker instance, using AMQP 0-9-1. - */ -@Catalog(name="RabbitMQ Broker", description="RabbitMQ is an open source message broker software (i.e. 
message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP) standard", iconUrl="classpath:///RabbitMQLogo.png") -@ImplementedBy(RabbitBrokerImpl.class) -public interface RabbitBroker extends SoftwareProcess, MessageBroker, AmqpServer { - - @SetFromFlag("version") - public static final ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.8.7"); - - @SetFromFlag("downloadUrl") - public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - SoftwareProcess.DOWNLOAD_URL, "http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/rabbitmq-server-generic-unix-${version}.tar.gz"); - - @SetFromFlag("erlangVersion") - public static final BasicConfigKey<String> ERLANG_VERSION = new BasicConfigKey<String>(String.class, "erlang.version", "Erlang runtime version", "R15B"); - - @SetFromFlag("rabbitmqConfigTemplateUrl") - ConfigKey<String> CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( - "rabbitmq.templateUrl", "Template file (in freemarker format) for the rabbitmq.config config file", - "classpath://brooklyn/entity/messaging/rabbit/rabbitmq.config"); - - @SetFromFlag("amqpPort") - public static final PortAttributeSensorAndConfigKey AMQP_PORT = AmqpServer.AMQP_PORT; - - @SetFromFlag("virtualHost") - public static final BasicAttributeSensorAndConfigKey<String> VIRTUAL_HOST_NAME = AmqpServer.VIRTUAL_HOST_NAME; - - @SetFromFlag("amqpVersion") - public static final BasicAttributeSensorAndConfigKey<String> AMQP_VERSION = new BasicAttributeSensorAndConfigKey<String>( - AmqpServer.AMQP_VERSION, AmqpServer.AMQP_0_9_1); - - @SetFromFlag("managmentPort") - public static final PortAttributeSensorAndConfigKey MANAGEMENT_PORT = new PortAttributeSensorAndConfigKey( - "rabbitmq.management.port", "Port on which management interface will be available", "15672+"); - - public static AttributeSensor<String> MANAGEMENT_URL = 
Sensors.newStringSensor( - "rabbitmq.management.url", "Management URL is only available if management plugin flag is true"); - - @SetFromFlag("enableManagementPlugin") - public static final ConfigKey<Boolean> ENABLE_MANAGEMENT_PLUGIN = ConfigKeys.newBooleanConfigKey( - "rabbitmq.management.plugin", "Management plugin will be enabled", false); - - RabbitQueue createQueue(Map properties); - - // TODO required by RabbitDestination due to close-coupling between that and RabbitBroker; how best to improve? - @Beta - Map<String, String> getShellEnvironment(); - - @Beta - String getRunDir(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java deleted file mode 100644 index 5d96d92..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitBrokerImpl.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.rabbit; - -import static java.lang.String.format; - -import java.util.Map; - -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects.ToStringHelper; - -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.SoftwareProcessImpl; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Rabbit MQ broker instance, using AMQP 0-9-1. - */ -public class RabbitBrokerImpl extends SoftwareProcessImpl implements RabbitBroker { - private static final Logger log = LoggerFactory.getLogger(RabbitBrokerImpl.class); - - public String getVirtualHost() { return getAttribute(VIRTUAL_HOST_NAME); } - public String getAmqpVersion() { return getAttribute(AMQP_VERSION); } - public Integer getAmqpPort() { return getAttribute(AMQP_PORT); } - - public RabbitBrokerImpl() { - super(); - } - - @Override - public RabbitDriver getDriver() { - return (RabbitDriver) super.getDriver(); - } - - @Override - public Map<String, String> getShellEnvironment() { - return getDriver().getShellEnvironment(); - } - - @Override - public String getRunDir() { - return getDriver().getRunDir(); - } - - @Override - protected void postStart() { - super.postStart(); - - getDriver().configure(); - - // TODO implement this using AMQP connection, no external mechanism available - // queueNames.each { String name -> addQueue(name) } - } - - public void setBrokerUrl() { - String urlFormat = "amqp://guest:guest@%s:%d/%s"; - setAttribute(BROKER_URL, format(urlFormat, getAttribute(HOSTNAME), getAttribute(AMQP_PORT), getAttribute(VIRTUAL_HOST_NAME))); - } - - public RabbitQueue createQueue(Map properties) { - RabbitQueue result = addChild(EntitySpec.create(RabbitQueue.class).configure(properties)); - Entities.manage(result); - 
result.create(); - return result; - } - - @Override - public Class<? extends RabbitDriver> getDriverInterface() { - return RabbitDriver.class; - } - - @Override - protected void connectSensors() { - super.connectSensors(); - - connectServiceUpIsRunning(); - - setBrokerUrl(); - - if (getEnableManagementPlugin()) { - setAttribute(MANAGEMENT_URL, format("http://%s:%s/", getAttribute(HOSTNAME), getAttribute(MANAGEMENT_PORT))); - } - } - - @Override - public void disconnectSensors() { - super.disconnectSensors(); - disconnectServiceUpIsRunning(); - } - - public boolean getEnableManagementPlugin() { - return Boolean.TRUE.equals(getConfig(ENABLE_MANAGEMENT_PLUGIN)); - } - - public Integer getManagementPort() { - return getAttribute(MANAGEMENT_PORT); - } - - @Override - protected ToStringHelper toStringHelper() { - return super.toStringHelper().add("amqpPort", getAmqpPort()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDestination.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDestination.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDestination.java deleted file mode 100644 index 14e0e47..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDestination.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.rabbit; - -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.AbstractEntity; -import brooklyn.entity.messaging.amqp.AmqpExchange; -import org.apache.brooklyn.location.basic.SshMachineLocation; - -import com.google.common.base.Objects.ToStringHelper; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; - -public abstract class RabbitDestination extends AbstractEntity implements AmqpExchange { - public static final Logger log = LoggerFactory.getLogger(RabbitDestination.class); - - private String virtualHost; - private String exchange; - protected SshMachineLocation machine; - protected Map<String,String> shellEnvironment; - - public RabbitDestination() { - } - - @Override - public void onManagementStarting() { - super.onManagementStarting(); - - exchange = (getConfig(EXCHANGE_NAME) != null) ? 
getConfig(EXCHANGE_NAME) : getDefaultExchangeName(); - virtualHost = getConfig(RabbitBroker.VIRTUAL_HOST_NAME); - setAttribute(RabbitBroker.VIRTUAL_HOST_NAME, virtualHost); - - machine = (SshMachineLocation) Iterables.find(getParent().getLocations(), Predicates.instanceOf(SshMachineLocation.class)); - shellEnvironment = getParent().getShellEnvironment(); - } - - // FIXME Should return RabbitBroker; won't work if gets a proxy rather than "real" entity - @Override - public RabbitBroker getParent() { - return (RabbitBroker) super.getParent(); - } - - public void create() { - connectSensors(); - } - - public void delete() { - disconnectSensors(); - } - - protected void connectSensors() { } - - protected void disconnectSensors() { } - - public String getVirtualHost() { - return virtualHost; - } - - @Override - public String getExchangeName() { - return exchange; - } - - public String getDefaultExchangeName() { - return AmqpExchange.DIRECT; - } - - @Override - protected ToStringHelper toStringHelper() { - return super.toStringHelper().add("virtualHost", getParent().getVirtualHost()).add("exchange", getExchangeName()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDriver.java deleted file mode 100644 index da07477..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitDriver.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.rabbit; - -import java.util.Map; - -import brooklyn.entity.basic.SoftwareProcessDriver; - -public interface RabbitDriver extends SoftwareProcessDriver { - - public void configure(); - - public Map<String, String> getShellEnvironment(); - - public String getRunDir(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitQueue.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitQueue.java b/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitQueue.java deleted file mode 100644 index 31f1876..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/rabbit/RabbitQueue.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.messaging.rabbit; - -import brooklyn.entity.messaging.Queue; -import brooklyn.event.feed.ssh.SshFeed; -import brooklyn.event.feed.ssh.SshPollConfig; -import brooklyn.event.feed.ssh.SshPollValue; - -import com.google.common.base.Function; -import com.google.common.base.Functions; - -public class RabbitQueue extends RabbitDestination implements Queue { - - private SshFeed sshFeed; - - public RabbitQueue() { - } - - public String getName() { - return getDisplayName(); - } - - @Override - public void create() { - setAttribute(QUEUE_NAME, getName()); - super.create(); - } - - @Override - protected void connectSensors() { - String runDir = getParent().getRunDir(); - String cmd = String.format("%s/sbin/rabbitmqctl list_queues -p /%s | grep '%s'", runDir, getVirtualHost(), getQueueName()); - - sshFeed = SshFeed.builder() - .entity(this) - .machine(machine) - .poll(new SshPollConfig<Integer>(QUEUE_DEPTH_BYTES) - .env(shellEnvironment) - .command(cmd) - .onFailure(Functions.constant(-1)) - .onSuccess(new Function<SshPollValue, Integer>() { - @Override public Integer apply(SshPollValue input) { - return 0; // TODO parse out queue depth from output - }})) - .poll(new SshPollConfig<Integer>(QUEUE_DEPTH_MESSAGES) - .env(shellEnvironment) - .command(cmd) - .onFailure(Functions.constant(-1)) - .onSuccess(new Function<SshPollValue, Integer>() { - @Override public Integer apply(SshPollValue input) { - return 0; // TODO parse out queue depth from output - }})) - .build(); - } - - @Override - protected void disconnectSensors() { - if 
(sshFeed != null) sshFeed.stop(); - super.disconnectSensors(); - } - - /** - * Return the AMQP name for the queue. - */ - public String getQueueName() { - return getName(); - } -}
