Changes based on review comments, including: updating to use the latest 0.5.0 APIs; adding a general Zookeeper entity interface; and making KafkaCluster implement Group.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/64486e44 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/64486e44 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/64486e44 Branch: refs/heads/0.5.0 Commit: 64486e44f0cf633f7306718ec85cdaebcb3a2435 Parents: 9825780 Author: Andrew Kennedy <[email protected]> Authored: Mon Apr 1 19:00:44 2013 +0100 Committer: Andrew Kennedy <[email protected]> Committed: Fri Apr 19 10:36:07 2013 +0100 ---------------------------------------------------------------------- .../java/brooklyn/demo/KafkaClusterExample.java | 10 +- .../brooklyn/demo/SimpleCassandraCluster.java | 2 +- .../brooklyn/demo/SimpleCouchDBCluster.java | 2 +- .../java/brooklyn/demo/SimpleRedisCluster.java | 2 +- .../kafka/AbstractfKafkaSshDriver.java | 2 +- .../entity/messaging/kafka/KafkaBroker.java | 12 +- .../entity/messaging/kafka/KafkaBrokerImpl.java | 78 ++++------ .../messaging/kafka/KafkaBrokerSshDriver.java | 2 +- .../entity/messaging/kafka/KafkaCluster.java | 83 +++-------- .../messaging/kafka/KafkaClusterImpl.java | 122 ++++++++-------- .../entity/messaging/kafka/KafkaZookeeper.java | 28 ++-- .../messaging/kafka/KafkaZookeeperImpl.java | 100 +------------ .../kafka/KafkaZookeeperSshDriver.java | 2 +- .../entity/zookeeper/AbstractZookeeperImpl.java | 122 ++++++++++++++++ .../brooklyn/entity/zookeeper/Zookeeper.java | 50 +++++++ .../activemq/ActiveMQIntegrationTest.groovy | 10 +- .../messaging/kafka/KafkaIntegrationTest.groovy | 126 ---------------- .../messaging/kafka/KafkaIntegrationTest.java | 144 +++++++++++++++++++ .../entity/messaging/kafka/KafkaSupport.java | 24 +++- 19 files changed, 487 insertions(+), 434 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java ---------------------------------------------------------------------- diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java index fae6bb6..06bbbed 100644 --- a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java +++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java @@ -5,7 +5,7 @@ import java.util.List; import brooklyn.entity.basic.ApplicationBuilder; import brooklyn.entity.basic.Entities; import brooklyn.entity.messaging.kafka.KafkaCluster; -import brooklyn.entity.proxying.BasicEntitySpec; +import brooklyn.entity.proxying.EntitySpecs; import brooklyn.launcher.BrooklynLauncher; import brooklyn.util.CommandLineUtil; @@ -18,10 +18,10 @@ public class KafkaClusterExample extends ApplicationBuilder { /** Configure the application. */ protected void doBuild() { - createChild(BasicEntitySpec.newInstance(KafkaCluster.class) + addChild(EntitySpecs.spec(KafkaCluster.class) + .configure("startTimeout", 300) // 5 minutes .configure("initialSize", 2)); - - appDisplayName("Kafka cluster application"); + // TODO set application display name? 
} public static void main(String[] argv) { @@ -30,7 +30,7 @@ public class KafkaClusterExample extends ApplicationBuilder { String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION); BrooklynLauncher launcher = BrooklynLauncher.newInstance() - .application(new KafkaClusterExample()) + .application(new KafkaClusterExample().appDisplayName("Kafka cluster application")) .webconsolePort(port) .location(location) .start(); http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java ---------------------------------------------------------------------- diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java index 50c62a8..b538ec7 100644 --- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java +++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCassandraCluster.java @@ -24,7 +24,7 @@ public class SimpleCassandraCluster extends ApplicationBuilder { /** Create entities. 
*/ protected void doBuild() { - createChild(EntitySpecs.spec(CassandraCluster.class) + addChild(EntitySpecs.spec(CassandraCluster.class) .configure("initialSize", "2") .configure("clusterName", "Brooklyn") .configure("jmxPort", "11099+") http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java ---------------------------------------------------------------------- diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java index 179443e..5de676b 100644 --- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java +++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleCouchDBCluster.java @@ -24,7 +24,7 @@ public class SimpleCouchDBCluster extends ApplicationBuilder { /** Create entities. */ protected void doBuild() { - createChild(EntitySpecs.spec(CouchDBCluster.class) + addChild(EntitySpecs.spec(CouchDBCluster.class) .configure("initialSize", "2") .configure("clusterName", "Brooklyn") .configure("httpPort", "8000+")); http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java ---------------------------------------------------------------------- diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java index da80e39..0f818f5 100644 --- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java +++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/SimpleRedisCluster.java @@ -24,7 +24,7 @@ public class SimpleRedisCluster extends ApplicationBuilder { /** Create entities. 
*/ protected void doBuild() { - createChild(EntitySpecs.spec(RedisCluster.class) + addChild(EntitySpecs.spec(RedisCluster.class) .configure("initialSize", "2") .configure("clusterName", "Brooklyn")); } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java index f6c7c8d..21e7092 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; import brooklyn.BrooklynVersion; import brooklyn.config.ConfigKey; import brooklyn.entity.basic.EntityLocal; -import brooklyn.entity.basic.lifecycle.CommonCommands; +import brooklyn.util.ssh.CommonCommands; import brooklyn.entity.drivers.downloads.DownloadResolver; import brooklyn.entity.java.JavaSoftwareProcessSshDriver; import brooklyn.location.basic.SshMachineLocation; http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java index 2a82b13..c2d7632 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java @@ -20,6 +20,7 @@ import brooklyn.entity.basic.SoftwareProcess; import 
brooklyn.entity.java.UsesJmx; import brooklyn.entity.messaging.MessageBroker; import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.entity.zookeeper.Zookeeper; import brooklyn.event.AttributeSensor; import brooklyn.event.basic.BasicAttributeSensor; import brooklyn.event.basic.BasicConfigKey; @@ -42,12 +43,13 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka PortAttributeSensorAndConfigKey KAFKA_PORT = new PortAttributeSensorAndConfigKey("kafka.port", "Kafka port", "9092+"); /** Location of the configuration file template to be copied to the server.*/ - @SetFromFlag("serverConfig") - ConfigKey<String> SERVER_CONFIG_TEMPLATE = new BasicConfigKey<String>( - String.class, "kafka.broker.configTemplate", "Server configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/server.properties"); + @SetFromFlag("kafkaServerConfig") + ConfigKey<String> KAFKA_BROKER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class, + "kafka.broker.configTemplate", "Kafka broker configuration template (in freemarker format)", + "classpath://brooklyn/entity/messaging/kafka/server.properties"); @SetFromFlag("zookeeper") - ConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "kafka.broker.zookeeper", "Kafka zookeeper entity"); + ConfigKey<Zookeeper> ZOOKEEPER = new BasicConfigKey<Zookeeper>(Zookeeper.class, "kafka.broker.zookeeper", "Kafka zookeeper entity"); AttributeSensor<Integer> BROKER_ID = new BasicAttributeSensor<Integer>(Integer.class, "kafka.broker.id", "Kafka unique broker ID"); @@ -66,6 +68,6 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka Integer getBrokerId(); - KafkaZookeeper getZookeeper(); + Zookeeper getZookeeper(); } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java 
---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java index 0dedf9c..5f8add8 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java @@ -15,11 +15,7 @@ */ package brooklyn.entity.messaging.kafka; -import java.io.IOException; -import java.util.Collection; import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import javax.management.ObjectName; @@ -31,23 +27,24 @@ import brooklyn.entity.Entity; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.entity.messaging.MessageBroker; -import brooklyn.event.feed.function.FunctionFeed; -import brooklyn.event.feed.function.FunctionPollConfig; +import brooklyn.entity.zookeeper.Zookeeper; import brooklyn.event.feed.jmx.JmxAttributePollConfig; import brooklyn.event.feed.jmx.JmxFeed; import brooklyn.event.feed.jmx.JmxHelper; import brooklyn.util.MutableMap; -import brooklyn.util.exceptions.Exceptions; import com.google.common.base.Functions; import com.google.common.base.Objects.ToStringHelper; -import com.google.common.collect.Sets; /** * An {@link brooklyn.entity.Entity} that represents a single Kafka broker instance. 
*/ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker { + private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class); + private static final ObjectName SOCKET_SERVER_STATS_MBEAN = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats"); + + private volatile JmxFeed jmxFeed; public KafkaBrokerImpl() { super(); @@ -63,7 +60,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke } @Override - public void postConstruct() { + public void init() { setAttribute(BROKER_ID, Math.abs(hashCode())); // Must be positive for partitioning to work } @@ -74,7 +71,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke public Integer getBrokerId() { return getAttribute(BROKER_ID); } @Override - public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); } + public Zookeeper getZookeeper() { return getConfig(ZOOKEEPER); } public KafkaTopic createTopic(Map<?, ?> properties) { KafkaTopic result = new KafkaTopic(properties, this); @@ -88,98 +85,85 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke return KafkaBrokerDriver.class; } - private ObjectName socketServerStatsMbean = JmxHelper.createObjectName("kafka:type=kafka.SocketServerStats"); - private volatile FunctionFeed functionFeed; - private volatile JmxFeed jmxFeed; - - /** Wait for five minutes to start. 
*/ - @Override - public void waitForServiceUp() { waitForServiceUp(5, TimeUnit.MINUTES); } - @Override public void waitForServiceUp(long duration, TimeUnit units) { super.waitForServiceUp(duration, units); // Wait for the MBean to exist - JmxHelper helper = null; + JmxHelper helper = new JmxHelper(this); try { - helper = new JmxHelper(this); - helper.connect(); - helper.assertMBeanExistsEventually(socketServerStatsMbean, units.toMillis(duration)); - } catch (IOException e) { - throw Exceptions.propagate(e); + helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration)); } finally { - if (helper != null) helper.disconnect(); + helper.disconnect(); } } @Override protected void connectSensors() { - functionFeed = FunctionFeed.builder() - .entity(this) - .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP) - .period(500, TimeUnit.MILLISECONDS) - .callable(new Callable<Boolean>() { - public Boolean call() throws Exception { - return getDriver().isRunning(); - } - }) - .onError(Functions.constant(Boolean.FALSE))) - .build(); + connectServiceUpIsRunning(); jmxFeed = JmxFeed.builder() .entity(this) .period(500, TimeUnit.MILLISECONDS) .pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT) - .objectName(socketServerStatsMbean) + .objectName(SOCKET_SERVER_STATS_MBEAN) .attributeName("NumFetchRequests") .onError(Functions.constant(-1l))) .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME) - .objectName(socketServerStatsMbean) + .objectName(SOCKET_SERVER_STATS_MBEAN) .attributeName("TotalFetchRequestMs") .onError(Functions.constant(-1l))) .pollAttribute(new JmxAttributePollConfig<Double>(MAX_FETCH_TIME) - .objectName(socketServerStatsMbean) + .objectName(SOCKET_SERVER_STATS_MBEAN) .attributeName("MaxFetchRequestMs") .onError(Functions.constant(-1.0d))) .pollAttribute(new JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT) - .objectName(socketServerStatsMbean) + .objectName(SOCKET_SERVER_STATS_MBEAN) 
.attributeName("NumProduceRequests") .onError(Functions.constant(-1l))) .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME) - .objectName(socketServerStatsMbean) + .objectName(SOCKET_SERVER_STATS_MBEAN) .attributeName("TotalProduceRequestMs") .onError(Functions.constant(-1l))) .pollAttribute(new JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME) - .objectName(socketServerStatsMbean) + .objectName(SOCKET_SERVER_STATS_MBEAN) .attributeName("MaxProduceRequestMs") .onError(Functions.constant(-1.0d))) .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED) - .objectName(socketServerStatsMbean) + .objectName(SOCKET_SERVER_STATS_MBEAN) .attributeName("TotalBytesRead") .onError(Functions.constant(-1l))) .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT) - .objectName(socketServerStatsMbean) + .objectName(SOCKET_SERVER_STATS_MBEAN) .attributeName("TotalBytesWritten") .onError(Functions.constant(-1l))) .build(); + + setBrokerUrl(); } @Override public void disconnectSensors() { super.disconnectSensors(); - if (functionFeed != null) functionFeed.stop(); + disconnectServiceUpIsRunning(); if (jmxFeed != null) jmxFeed.stop(); } @Override protected ToStringHelper toStringHelper() { - return super.toStringHelper().add("kafkaPort", getKafkaPort()); + return super.toStringHelper() + .add("kafkaPort", getKafkaPort()); } + /** Use the {@link #getZookeeper() zookeeper} details if available, otherwise use our own host and port. 
*/ @Override public void setBrokerUrl() { - // TODO + Zookeeper zookeeper = getZookeeper(); + if (zookeeper != null) { + setAttribute(BROKER_URL, String.format("zookeeper://%s:%d", zookeeper.getAttribute(HOSTNAME), zookeeper.getZookeeperPort())); + } else { + setAttribute(BROKER_URL, String.format("kafka://%s:%d", getAttribute(HOSTNAME), getKafkaPort())); + } } } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java index 40e7234..40df6b4 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java @@ -34,7 +34,7 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf @Override protected ConfigKey<String> getConfigTemplateKey() { - return KafkaBroker.SERVER_CONFIG_TEMPLATE; + return KafkaBroker.KAFKA_BROKER_CONFIG_TEMPLATE; } @Override http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java index 96e46ff..d1e123a 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java @@ -18,16 +18,15 @@ package brooklyn.entity.messaging.kafka; import brooklyn.catalog.Catalog; 
import brooklyn.config.ConfigKey; import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.ConfigurableEntityFactory; +import brooklyn.entity.Group; import brooklyn.entity.basic.ConfigKeys; import brooklyn.entity.group.Cluster; import brooklyn.entity.group.DynamicCluster; -import brooklyn.entity.proxying.BasicEntitySpec; import brooklyn.entity.proxying.EntitySpec; import brooklyn.entity.proxying.ImplementedBy; import brooklyn.entity.trait.Resizable; import brooklyn.entity.trait.Startable; +import brooklyn.entity.zookeeper.Zookeeper; import brooklyn.event.AttributeSensor; import brooklyn.event.basic.BasicAttributeSensor; import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; @@ -35,93 +34,53 @@ import brooklyn.event.basic.BasicConfigKey; import brooklyn.util.flags.SetFromFlag; /** - * This entity contains the sub-groups and entities that go in to a single location (e.g. datacenter) - * to provide Kafka cluster functionality. + * Provides Kafka cluster functionality through a group of {@link KafkaBroker brokers} controlled + * by a single {@link KafkaZookeeper zookeeper} entity. * <p> - * You can customise the broker by customising the factory (by reference in calling code) - * or supplying your own factory (as a config flag). + * You can customise the Kafka zookeeper and brokers by supplying {@link EntitySpec entity specifications} + * to be used when creating them. An existing {@link Zookeeper} entity may also be provided instead of the + * Kafka zookeeper. 
* <p> - * The contents of this group entity are: + * The contents of this entity are: * <ul> * <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s - * <li>a {@link KafkaZookeeper} - * <li>a {@link brooklyn.policy.Policy} to resize the DynamicCluster + * <li>a {@link KafkaZookeeper} or {@link Zookeeper} + * <li>a {@link brooklyn.policy.Policy} to resize the broker cluster * </ul> + * The {@link Group group} and {@link Resizable} interface methods are delegated to the broker cluster, so calling + * {@link Resizable#resize(Integer) resize} will change the number of brokers. */ @SuppressWarnings({ "unchecked", "rawtypes" }) @Catalog(name="Kafka", description="Apache Kafka is a distributed publish-subscribe messaging system") @ImplementedBy(KafkaClusterImpl.class) -public interface KafkaCluster extends Entity, Startable, Resizable { - - class Spec<T extends KafkaCluster, S extends Spec<T,S>> extends BasicEntitySpec<T,S> { - - private static class ConcreteSpec extends Spec<KafkaCluster, ConcreteSpec> { - ConcreteSpec() { - super(KafkaCluster.class); - } - } - - public static Spec<KafkaCluster, ?> newInstance() { - return new ConcreteSpec(); - } - - protected Spec(Class<T> type) { - super(type); - } - - public S initialSize(int val) { - configure(INITIAL_SIZE, val); - return self(); - } - - public S zookeeper(KafkaZookeeper val) { - configure(ZOOKEEPER, val); - return self(); - } - - public S brokerSpec(EntitySpec<KafkaBroker> val) { - configure(BROKER_SPEC, val); - return self(); - } - - public S brokerFactory(ConfigurableEntityFactory<KafkaBroker> val) { - configure(BROKER_FACTORY, val); - return self(); - } - } +public interface KafkaCluster extends Entity, Startable, Resizable, Group { @SetFromFlag("startTimeout") - public static final ConfigKey<Integer> START_TIMEOUT = ConfigKeys.START_TIMEOUT; + ConfigKey<Integer> START_TIMEOUT = ConfigKeys.START_TIMEOUT; @SetFromFlag("initialSize") ConfigKey<Integer> INITIAL_SIZE = new 
BasicConfigKey<Integer>(Cluster.INITIAL_SIZE, 1); + /** Zookeeper for the cluster. If null a default be will created. */ @SetFromFlag("zookeeper") - BasicAttributeSensorAndConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<KafkaZookeeper>( - KafkaZookeeper.class, "kafka.cluster.zookeeper", "Kafka zookeeper for the cluster; if null a default will created"); + BasicAttributeSensorAndConfigKey<Zookeeper> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<Zookeeper>( + Zookeeper.class, "kafka.cluster.zookeeper", "The zookeeper for the cluster; if null a default be will created"); + /** Spec for creating the default Kafka zookeeper entity. */ @SetFromFlag("zookeeperSpec") BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZookeeper>> ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey( EntitySpec.class, "kafka.cluster.zookeeperSpec", "Spec for creating the kafka zookeeper"); - /** Factory to create a Kafka broker, given flags */ - @SetFromFlag("brokerFactory") - BasicAttributeSensorAndConfigKey<ConfigurableEntityFactory<KafkaBroker>> BROKER_FACTORY = new BasicAttributeSensorAndConfigKey( - ConfigurableEntityFactory.class, "kafka.cluster.brokerFactory", "Factory to create a Kafka broker"); - - /** Spec for Kafka broker entiites to be created */ + /** Spec for Kafka broker entities to be created. */ @SetFromFlag("brokerSpec") BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = new BasicAttributeSensorAndConfigKey( EntitySpec.class, "kafka.cluster.brokerSpec", "Spec for Kafka broker entiites to be created"); + /** Underlying Kafka broker cluster. 
*/ AttributeSensor<DynamicCluster> CLUSTER = new BasicAttributeSensor<DynamicCluster>( DynamicCluster.class, "kafka.cluster.brokerCluster", "Underlying Kafka broker cluster"); - AttributeSensor<String> HOSTNAME = Attributes.HOSTNAME; - - KafkaZookeeper getZookeeper(); - - ConfigurableEntityFactory<KafkaBroker> getBrokerFactory(); + Zookeeper getZookeeper(); DynamicCluster getCluster(); http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java index efc14fc..1938efa 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java @@ -18,7 +18,6 @@ package brooklyn.entity.messaging.kafka; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,19 +25,19 @@ import org.slf4j.LoggerFactory; import brooklyn.enricher.basic.SensorPropagatingEnricher; import brooklyn.entity.Entity; import brooklyn.entity.basic.AbstractEntity; -import brooklyn.entity.basic.ConfigurableEntityFactory; import brooklyn.entity.basic.Entities; import brooklyn.entity.group.DynamicCluster; -import brooklyn.entity.proxying.BasicEntitySpec; import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.proxying.WrappingEntitySpec; +import brooklyn.entity.proxying.EntitySpecs; import brooklyn.entity.trait.Startable; +import brooklyn.entity.zookeeper.Zookeeper; import brooklyn.event.feed.ConfigToAttributes; import brooklyn.location.Location; import brooklyn.util.MutableList; import 
brooklyn.util.MutableMap; -import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.exceptions.CompoundRuntimeException; +import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -65,104 +64,97 @@ public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster { } @Override - public void postConstruct() { - ConfigToAttributes.apply(this, BROKER_FACTORY); + public void init() { ConfigToAttributes.apply(this, BROKER_SPEC); ConfigToAttributes.apply(this, ZOOKEEPER); ConfigToAttributes.apply(this, ZOOKEEPER_SPEC); log.debug("creating zookeeper child for {}", this); - KafkaZookeeper zookeeper = getAttribute(ZOOKEEPER); + Zookeeper zookeeper = getAttribute(ZOOKEEPER); if (zookeeper == null) { EntitySpec<KafkaZookeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC); if (zookeeperSpec == null) { log.debug("creating zookeeper using default spec for {}", this); - zookeeperSpec = BasicEntitySpec.newInstance(KafkaZookeeper.class); + zookeeperSpec = EntitySpecs.spec(KafkaZookeeper.class); setAttribute(ZOOKEEPER_SPEC, zookeeperSpec); } else { log.debug("creating zookeeper using custom spec for {}", this); } - zookeeper = getEntityManager().createEntity(WrappingEntitySpec.newInstance(zookeeperSpec).parent(this)); + zookeeper = addChild(zookeeperSpec); if (Entities.isManaged(this)) Entities.manage(zookeeper); setAttribute(ZOOKEEPER, zookeeper); } log.debug("creating cluster child for {}", this); - ConfigurableEntityFactory<KafkaBroker> brokerFactory = getAttribute(BROKER_FACTORY); EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC); - if (brokerFactory == null && brokerSpec == null) { + if (brokerSpec == null) { log.debug("creating default broker spec for {}", this); - brokerSpec = BasicEntitySpec.newInstance(KafkaBroker.class); + brokerSpec = EntitySpecs.spec(KafkaBroker.class); setAttribute(BROKER_SPEC, brokerSpec); } - // Note 
relies on initial_size being inherited by DynamicCluster, because key id is identical - // We add the zookeeper configuration to the KafkaBroker specification or factory here - Map<String,Object> flags; - if (brokerSpec != null) { - flags = MutableMap.<String, Object>of("memberSpec", WrappingEntitySpec.newInstance(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper)); - } else { - brokerFactory.configure(KafkaBroker.ZOOKEEPER, zookeeper); - flags = MutableMap.<String, Object>of("factory", brokerFactory); - } - DynamicCluster cluster = getEntityManager().createEntity(BasicEntitySpec.newInstance(DynamicCluster.class) - .parent(this) - .configure(flags)); + // Relies on initialSize being inherited by DynamicCluster, because key id is identical + // We add the zookeeper configuration to the KafkaBroker specification here + DynamicCluster cluster = addChild(EntitySpecs.spec(DynamicCluster.class) + .configure("memberSpec", EntitySpecs.wrapSpec(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper))); if (Entities.isManaged(this)) Entities.manage(cluster); setAttribute(CLUSTER, cluster); } @Override - public KafkaZookeeper getZookeeper() { + public Zookeeper getZookeeper() { return getAttribute(ZOOKEEPER); } @Override - public synchronized ConfigurableEntityFactory<KafkaBroker> getBrokerFactory() { - return (ConfigurableEntityFactory<KafkaBroker>) getAttribute(BROKER_FACTORY); - } - - @Override - public synchronized DynamicCluster getCluster() { + public DynamicCluster getCluster() { return getAttribute(CLUSTER); } @Override public void start(Collection<? 
extends Location> locations) { if (isLegacyConstruction()) { - postConstruct(); + init(); } - if (locations.isEmpty()) locations = this.getLocations(); - Iterables.getOnlyElement(locations); //assert just one + if (locations.isEmpty()) locations = getLocations(); + Iterables.getOnlyElement(locations); // Assert just one addLocations(locations); List<Entity> childrenToStart = MutableList.<Entity>of(getCluster()); // Set the KafkaZookeeper entity as child of cluster, if it does not already have a parent if (getZookeeper().getParent() == null) { addChild(getZookeeper()); - } - // And only start zookeeper if we are parent - if (this.equals(getZookeeper().getParent())) childrenToStart.add(getZookeeper()); - try { - Entities.invokeEffectorList(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).get(); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } catch (ExecutionException e) { - throw Exceptions.propagate(e); - } + } // And only start zookeeper if we are parent + if (Objects.equal(this, getZookeeper().getParent())) childrenToStart.add(getZookeeper()); + Entities.invokeEffectorList(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).getUnchecked(); connectSensors(); } @Override public void stop() { - if (this.equals(getZookeeper().getParent())) { - getZookeeper().stop(); + List<Exception> errors = Lists.newArrayList(); + if (getZookeeper() != null && Objects.equal(this, getZookeeper().getParent())) { + try { + getZookeeper().stop(); + } catch (Exception e) { + errors.add(e); + } + } + if (getCurrentSize() > 0) { + try { + getCluster().stop(); + } catch (Exception e) { + errors.add(e); + } } - getCluster().stop(); - super.getLocations().clear(); + getLocations().clear(); setAttribute(SERVICE_UP, false); + + if (errors.size() != 0) { + throw new CompoundRuntimeException("Error stopping Kafka cluster", errors); + } } @Override @@ -181,14 +173,32 @@ public class KafkaClusterImpl extends 
AbstractEntity implements KafkaCluster { .addToEntityAndEmitAll(this); } + /* + * All Group and Resizable interface methods are delegated to the broker cluster. + */ + + /** {@inheritDoc} */ @Override - public Integer resize(Integer desiredSize) { - return getCluster().resize(desiredSize); - } + public Collection<Entity> getMembers() { return getCluster().getMembers(); } - /** @return the current size of the group. */ - public Integer getCurrentSize() { - return getCluster().getCurrentSize(); - } + /** {@inheritDoc} */ + @Override + public boolean hasMember(Entity member) { return getCluster().hasMember(member); } + + /** {@inheritDoc} */ + @Override + public boolean addMember(Entity member) { return getCluster().addMember(member); } + + /** {@inheritDoc} */ + @Override + public boolean removeMember(Entity member) { return getCluster().removeMember(member); } + + /** {@inheritDoc} */ + @Override + public Integer getCurrentSize() { return getCluster().getCurrentSize(); } + + /** {@inheritDoc} */ + @Override + public Integer resize(Integer desiredSize) { return getCluster().resize(desiredSize); } } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java index a001a29..a0d7a46 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java @@ -17,40 +17,28 @@ package brooklyn.entity.messaging.kafka; import brooklyn.config.ConfigKey; import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.java.UsesJmx; import brooklyn.entity.proxying.ImplementedBy; -import 
brooklyn.event.AttributeSensor; -import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.entity.zookeeper.Zookeeper; import brooklyn.event.basic.BasicConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; import brooklyn.util.flags.SetFromFlag; /** * An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance. */ @ImplementedBy(KafkaZookeeperImpl.class) -public interface KafkaZookeeper extends SoftwareProcess, UsesJmx, Kafka { +public interface KafkaZookeeper extends Zookeeper, Kafka { @SetFromFlag("startTimeout") public static final ConfigKey<Integer> START_TIMEOUT = SoftwareProcess.START_TIMEOUT; + /** The Kafka version, not the Zookeeper version. */ @SetFromFlag("version") ConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION; - @SetFromFlag("zookeeperPort") - PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+"); - - /** Location of the configuration file template to be copied to the server. */ - @SetFromFlag("zookeeperConfig") - ConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>( - String.class, "kafka.zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties"); - - AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.outstandingRequests", "Outstanding request count"); - AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.received", "Total packets received"); - AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.sent", "Total packets sent"); - - Integer getZookeeperPort(); - - String getHostname(); + /** Location of the kafka configuration file template to be copied to the server. 
*/ + @SetFromFlag("kafkaZookeeperConfig") + ConfigKey<String> KAFKA_ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(String.class, + "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)", + "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties"); } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java index 00f892b..79a6cf6 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java @@ -15,37 +15,22 @@ */ package brooklyn.entity.messaging.kafka; -import java.io.IOException; -import java.util.Collection; import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import javax.management.ObjectName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Objects.ToStringHelper; + import brooklyn.entity.Entity; -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.event.feed.function.FunctionFeed; -import brooklyn.event.feed.function.FunctionPollConfig; -import brooklyn.event.feed.jmx.JmxAttributePollConfig; -import brooklyn.event.feed.jmx.JmxFeed; -import brooklyn.event.feed.jmx.JmxHelper; +import brooklyn.entity.zookeeper.AbstractZookeeperImpl; import brooklyn.util.MutableMap; -import brooklyn.util.exceptions.Exceptions; - -import com.google.common.base.Functions; -import com.google.common.base.Objects.ToStringHelper; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Sets; 
/** * An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance. */ -public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZookeeper { +public class KafkaZookeeperImpl extends AbstractZookeeperImpl implements KafkaZookeeper { + private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperImpl.class); public KafkaZookeeperImpl() { @@ -62,83 +47,8 @@ public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZook } @Override - public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); } - - @Override - public String getHostname() { return getAttribute(HOSTNAME); } - - @Override public Class<?> getDriverInterface() { return KafkaZookeeperDriver.class; } - private ObjectName zookeeperMbean = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1"); - private volatile FunctionFeed functionFeed; - private volatile JmxFeed jmxFeed; - - /** Wait for five minutes to start. */ - @Override - public void waitForServiceUp() { waitForServiceUp(5, TimeUnit.MINUTES); } - - @Override - public void waitForServiceUp(long duration, TimeUnit units) { - super.waitForServiceUp(duration, units); - - // Wait for the MBean to exist - JmxHelper helper = null; - try { - helper = new JmxHelper(this); - helper.connect(); - helper.assertMBeanExistsEventually(zookeeperMbean, units.toMillis(duration)); - } catch (IOException e) { - throw Exceptions.propagate(e); - } finally { - if (helper != null) helper.disconnect(); - } - } - - @Override - protected void connectSensors() { - functionFeed = FunctionFeed.builder() - .entity(this) - .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP) - .period(500, TimeUnit.MILLISECONDS) - .callable(new Callable<Boolean>() { - public Boolean call() throws Exception { - return getDriver().isRunning(); - } - }) - .onError(Functions.constant(Boolean.FALSE))) - .build(); - - jmxFeed = JmxFeed.builder() - .entity(this) - .period(500, 
TimeUnit.MILLISECONDS) - .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS) - .objectName(zookeeperMbean) - .attributeName("OutstandingRequests") - .onError(Functions.constant(-1l))) - .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED) - .objectName(zookeeperMbean) - .attributeName("PacketsReceived") - .onError(Functions.constant(-1l))) - .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT) - .objectName(zookeeperMbean) - .attributeName("PacketsSent") - .onError(Functions.constant(-1l))) - .build(); - } - - @Override - public void disconnectSensors() { - super.disconnectSensors(); - if (functionFeed != null) functionFeed.stop(); - if (jmxFeed != null) jmxFeed.stop(); - } - - @Override - protected ToStringHelper toStringHelper() { - return super.toStringHelper().add("zookeeperPort", getZookeeperPort()); - } - } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java index a35aab6..df417e0 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java @@ -34,7 +34,7 @@ public class KafkaZookeeperSshDriver extends AbstractfKafkaSshDriver implements @Override protected ConfigKey<String> getConfigTemplateKey() { - return KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE; + return KafkaZookeeper.KAFKA_ZOOKEEPER_CONFIG_TEMPLATE; } @Override http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java 
---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java new file mode 100644 index 0000000..2e2fc73 --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java @@ -0,0 +1,122 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package brooklyn.entity.zookeeper; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import javax.management.ObjectName; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.event.feed.jmx.JmxHelper; +import brooklyn.util.MutableMap; +import brooklyn.util.exceptions.Exceptions; + +import com.google.common.base.Functions; +import com.google.common.base.Objects.ToStringHelper; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; + +/** + * An {@link brooklyn.entity.Entity} that represents a single Apache Zookeeper instance. 
+ */ +public abstract class AbstractZookeeperImpl extends SoftwareProcessImpl implements Zookeeper { + + private static final Logger log = LoggerFactory.getLogger(AbstractZookeeperImpl.class); + private static final ObjectName ZOOKEEPER_MBEAN = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1"); + + private volatile JmxFeed jmxFeed; + + public AbstractZookeeperImpl() { + super(); + } + public AbstractZookeeperImpl(Map<?, ?> properties) { + this(properties, null); + } + public AbstractZookeeperImpl(Entity parent) { + this(MutableMap.of(), parent); + } + public AbstractZookeeperImpl(Map<?, ?> properties, Entity parent) { + super(properties, parent); + } + + @Override + public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); } + + @Override + public String getHostname() { return getAttribute(HOSTNAME); } + + @Override + public void waitForServiceUp(long duration, TimeUnit units) { + super.waitForServiceUp(duration, units); + + // Wait for the MBean to exist + JmxHelper helper = new JmxHelper(this); + try { + helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration)); + } finally { + helper.disconnect(); + } + } + + @Override + protected void connectSensors() { + connectServiceUpIsRunning(); + + jmxFeed = JmxFeed.builder() + .entity(this) + .period(500, TimeUnit.MILLISECONDS) + .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS) + .objectName(ZOOKEEPER_MBEAN) + .attributeName("OutstandingRequests") + .onError(Functions.constant(-1l))) + .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED) + .objectName(ZOOKEEPER_MBEAN) + .attributeName("PacketsReceived") + .onError(Functions.constant(-1l))) + .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT) + .objectName(ZOOKEEPER_MBEAN) + .attributeName("PacketsSent") + .onError(Functions.constant(-1l))) + .build(); + } + + @Override + public void disconnectSensors() { + super.disconnectSensors(); + 
disconnectServiceUpIsRunning(); + if (jmxFeed != null) jmxFeed.stop(); + } + + @Override + protected ToStringHelper toStringHelper() { + return super.toStringHelper() + .add("zookeeperPort", getZookeeperPort()); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java new file mode 100644 index 0000000..369ff61 --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/zookeeper/Zookeeper.java @@ -0,0 +1,50 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package brooklyn.entity.zookeeper; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.java.UsesJmx; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.util.flags.SetFromFlag; + +/** + * An {@link brooklyn.entity.Entity} that represents a single Apache Zookeeper instance. + * <p> + * Currently {@code abstract} as there is no generic Zookeeper driver. 
+ */ +@ImplementedBy(AbstractZookeeperImpl.class) +public interface Zookeeper extends SoftwareProcess, UsesJmx { + + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = new BasicConfigKey<String>(SoftwareProcess.SUGGESTED_VERSION, "3.3.3"); + + @SetFromFlag("zookeeperPort") + PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+"); + + AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "zookeeper.outstandingRequests", "Outstanding request count"); + AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.received", "Total packets received"); + AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.sent", "Total packets sent"); + + Integer getZookeeperPort(); + + String getHostname(); + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy index 28ff308..8733cb0 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy @@ -41,19 +41,15 @@ public class ActiveMQIntegrationTest { private Location testLocation private ActiveMQBroker activeMQ - @BeforeMethod(groups = "Integration") + @BeforeMethod(alwaysRun = true) public void setup() { app = ApplicationBuilder.newManagedApp(TestApplication.class); testLocation = new LocalhostMachineProvisioningLocation() } - @AfterMethod(groups = "Integration") + 
@AfterMethod(alwaysRun = true) public void shutdown() { - try { - if (app != null) Entities.destroyAll(app); - } catch (Exception e) { - log.warn("Error stopping entities", e); - } + if (app != null) Entities.destroyAll(app); } /** http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy deleted file mode 100644 index 2ef95c5..0000000 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright 2013 by Cloudsoft Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package brooklyn.entity.messaging.kafka; - -import static brooklyn.test.TestUtils.* -import static java.util.concurrent.TimeUnit.* -import static org.testng.Assert.* - -import java.util.concurrent.TimeUnit - -import javax.jms.Connection -import javax.jms.MessageConsumer -import javax.jms.MessageProducer -import javax.jms.Queue -import javax.jms.Session -import javax.jms.TextMessage - -import org.apache.activemq.ActiveMQConnectionFactory -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.testng.annotations.AfterMethod -import org.testng.annotations.BeforeMethod -import org.testng.annotations.Test - -import brooklyn.entity.basic.ApplicationBuilder -import brooklyn.entity.basic.Entities -import brooklyn.entity.proxying.BasicEntitySpec -import brooklyn.entity.trait.Startable -import brooklyn.location.Location -import brooklyn.location.basic.LocalhostMachineProvisioningLocation -import brooklyn.test.entity.TestApplication -import brooklyn.util.internal.TimeExtras - -/** - * Test the operation of the {@link ActiveMQBroker} class. - * - * TODO test that sensors update. - */ -public class KafkaIntegrationTest { - private static final Logger log = LoggerFactory.getLogger(KafkaIntegrationTest.class) - - static { TimeExtras.init() } - - private TestApplication app - private Location testLocation - - @BeforeMethod(groups = "Integration") - public void setup() { - app = ApplicationBuilder.builder(TestApplication.class).manage(); - testLocation = new LocalhostMachineProvisioningLocation() - } - - @AfterMethod(groups = "Integration") - public void shutdown() { - if (app != null) Entities.destroyAll(app); - } - - /** - * Test that we can start a zookeeper. 
- */ - @Test(groups = "Integration") - public void testZookeeper() { - KafkaZookeeper zookeeper = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class)); - - zookeeper.start([ testLocation ]) - executeUntilSucceedsWithShutdown(zookeeper, timeout:600*TimeUnit.SECONDS) { - assertTrue zookeeper.getAttribute(Startable.SERVICE_UP) - } - assertFalse zookeeper.getAttribute(Startable.SERVICE_UP) - } - - /** - * Test that we can start a broker and zookeeper together. - */ - @Test(groups = "Integration") - public void testBrokerPlusZookeeper() { - KafkaZookeeper zookeeper = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class)); - KafkaBroker broker = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper)); - - zookeeper.start([ testLocation ]) - executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) { - assertTrue zookeeper.getAttribute(Startable.SERVICE_UP) - } - - broker.start([ testLocation ]) - executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) { - assertTrue broker.getAttribute(Startable.SERVICE_UP) - } - } - - /** - * Test that we can start a cluster with zookeeper and one broker. - * - * Connects to the zookeeper controller and tests sending and receiving messages on a topic. 
- */ - @Test(groups = "Integration") - public void testSingleBrokerCluster() { - KafkaCluster cluster = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE, 1)); - - cluster.start([ testLocation ]) - executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) { - assertTrue cluster.getAttribute(Startable.SERVICE_UP) - } - - Entities.dumpInfo(cluster); - - KafkaSupport support = new KafkaSupport(cluster.getZookeeper()); - support.sendMessage("brooklyn", "TEST_MESSAGE") - String message = support.getMessage("brooklyn"); - assertEquals(message, "TEST_MESSAGE"); - } - -} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java new file mode 100644 index 0000000..54f698a --- /dev/null +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java @@ -0,0 +1,144 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package brooklyn.entity.messaging.kafka; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.util.concurrent.Callable; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.messaging.activemq.ActiveMQBroker; +import brooklyn.entity.proxying.EntitySpecs; +import brooklyn.entity.trait.Startable; +import brooklyn.location.Location; +import brooklyn.location.basic.LocalhostMachineProvisioningLocation; +import brooklyn.test.Asserts; +import brooklyn.test.entity.TestApplication; +import brooklyn.util.MutableMap; + +import com.google.common.collect.ImmutableList; + +/** + * Test the operation of the {@link KafkaZookeeper}, {@link KafkaBroker} and {@link KafkaCluster} entities. + * + * TODO test that sensors update. + */ +public class KafkaIntegrationTest { + + private TestApplication app; + private Location testLocation; + + @BeforeMethod(alwaysRun = true) + public void setup() { + app = ApplicationBuilder.newManagedApp(TestApplication.class); + testLocation = new LocalhostMachineProvisioningLocation(); + } + + @AfterMethod(alwaysRun = true) + public void shutdown() { + if (app != null) Entities.destroyAll(app); + } + + /** + * Test that we can start a zookeeper. 
+ */ + @Test(groups = "Integration") + public void testZookeeper() { + final KafkaZookeeper zookeeper = app.createAndManageChild(EntitySpecs.spec(KafkaZookeeper.class)); + + zookeeper.start(ImmutableList.of(testLocation)); + Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() { + @Override + public Void call() { + assertTrue(zookeeper.getAttribute(Startable.SERVICE_UP)); + return null; + } + }); + + zookeeper.stop(); + assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP)); + } + + /** + * Test that we can start a broker and zookeeper together. + */ + @Test(groups = "Integration") + public void testBrokerPlusZookeeper() { + final KafkaZookeeper zookeeper = app.createAndManageChild(EntitySpecs.spec(KafkaZookeeper.class)); + final KafkaBroker broker = app.createAndManageChild(EntitySpecs.spec(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper)); + + zookeeper.start(ImmutableList.of(testLocation)); + Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() { + @Override + public Void call() { + assertTrue(zookeeper.getAttribute(Startable.SERVICE_UP)); + return null; + } + }); + + broker.start(ImmutableList.of(testLocation)); + Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() { + @Override + public Void call() { + assertTrue(broker.getAttribute(Startable.SERVICE_UP)); + return null; + } + }); + + zookeeper.stop(); + assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP)); + + broker.stop(); + assertFalse(broker.getAttribute(Startable.SERVICE_UP)); + } + + /** + * Test that we can start a cluster with zookeeper and one broker. + * + * Connects to the zookeeper controller and tests sending and receiving messages on a topic. 
+ */ + @Test(groups = "Integration") + public void testSingleBrokerCluster() { + final KafkaCluster cluster = app.createAndManageChild(EntitySpecs.spec(KafkaCluster.class)); + + cluster.start(ImmutableList.of(testLocation)); + Asserts.succeedsEventually(MutableMap.of("timeout", 60000l), new Callable<Void>() { + @Override + public Void call() { + assertTrue(cluster.getAttribute(Startable.SERVICE_UP)); + assertTrue(cluster.getZookeeper().getAttribute(Startable.SERVICE_UP)); + assertEquals(cluster.getCurrentSize().intValue(), 1); + return null; + } + }); + + Entities.dumpInfo(cluster); + + KafkaSupport support = new KafkaSupport(cluster); + + support.sendMessage("brooklyn", "TEST_MESSAGE"); + String message = support.getMessage("brooklyn"); + assertEquals(message, "TEST_MESSAGE"); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/64486e44/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java index d9372a9..019a65b 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java @@ -31,40 +31,54 @@ import kafka.javaapi.producer.ProducerData; import kafka.message.Message; import kafka.producer.ProducerConfig; import brooklyn.entity.basic.Attributes; +import brooklyn.entity.zookeeper.Zookeeper; -import com.beust.jcommander.internal.Lists; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +/** + * Kafka test framework for integration and live tests, using the Kafka Java API. 
+ */ public class KafkaSupport { - private final KafkaZookeeper zookeeper; + private final KafkaCluster cluster; - public KafkaSupport(KafkaZookeeper zookeeper) { - this.zookeeper = zookeeper; + public KafkaSupport(KafkaCluster cluster) { + this.cluster = cluster; } + /** + * Send a message to the {@link KafkaCluster} on the given topic. + */ public void sendMessage(String topic, String message) { + Zookeeper zookeeper = cluster.getZookeeper(); Properties props = new Properties(); props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort())); props.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig config = new ProducerConfig(props); + Producer<String, String> producer = new Producer<String, String>(config); ProducerData<String, String> data = new ProducerData<String, String>(topic, message); producer.send(data); producer.close(); } + /** + * Retrieve the next message on the given topic from the {@link KafkaCluster}. + */ public String getMessage(String topic) { + Zookeeper zookeeper = cluster.getZookeeper(); Properties props = new Properties(); props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort())); - props.put("zk.connectiontimeout.ms", "1000000"); + props.put("zk.connectiontimeout.ms", "120000"); // two minutes props.put("groupid", "brooklyn"); ConsumerConfig consumerConfig = new ConsumerConfig(props); + ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic); ConsumerIterator<Message> iterator = Iterables.getOnlyElement(streams).iterator(); Message msg = iterator.next(); + assertTrue(msg.isValid()); ByteBuffer buf = msg.payload(); byte[] data = new byte[buf.remaining()];
