Repository: brooklyn-library
Updated Branches:
  refs/heads/0.6.0 [created] 6acc62fa2


Tidy up Cassandra fabric and cluster interfaces


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/de2e685b
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/de2e685b
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/de2e685b

Branch: refs/heads/0.6.0
Commit: de2e685bd60c85bdfaf1d6bc7eac2f52e0c89bba
Parents: 06c8180
Author: Andrew Kennedy <[email protected]>
Authored: Thu Oct 24 15:40:35 2013 +0100
Committer: Andrew Kennedy <[email protected]>
Committed: Thu Oct 24 15:46:53 2013 +0100

----------------------------------------------------------------------
 .../nosql/cassandra/CassandraCluster.java       | 101 +++++++++++--------
 .../entity/nosql/cassandra/CassandraFabric.java |  39 +++----
 2 files changed, 80 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/de2e685b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java
index 9f87e80..ceb8c7c 100644
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java
+++ 
b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraCluster.java
@@ -3,7 +3,6 @@
  */
 package brooklyn.entity.nosql.cassandra;
 
-import java.util.Map;
 import java.util.Set;
 
 import brooklyn.catalog.Catalog;
@@ -22,14 +21,17 @@ import brooklyn.util.time.Duration;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Multimap;
+import com.google.common.reflect.TypeToken;
 
 /**
  * A cluster of {@link CassandraNode}s based on {@link DynamicCluster} which 
can be resized by a policy if required.
  * <p>
- * Note that due to how Cassandra assumes ports are the same across a cluster, 
- * it is NOT possible to deploy a cluster to localhost.
+ * Note that due to how Cassandra assumes ports are the same across a cluster,
+ * it is <em>NOT</em> possible to deploy a cluster to localhost.
  */
