Added leave-cluster call when a node exits the cluster + added cluster commit calls after cluster initialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/447db550 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/447db550 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/447db550 Branch: refs/pull/1411/merge Commit: 447db5508545c0a04c404863d45a83453017f6ae Parents: 16c5abb Author: ZaidM <zaid.moh...@cloudsoftcorp.com> Authored: Thu May 22 18:00:56 2014 +0100 Committer: ZaidM <zaid.moh...@cloudsoftcorp.com> Committed: Thu May 22 18:00:56 2014 +0100 ---------------------------------------------------------------------- .../java/brooklyn/demo/RiakClusterExample.java | 13 +++++++++- .../brooklyn/entity/nosql/riak/RiakCluster.java | 1 + .../entity/nosql/riak/RiakClusterImpl.java | 26 +++++++++++++++++--- 3 files changed, 35 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/447db550/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java ---------------------------------------------------------------------- diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java index debfb76..d33c844 100644 --- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java +++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java @@ -12,9 +12,16 @@ import brooklyn.entity.basic.AbstractApplication; import brooklyn.entity.basic.ConfigKeys; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.StartableApplication; +import brooklyn.entity.nosql.cassandra.CassandraDatacenter; +import brooklyn.entity.nosql.cassandra.CassandraNode; import brooklyn.entity.nosql.riak.RiakCluster; +import brooklyn.entity.nosql.riak.RiakNode; import 
brooklyn.entity.proxying.EntitySpec; import brooklyn.launcher.BrooklynLauncher; +import brooklyn.policy.PolicySpec; +import brooklyn.policy.ha.ServiceFailureDetector; +import brooklyn.policy.ha.ServiceReplacer; +import brooklyn.policy.ha.ServiceRestarter; import brooklyn.util.CommandLineUtil; @Catalog(name = "Riak Cluster Application", description = "Riak ring deployment blueprint") @@ -43,7 +50,11 @@ public class RiakClusterExample extends AbstractApplication { public void init() { addChild(EntitySpec.create(RiakCluster.class) - .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE))); + .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE)) + .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(RiakNode.class) + .policy(PolicySpec.create(ServiceFailureDetector.class)) + .policy(PolicySpec.create(ServiceRestarter.class) + .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, ServiceFailureDetector.ENTITY_FAILED)))); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/447db550/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java index 41aafd2..de65c74 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java @@ -23,4 +23,5 @@ public interface RiakCluster extends DynamicCluster { @SetFromFlag("delayBeforeAdvertisingCluster") ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60)); + AttributeSensor<Boolean> IS_CLUSTER_INIT = 
Sensors.newBooleanSensor("riak.cluster.isClusterInit", "flag to determine if the cluster was already initialized"); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/447db550/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java index 11ecf92..12dd5fd 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java @@ -35,10 +35,9 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { private AtomicBoolean isFirstNodeSet = new AtomicBoolean(); public void init() { - log.info("Initializing the riak cluster..."); super.init(); - - + log.info("Initializing the riak cluster..."); + setAttribute(IS_CLUSTER_INIT, false); } @Override @@ -60,6 +59,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { if (anyNode.isPresent()) { log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId()); Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER); + setAttribute(IS_CLUSTER_INIT, true); } else { log.warn("No Riak Nodes are found on the cluster: {}. 
Initialization Failed", getId()); setAttribute(SERVICE_STATE, Lifecycle.ON_FIRE); @@ -137,9 +137,12 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { if (anyNodeInCluster.isPresent()) { if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) { + String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME); Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName); - + if (getAttribute(IS_CLUSTER_INIT)) { + Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER); + } nodes.put(member, riakName); setAttribute(RIAK_CLUSTER_NODES, nodes); log.info("Adding riak node {}: {}; {} to cluster", new Object[]{this, member, getRiakName(member)}); @@ -152,9 +155,23 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { } else { Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES); if (nodes != null && nodes.containsKey(member)) { + final Entity memberToBeRemoved = member; + + Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() { + + @Override + public boolean apply(@Nullable Entity node) { + return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved)); + } + }); + if (anyNodeInCluster.isPresent()) { + Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved)); + } + nodes.remove(member); setAttribute(RIAK_CLUSTER_NODES, nodes); log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)}); + } } if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member); @@ -183,4 +200,5 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { private Boolean hasMemberJoinedCluster(Entity member) { return 
Optional.fromNullable(member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)).or(Boolean.FALSE); } + }