Repository: brooklyn-library Updated Branches: refs/heads/0.6.0 [created] 6acc62fa2
Tidy up Cassandra fabric and cluster interfaces Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/de2e685b Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/de2e685b Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/de2e685b Branch: refs/heads/0.6.0 Commit: de2e685bd60c85bdfaf1d6bc7eac2f52e0c89bba Parents: 06c8180 Author: Andrew Kennedy <[email protected]> Authored: Thu Oct 24 15:40:35 2013 +0100 Committer: Andrew Kennedy <[email protected]> Committed: Thu Oct 24 15:46:53 2013 +0100 ---------------------------------------------------------------------- .../nosql/cassandra/CassandraCluster.java | 101 +++++++++++-------- .../entity/nosql/cassandra/CassandraFabric.java | 39 +++---- 2 files changed, 80 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/de2e685b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java index 9f87e80..ceb8c7c 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java @@ -3,7 +3,6 @@ */ package brooklyn.entity.nosql.cassandra; -import java.util.Map; import java.util.Set; import brooklyn.catalog.Catalog; @@ -22,14 +21,17 @@ import brooklyn.util.time.Duration; import com.google.common.base.Supplier; import com.google.common.collect.Multimap; +import com.google.common.reflect.TypeToken; /** * A cluster of {@link CassandraNode}s based on {@link DynamicCluster} which can be resized by a policy if required. * <p> - * Note that due to how Cassandra assumes ports are the same across a cluster, - * it is NOT possible to deploy a cluster to localhost. + * Note that due to how Cassandra assumes ports are the same across a cluster, + * it is <em>NOT</em> possible to deploy a cluster to localhost. */ -@Catalog(name="Apache Cassandra Database Cluster", description="Cassandra is a highly scalable, eventually consistent, distributed, structured key-value store which provides a ColumnFamily-based data model richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg") +@Catalog(name="Apache Cassandra Database Cluster", description="Cassandra is a highly scalable, eventually " + + "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " + + "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg") @ImplementedBy(CassandraClusterImpl.class) public interface CassandraCluster extends DynamicCluster { @@ -40,42 +42,48 @@ public interface CassandraCluster extends DynamicCluster { ConfigKey<String> ENDPOINT_SNITCH_NAME = ConfigKeys.newStringConfigKey("cassandra.cluster.snitchName", "Type of the Cassandra snitch", "SimpleSnitch"); @SetFromFlag("seedSupplier") - ConfigKey<Supplier<Set<Entity>>> SEED_SUPPLIER = (ConfigKey) ConfigKeys.newConfigKey(Supplier.class, "cassandra.cluster.seedSupplier", "For determining the seed nodes", null); + @SuppressWarnings("serial") + ConfigKey<Supplier<Set<Entity>>> SEED_SUPPLIER = ConfigKeys.newConfigKey(new TypeToken<Supplier<Set<Entity>>>() { }, "cassandra.cluster.seedSupplier", "For determining the seed nodes", null); - /** Additional time after the nodes in the cluster are up when starting before announcing the cluster as up; - * Useful to ensure nodes have synchronized. */ - // on 1.2.2 this could be as much as 120s when using 2 seed nodes, - // or just a few seconds with 1 seed node; - // on 1.2.9 it seems a few seconds is sufficient even with 2 seed nodes + /** + * Additional time after the nodes in the cluster are up when starting + * before announcing the cluster as up. + * <p> + * Useful to ensure nodes have synchronized. + * <p> + * On 1.2.2 this could be as much as 120s when using 2 seed nodes, + * or just a few seconds with 1 seed node. On 1.2.9 it seems a few + * seconds is sufficient even with 2 seed nodes + */ @SetFromFlag("delayBeforeAdvertisingCluster") ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "cassandra.cluster.delayBeforeAdvertisingCluster", "Type of the Cassandra snitch", Duration.TEN_SECONDS); - @SuppressWarnings({ "unchecked", "rawtypes" }) - AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = (AttributeSensor)Sensors.newSensor(Map.class, "cassandra.cluster.datacenterUsages", "Current set of datacenters in use, with nodes in each"); + @SuppressWarnings("serial") + AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = Sensors.newSensor(new TypeToken<Multimap<String,Entity>>() { }, "cassandra.cluster.datacenterUsages", "Current set of datacenters in use, with nodes in each"); - @SuppressWarnings({ "unchecked", "rawtypes" }) - AttributeSensor<Set<String>> DATACENTERS = (AttributeSensor)Sensors.newSensor(Set.class, "cassandra.cluster.datacenters", "Current set of datacenters in use"); + @SuppressWarnings("serial") + AttributeSensor<Set<String>> DATACENTERS = Sensors.newSensor(new TypeToken<Set<String>>() { }, "cassandra.cluster.datacenters", "Current set of datacenters in use"); AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = Sensors.newBooleanSensor("cassandra.cluster.seeds.hasPublished", "Whether we have published any seeds"); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - AttributeSensor<Set<Entity>> CURRENT_SEEDS = (AttributeSensor)Sensors.newSensor(Set.class, "cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap the cluster"); - + + @SuppressWarnings("serial") + AttributeSensor<Set<Entity>> CURRENT_SEEDS = Sensors.newSensor(new TypeToken<Set<Entity>>() { }, "cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap the cluster"); + AttributeSensor<String> HOSTNAME = Sensors.newStringSensor("cassandra.cluster.hostname", "Hostname to connect to cluster with"); AttributeSensor<Integer> THRIFT_PORT = Sensors.newIntegerSensor("cassandra.cluster.thrift.port", "Cassandra Thrift RPC port to connect to cluster with"); - + AttributeSensor<Long> FIRST_NODE_STARTED_TIME_UTC = Sensors.newLongSensor("cassandra.cluster.first.node.started.utc", "Time (UTC) when the first node was started"); - - AttributeSensor<Integer> SCHEMA_VERSION_COUNT = Sensors.newIntegerSensor("cassandra.cluster.schema.versions.count", + + AttributeSensor<Integer> SCHEMA_VERSION_COUNT = Sensors.newIntegerSensor("cassandra.cluster.schema.versions.count", "Number of different schema versions in the cluster; should be 1 for a healthy cluster, 0 when off; " + - ">=2 indicats a Schema Disagreement Error (and keyspace access may fail)"); + "2 and above indicats a Schema Disagreement Error (and keyspace access may fail)"); AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.cluster.read.pending", "Current pending ReadStage tasks"); AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.read.active", "Current active ReadStage tasks"); AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.cluster.write.pending", "Current pending MutationStage tasks"); AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.write.active", "Current active MutationStage tasks"); - + AttributeSensor<Long> THRIFT_PORT_LATENCY_PER_NODE = Sensors.newLongSensor("cassandra.cluster.thrift.latency.perNode", "Latency for thrift port connection averaged over all nodes (ms)"); AttributeSensor<Double> READS_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.last.perNode", "Reads/sec (last datapoint) averaged over all nodes"); AttributeSensor<Double> WRITES_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.write.perSec.last.perNode", "Writes/sec (last datapoint) averaged over all nodes"); @@ -88,32 +96,41 @@ public interface CassandraCluster extends DynamicCluster { MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraCluster.class, "update"); - /** sets the number of nodes used to seed the cluster; - * v1.2.2 is buggy and requires a big delay for 2 nodes both seeds to reconcile, - * see http://stackoverflow.com/questions/6770894/schemadisagreementexception/18639005 - * and posts to cassandra mailing list. (Alex, 9 Sept 2013) - * <p> - * with v1.2.9 this seems fine, with just a few seconds' delay after starting */ - public static final int DEFAULT_SEED_QUORUM = 2; - - /** can insert a delay after the first node comes up; - * is not needed with 1.2.9 (and does not help with the bug in 1.2.2) */ - public static final Duration DELAY_AFTER_FIRST = Duration.ZERO; - - /** whether to wait for the first node to start up */ - // not sure whether this is needed or not; need to test in env where not all nodes are seed nodes, - // what happens if non-seed nodes start before the seed nodes ? - public static final boolean WAIT_FOR_FIRST = true; - + /** + * Sets the number of nodes used to seed the cluster. + * <p> + * Version 1.2.2 is buggy and requires a big delay for 2 nodes both seeds to reconcile, + * with 1.2.9 this seems fine, with just a few seconds' delay after starting. + * + * @see <a href="http://stackoverflow.com/questions/6770894/schemadisagreementexception/18639005" /> + */ + int DEFAULT_SEED_QUORUM = 2; + + /** + * Can insert a delay after the first node comes up. + * <p> + * Not needed with 1.2.9 (and does not help with the bug in 1.2.2) + */ + Duration DELAY_AFTER_FIRST = Duration.ZERO; + + /** + * Whether to wait for the first node to start up + * <p> + * not sure whether this is needed or not. Need to test in env where not all nodes are seed nodes, + * what happens if non-seed nodes start before the seed nodes? + */ + boolean WAIT_FOR_FIRST = true; + @Effector(description="Updates the cluster members") void update(); - + /** * The name of the cluster. */ String getClusterName(); Set<Entity> gatherPotentialSeeds(); - + Set<Entity> gatherPotentialRunningSeeds(); + } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/de2e685b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java index 635caa5..35fb4d5 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java @@ -3,49 +3,52 @@ */ package brooklyn.entity.nosql.cassandra; -import java.util.Map; import java.util.Set; import brooklyn.catalog.Catalog; import brooklyn.config.ConfigKey; import brooklyn.entity.Entity; +import brooklyn.entity.annotation.Effector; import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.MethodEffector; import brooklyn.entity.group.DynamicFabric; import brooklyn.entity.proxying.ImplementedBy; import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.Sensors; import com.google.common.collect.Multimap; /** * A fabric of {@link CassandraNode}s, which forms a cluster spanning multiple locations. - * - * Each "CassandraCluster" child instance is actually just a part of the whole cluster. It consists of the + * <p> + * Each {@link CassandraCluster} child instance is actually just a part of the whole cluster. It consists of the * nodes in that single location (which normally corresponds to a "datacenter" in Cassandra terminology). */ -@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a highly scalable, eventually consistent, distributed, structured key-value store which provides a ColumnFamily-based data model richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg") +@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a highly scalable, eventually " + + "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " + + "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg") @ImplementedBy(CassandraFabricImpl.class) public interface CassandraFabric extends DynamicFabric { - + ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey( "fabric.initial.quorumSize", - "Initial fabric quorum size - number of initial nodes that must have been successfully started to report success (if < 0, then use value of INITIAL_SIZE)", + "Initial fabric quorum size - number of initial nodes that must have been successfully started " + + "to report success (if less than 0, then use value of INITIAL_SIZE)", -1); - @SuppressWarnings({ "unchecked", "rawtypes" }) - AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = (AttributeSensor)Sensors.newSensor(Map.class, "cassandra.cluster.datacenters", "Current set of datacenters in use, with nodes in each"); + AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = CassandraCluster.DATACENTER_USAGE; + + AttributeSensor<Set<String>> DATACENTERS = CassandraCluster.DATACENTERS; + + AttributeSensor<Set<Entity>> CURRENT_SEEDS = CassandraCluster.CURRENT_SEEDS; + + AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = CassandraCluster.HAS_PUBLISHED_SEEDS; - @SuppressWarnings({ "unchecked", "rawtypes" }) - AttributeSensor<Set<String>> DATACENTERS = (AttributeSensor)Sensors.newSensor(Set.class, "cassandra.cluster.datacenters", "Current set of datacenters in use"); + AttributeSensor<String> HOSTNAME = CassandraCluster.HOSTNAME; - @SuppressWarnings({ "unchecked", "rawtypes" }) - AttributeSensor<Set<Entity>> CURRENT_SEEDS = (AttributeSensor)Sensors.newSensor(Set.class, "cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap the cluster"); - - AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = Sensors.newBooleanSensor("cassandra.cluster.seeds.hasPublished", "Whether we have published any seeds"); - - AttributeSensor<String> HOSTNAME = Sensors.newStringSensor("cassandra.cluster.hostname", "Hostname to connect to cluster with"); + AttributeSensor<Integer> THRIFT_PORT = CassandraCluster.THRIFT_PORT; - AttributeSensor<Integer> THRIFT_PORT = Sensors.newIntegerSensor("cassandra.cluster.thrift.port", "Cassandra Thrift RPC port to connect to cluster with"); + MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraFabric.class, "update"); + @Effector(description="Updates the cluster members") void update(); }
