Added a leave-cluster call when a node exits the cluster, and added
cluster commit calls for nodes that join after cluster initialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dd001766
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dd001766
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dd001766

Branch: refs/pull/1411/merge
Commit: dd001766a080e14361cc23d48379cd7e9db919d2
Parents: 16c5abb
Author: ZaidM <zaid.moh...@cloudsoftcorp.com>
Authored: Thu May 22 18:00:56 2014 +0100
Committer: ZaidM <zaid.moh...@cloudsoftcorp.com>
Committed: Thu May 22 18:06:02 2014 +0100

----------------------------------------------------------------------
 .../java/brooklyn/demo/RiakClusterExample.java  | 11 ++++++++-
 .../brooklyn/entity/nosql/riak/RiakCluster.java |  1 +
 .../entity/nosql/riak/RiakClusterImpl.java      | 26 +++++++++++++++++---
 3 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd001766/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
index debfb76..d332a95 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
@@ -12,9 +12,14 @@ import brooklyn.entity.basic.AbstractApplication;
 import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.StartableApplication;
+import brooklyn.entity.nosql.cassandra.CassandraDatacenter;
 import brooklyn.entity.nosql.riak.RiakCluster;
+import brooklyn.entity.nosql.riak.RiakNode;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.launcher.BrooklynLauncher;
+import brooklyn.policy.PolicySpec;
+import brooklyn.policy.ha.ServiceFailureDetector;
+import brooklyn.policy.ha.ServiceRestarter;
 import brooklyn.util.CommandLineUtil;
 
 @Catalog(name = "Riak Cluster Application", description = "Riak ring deployment blueprint")
@@ -43,7 +48,11 @@ public class RiakClusterExample extends AbstractApplication {
 
     public void init() {
         addChild(EntitySpec.create(RiakCluster.class)
-                .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE)));
+                .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE))
+                .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(RiakNode.class)
+                        .policy(PolicySpec.create(ServiceFailureDetector.class))
+                        .policy(PolicySpec.create(ServiceRestarter.class)
+                                .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, ServiceFailureDetector.ENTITY_FAILED))));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd001766/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
index 41aafd2..de65c74 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
@@ -23,4 +23,5 @@ public interface RiakCluster extends DynamicCluster {
     @SetFromFlag("delayBeforeAdvertisingCluster")
     ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60));
 
+    AttributeSensor<Boolean> IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "flag to determine if the cluster was already initialized");
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd001766/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index 11ecf92..12dd5fd 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -35,10 +35,9 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
     private AtomicBoolean isFirstNodeSet = new AtomicBoolean();
 
     public void init() {
-        log.info("Initializing the riak cluster...");
         super.init();
-
-
+        log.info("Initializing the riak cluster...");
+        setAttribute(IS_CLUSTER_INIT, false);
     }
 
     @Override
@@ -60,6 +59,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
         if (anyNode.isPresent()) {
             log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId());
             Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+            setAttribute(IS_CLUSTER_INIT, true);
         } else {
             log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", getId());
             setAttribute(SERVICE_STATE, Lifecycle.ON_FIRE);
@@ -137,9 +137,12 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
                     if (anyNodeInCluster.isPresent()) {
                         if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
 
+
                             String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
                             Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
-
+                            if (getAttribute(IS_CLUSTER_INIT)) {
+                                Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+                            }
                             nodes.put(member, riakName);
                             setAttribute(RIAK_CLUSTER_NODES, nodes);
                             log.info("Adding riak node {}: {}; {} to cluster", new Object[]{this, member, getRiakName(member)});
@@ -152,9 +155,23 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
         } else {
             Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
             if (nodes != null && nodes.containsKey(member)) {
+                final Entity memberToBeRemoved = member;
+
+                Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
+
+                    @Override
+                    public boolean apply(@Nullable Entity node) {
+                        return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved));
+                    }
+                });
+                if (anyNodeInCluster.isPresent()) {
+                    Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved));
+                }
+
                 nodes.remove(member);
                 setAttribute(RIAK_CLUSTER_NODES, nodes);
                 log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)});
+
             }
         }
         if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
@@ -183,4 +200,5 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
     private Boolean hasMemberJoinedCluster(Entity member) {
         return Optional.fromNullable(member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)).or(Boolean.FALSE);
     }
+
 }

Reply via email to