-@Catalog(name="Apache Cassandra Database Cluster", description="Cassandra is a 
highly scalable, eventually consistent, distributed, structured key-value store 
which provides a ColumnFamily-based data model richer than typical key/value 
systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@Catalog(name="Apache Cassandra Database Cluster", description="Cassandra is a 
highly scalable, eventually " +
+        "consistent, distributed, structured key-value store which provides a 
ColumnFamily-based data model " +
+        "richer than typical key/value systems", 
iconUrl="classpath:///cassandra-logo.jpeg")
 @ImplementedBy(CassandraClusterImpl.class)
 public interface CassandraCluster extends DynamicCluster {
 
@@ -40,42 +42,48 @@ public interface CassandraCluster extends DynamicCluster {
     ConfigKey<String> ENDPOINT_SNITCH_NAME = 
ConfigKeys.newStringConfigKey("cassandra.cluster.snitchName", "Type of the 
Cassandra snitch", "SimpleSnitch");
 
     @SetFromFlag("seedSupplier")
-    ConfigKey<Supplier<Set<Entity>>> SEED_SUPPLIER = (ConfigKey) 
ConfigKeys.newConfigKey(Supplier.class, "cassandra.cluster.seedSupplier", "For 
determining the seed nodes", null);
+    @SuppressWarnings("serial")
+    ConfigKey<Supplier<Set<Entity>>> SEED_SUPPLIER = 
ConfigKeys.newConfigKey(new TypeToken<Supplier<Set<Entity>>>() { }, 
"cassandra.cluster.seedSupplier", "For determining the seed nodes", null);
 
-    /** Additional time after the nodes in the cluster are up when starting 
before announcing the cluster as up;
-     * Useful to ensure nodes have synchronized.  */
-    // on 1.2.2 this could be as much as 120s when using 2 seed nodes, 
-    // or just a few seconds with 1 seed node;
-    // on 1.2.9 it seems a few seconds is sufficient even with 2 seed nodes
+    /**
+     * Additional time after the nodes in the cluster are up when starting
+     * before announcing the cluster as up.
+     * <p>
+     * Useful to ensure nodes have synchronized.
+     * <p>
+     * On 1.2.2 this could be as much as 120s when using 2 seed nodes,
+     * or just a few seconds with 1 seed node. On 1.2.9 it seems a few
+     * seconds is sufficient even with 2 seed nodes.
+     */
     @SetFromFlag("delayBeforeAdvertisingCluster")
     ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = 
ConfigKeys.newConfigKey(Duration.class, 
"cassandra.cluster.delayBeforeAdvertisingCluster", "Type of the Cassandra 
snitch", Duration.TEN_SECONDS);
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = 
(AttributeSensor)Sensors.newSensor(Map.class, 
"cassandra.cluster.datacenterUsages", "Current set of datacenters in use, with 
nodes in each");
+    @SuppressWarnings("serial")
+    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = 
Sensors.newSensor(new TypeToken<Multimap<String,Entity>>() { }, 
"cassandra.cluster.datacenterUsages", "Current set of datacenters in use, with 
nodes in each");
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    AttributeSensor<Set<String>> DATACENTERS = 
(AttributeSensor)Sensors.newSensor(Set.class, "cassandra.cluster.datacenters", 
"Current set of datacenters in use");
+    @SuppressWarnings("serial")
+    AttributeSensor<Set<String>> DATACENTERS = Sensors.newSensor(new 
TypeToken<Set<String>>() { }, "cassandra.cluster.datacenters", "Current set of 
datacenters in use");
 
     AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = 
Sensors.newBooleanSensor("cassandra.cluster.seeds.hasPublished", "Whether we 
have published any seeds");
-    
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    AttributeSensor<Set<Entity>> CURRENT_SEEDS = 
(AttributeSensor)Sensors.newSensor(Set.class, 
"cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap 
the cluster");
-    
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Set<Entity>> CURRENT_SEEDS = Sensors.newSensor(new 
TypeToken<Set<Entity>>() { }, "cassandra.cluster.seeds.current", "Current set 
of seeds to use to bootstrap the cluster");
+
     AttributeSensor<String> HOSTNAME = 
Sensors.newStringSensor("cassandra.cluster.hostname", "Hostname to connect to 
cluster with");
 
     AttributeSensor<Integer> THRIFT_PORT = 
Sensors.newIntegerSensor("cassandra.cluster.thrift.port", "Cassandra Thrift RPC 
port to connect to cluster with");
-    
+
     AttributeSensor<Long> FIRST_NODE_STARTED_TIME_UTC = 
Sensors.newLongSensor("cassandra.cluster.first.node.started.utc", "Time (UTC) 
when the first node was started");
-    
-    AttributeSensor<Integer> SCHEMA_VERSION_COUNT = 
Sensors.newIntegerSensor("cassandra.cluster.schema.versions.count", 
+
+    AttributeSensor<Integer> SCHEMA_VERSION_COUNT = 
Sensors.newIntegerSensor("cassandra.cluster.schema.versions.count",
             "Number of different schema versions in the cluster; should be 1 
for a healthy cluster, 0 when off; " +
-            ">=2 indicats a Schema Disagreement Error (and keyspace access may 
fail)");
+            "2 and above indicats a Schema Disagreement Error (and keyspace 
access may fail)");
 
     AttributeSensor<Long> READ_PENDING = 
Sensors.newLongSensor("cassandra.cluster.read.pending", "Current pending 
ReadStage tasks");
     AttributeSensor<Integer> READ_ACTIVE = 
Sensors.newIntegerSensor("cassandra.cluster.read.active", "Current active 
ReadStage tasks");
     AttributeSensor<Long> WRITE_PENDING = 
Sensors.newLongSensor("cassandra.cluster.write.pending", "Current pending 
MutationStage tasks");
     AttributeSensor<Integer> WRITE_ACTIVE = 
Sensors.newIntegerSensor("cassandra.cluster.write.active", "Current active 
MutationStage tasks");
-    
+
     AttributeSensor<Long> THRIFT_PORT_LATENCY_PER_NODE = 
Sensors.newLongSensor("cassandra.cluster.thrift.latency.perNode", "Latency for 
thrift port connection  averaged over all nodes (ms)");
     AttributeSensor<Double> READS_PER_SECOND_LAST_PER_NODE = 
Sensors.newDoubleSensor("cassandra.reads.perSec.last.perNode", "Reads/sec (last 
datapoint) averaged over all nodes");
     AttributeSensor<Double> WRITES_PER_SECOND_LAST_PER_NODE = 
Sensors.newDoubleSensor("cassandra.write.perSec.last.perNode", "Writes/sec 
(last datapoint) averaged over all nodes");
@@ -88,32 +96,41 @@ public interface CassandraCluster extends DynamicCluster {
 
     MethodEffector<Void> UPDATE = new 
MethodEffector<Void>(CassandraCluster.class, "update");
 
-    /** sets the number of nodes used to seed the cluster;
-     *  v1.2.2 is buggy and requires a big delay for 2 nodes both seeds to 
reconcile, 
-     *  see 
http://stackoverflow.com/questions/6770894/schemadisagreementexception/18639005
-     *  and posts to cassandra mailing list. (Alex, 9 Sept 2013)
-     *  <p>
-     *  with v1.2.9 this seems fine, with just a few seconds' delay after 
starting */
-    public static final int DEFAULT_SEED_QUORUM = 2;
-    
-    /** can insert a delay after the first node comes up;
-     * is not needed with 1.2.9 (and does not help with the bug in 1.2.2) */
-    public static final Duration DELAY_AFTER_FIRST = Duration.ZERO;
-    
-    /** whether to wait for the first node to start up */
-    // not sure whether this is needed or not; need to test in env where not 
all nodes are seed nodes,
-    // what happens if non-seed nodes start before the seed nodes ?
-    public static final boolean WAIT_FOR_FIRST = true;
-    
+    /**
+     * Sets the number of nodes used to seed the cluster.
+     * <p>
+     * Version 1.2.2 is buggy and requires a big delay for 2 nodes that are both seeds 
to reconcile;
+     * with 1.2.9 this seems fine, with just a few seconds' delay after 
starting.
+     *
+     * @see <a 
href="http://stackoverflow.com/questions/6770894/schemadisagreementexception/18639005"
 />
+     */
+    int DEFAULT_SEED_QUORUM = 2;
+
+    /**
+     * Can insert a delay after the first node comes up.
+     * <p>
+     * Not needed with 1.2.9 (and does not help with the bug in 1.2.2).
+     */
+    Duration DELAY_AFTER_FIRST = Duration.ZERO;
+
+    /**
+     * Whether to wait for the first node to start up.
+     * <p>
+     * Not sure whether this is needed or not. Need to test in env where not 
all nodes are seed nodes:
+     * what happens if non-seed nodes start before the seed nodes?
+     */
+    boolean WAIT_FOR_FIRST = true;
+
     @Effector(description="Updates the cluster members")
     void update();
-    
+
     /**
      * The name of the cluster.
      */
     String getClusterName();
 
     Set<Entity> gatherPotentialSeeds();
-    
+
     Set<Entity> gatherPotentialRunningSeeds();
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/de2e685b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java
----------------------------------------------------------------------
diff --git 
a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java
 
b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java
index 635caa5..35fb4d5 100644
--- 
a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java
+++ 
b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java
@@ -3,49 +3,52 @@
  */
 package brooklyn.entity.nosql.cassandra;
 
-import java.util.Map;
 import java.util.Set;
 
 import brooklyn.catalog.Catalog;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.Entity;
+import brooklyn.entity.annotation.Effector;
 import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.MethodEffector;
 import brooklyn.entity.group.DynamicFabric;
 import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.Sensors;
 
 import com.google.common.collect.Multimap;
 
 /**
  * A fabric of {@link CassandraNode}s, which forms a cluster spanning multiple 
locations.
- * 
- * Each "CassandraCluster" child instance is actually just a part of the whole 
cluster. It consists of the
+ * <p>
+ * Each {@link CassandraCluster} child instance is actually just a part of the 
whole cluster. It consists of the
  * nodes in that single location (which normally corresponds to a "datacenter" 
in Cassandra terminology).
  */
-@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a 
highly scalable, eventually consistent, distributed, structured key-value store 
which provides a ColumnFamily-based data model richer than typical key/value 
systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a 
highly scalable, eventually " +
+        "consistent, distributed, structured key-value store which provides a 
ColumnFamily-based data model " +
+        "richer than typical key/value systems", 
iconUrl="classpath:///cassandra-logo.jpeg")
 @ImplementedBy(CassandraFabricImpl.class)
 public interface CassandraFabric extends DynamicFabric {
-    
+
     ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey(
             "fabric.initial.quorumSize",
-            "Initial fabric quorum size - number of initial nodes that must 
have been successfully started to report success (if < 0, then use value of 
INITIAL_SIZE)", 
+            "Initial fabric quorum size - number of initial nodes that must 
have been successfully started " +
+            "to report success (if less than 0, then use value of 
INITIAL_SIZE)",
             -1);
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = 
(AttributeSensor)Sensors.newSensor(Map.class, "cassandra.cluster.datacenters", 
"Current set of datacenters in use, with nodes in each");
+    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = 
CassandraCluster.DATACENTER_USAGE;
+
+    AttributeSensor<Set<String>> DATACENTERS = CassandraCluster.DATACENTERS;
+
+    AttributeSensor<Set<Entity>> CURRENT_SEEDS = 
CassandraCluster.CURRENT_SEEDS;
+
+    AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = 
CassandraCluster.HAS_PUBLISHED_SEEDS;
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    AttributeSensor<Set<String>> DATACENTERS = 
(AttributeSensor)Sensors.newSensor(Set.class, "cassandra.cluster.datacenters", 
"Current set of datacenters in use");
+    AttributeSensor<String> HOSTNAME = CassandraCluster.HOSTNAME;
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    AttributeSensor<Set<Entity>> CURRENT_SEEDS = 
(AttributeSensor)Sensors.newSensor(Set.class, 
"cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap 
the cluster");
-    
-    AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = 
Sensors.newBooleanSensor("cassandra.cluster.seeds.hasPublished", "Whether we 
have published any seeds");
-    
-    AttributeSensor<String> HOSTNAME = 
Sensors.newStringSensor("cassandra.cluster.hostname", "Hostname to connect to 
cluster with");
+    AttributeSensor<Integer> THRIFT_PORT = CassandraCluster.THRIFT_PORT;
 
-    AttributeSensor<Integer> THRIFT_PORT = 
Sensors.newIntegerSensor("cassandra.cluster.thrift.port", "Cassandra Thrift RPC 
port to connect to cluster with");
+    MethodEffector<Void> UPDATE = new 
MethodEffector<Void>(CassandraFabric.class, "update");
 
+    @Effector(description="Updates the cluster members")
     void update();
 }

Reply via email to