Fix issue when Riak nodes do not leave cluster

Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/af5b64f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/af5b64f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/af5b64f3

Branch: refs/heads/master
Commit: af5b64f32131882c5617294184121ca741e181d6
Parents: 508f956
Author: Andrew Kennedy <[email protected]>
Authored: Thu Mar 26 13:31:24 2015 +0000
Committer: Andrew Kennedy <[email protected]>
Committed: Thu Mar 26 13:31:24 2015 +0000

----------------------------------------------------------------------
 .../entity/nosql/riak/RiakClusterImpl.java      |  7 ++-
 .../brooklyn/entity/nosql/riak/RiakNode.java    | 14 ++++--
 .../entity/nosql/riak/RiakNodeDriver.java       |  4 +-
 .../entity/nosql/riak/RiakNodeImpl.java         |  9 +++-
 .../entity/nosql/riak/RiakNodeSshDriver.java    | 48 ++++++++++++--------
 5 files changed, 54 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index fcdb79c..9fff98d 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -41,9 +41,11 @@ import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
 import brooklyn.entity.group.DynamicClusterImpl;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.entity.trait.Startable;
+import brooklyn.event.basic.DependentConfiguration;
 import brooklyn.location.Location;
 import brooklyn.policy.EnricherSpec;
 import brooklyn.policy.PolicySpec;
+import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
 
 import com.google.common.base.Function;
@@ -163,12 +165,13 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
                 }
             } else {
                 if (nodes != null && nodes.containsKey(member)) {
+                    boolean timeout = 
DependentConfiguration.attributeWhenReady(member, 
RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, 
Predicates.equalTo(false)).blockUntilEnded(Duration.TWO_MINUTES);
                     Optional<Entity> anyNodeInCluster = 
Iterables.tryFind(nodes.keySet(), Predicates.and(
                             Predicates.instanceOf(RiakNode.class),
                             
EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
                             Predicates.not(Predicates.equalTo(member))));
-                    if (anyNodeInCluster.isPresent()) {
-                        Entities.invokeEffectorWithArgs(this, 
anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, 
getRiakName(member)).blockUntilEnded();
+                    if (timeout && anyNodeInCluster.isPresent()) {
+                        Entities.invokeEffectorWithArgs(this, 
anyNodeInCluster.get(), RiakNode.REMOVE_FROM_CLUSTER, 
getRiakName(member)).blockUntilEnded();
                     }
                     nodes.remove(member);
                     setAttribute(RIAK_CLUSTER_NODES, nodes);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
index 6b76a81..ad1a736 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
@@ -140,6 +140,7 @@ public interface RiakNode extends SoftwareProcess {
 
     MethodEffector<Void> JOIN_RIAK_CLUSTER = new 
MethodEffector<Void>(RiakNode.class, "joinCluster");
     MethodEffector<Void> LEAVE_RIAK_CLUSTER = new 
MethodEffector<Void>(RiakNode.class, "leaveCluster");
+    MethodEffector<Void> REMOVE_FROM_CLUSTER = new 
MethodEffector<Void>(RiakNode.class, "removeNode");
     MethodEffector<Void> COMMIT_RIAK_CLUSTER = new 
MethodEffector<Void>(RiakNode.class, "commitCluster");
 
     AttributeSensor<Integer> RIAK_NODE_GET_FSM_TIME_MEAN = 
Sensors.newIntegerSensor("riak.node_get_fsm_time_mean", "Time between reception 
of client read request and subsequent response to client");
@@ -180,16 +181,19 @@ public interface RiakNode extends SoftwareProcess {
 
     String getOsMajorVersion();
 
-    @Effector(description = "Add this riak node to the Riak cluster")
+    @Effector(description = "Join the Riak cluster on the given node")
     public void joinCluster(@EffectorParam(name = "nodeName") String nodeName);
 
-    @Effector(description = "Remove this Riak node from the cluster")
-    public void leaveCluster(@EffectorParam(name = "nodeName") String 
nodeName);
+    @Effector(description = "Leave the Riak cluster")
+    public void leaveCluster();
 
-    @Effector(description = "Recover a failed Riak node and join it back to 
the cluster (by passing it a working node on the cluster 'node')")
+    @Effector(description = "Remove the given node from the Riak cluster")
+    public void removeNode(@EffectorParam(name = "nodeName") String nodeName);
+
+    @Effector(description = "Recover and join the Riak cluster on the given 
node")
     public void recoverFailedNode(@EffectorParam(name = "nodeName") String 
nodeName);
 
-    @Effector(description = "Commit changes made to a Riak cluster")
+    @Effector(description = "Commit changes made to the Riak cluster")
     public void commitCluster();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
index b81b7fc..7da618a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
@@ -26,7 +26,9 @@ public interface RiakNodeDriver extends SoftwareProcessDriver {
 
     public void joinCluster(String nodeName);
 
-    public void leaveCluster(String nodeName);
+    public void leaveCluster();
+
+    public void removeNode(String nodeName);
 
     public void recoverFailedNode(String nodeName);
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
index 954d54f..bcb45f1 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
@@ -191,8 +191,13 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode {
     }
 
     @Override
-    public void leaveCluster(String nodeName) {
-        getDriver().leaveCluster(nodeName);
+    public void leaveCluster() {
+        getDriver().leaveCluster();
+    }
+
+    @Override
+    public void removeNode(String nodeName) {
+        getDriver().removeNode(nodeName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
index cb032b7..034b31e 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
@@ -37,12 +37,6 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
 import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
 import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.Entities;
@@ -58,6 +52,12 @@ import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.ssh.SshTasks;
 import brooklyn.util.text.Strings;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
 // TODO: Alter -env ERL_CRASH_DUMP path in vm.args
 public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver 
implements RiakNodeDriver {
 
@@ -331,7 +331,7 @@ public class RiakNodeSshDriver extends 
AbstractSoftwareProcessSshDriver implemen
 
     @Override
     public void stop() {
-        leaveCluster(""); // No node name means this node will leave
+        leaveCluster();
 
         String command = format("%s stop", getRiakCmd());
         command = isPackageInstall() ? sudo(command) : command;
@@ -385,15 +385,14 @@ public class RiakNodeSshDriver extends 
AbstractSoftwareProcessSshDriver implemen
         return isPackageInstall() ? "riak-admin" : 
Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin");
     }
 
+    // TODO find a way to batch commit the changes, instead of committing for 
every operation.
+
     @Override
     public void joinCluster(String nodeName) {
-        //FIXME: find a way to batch commit the changes, instead of committing 
for every operation.
-
         if (getRiakName().equals(nodeName)) {
-            log.warn("cannot join riak node: {} to itself", nodeName);
+            log.warn("Cannot join Riak node: {} to itself", nodeName);
         } else {
             if (!hasJoinedCluster()) {
-
                 ScriptHelper joinClusterScript = newScript("joinCluster")
                         .body.append(sudo(format("%s cluster join %s", 
getRiakAdminCmd(), nodeName)))
                         .failOnNonZeroResultCode();
@@ -414,14 +413,10 @@ public class RiakNodeSshDriver extends 
AbstractSoftwareProcessSshDriver implemen
     }
 
     @Override
-    public void leaveCluster(String nodeName) {
-        //TODO: add 'riak-admin cluster force-remove' for erroneous and 
unrecoverable nodes.
-        //FIXME: find a way to batch commit the changes, instead of committing 
for every operation.
-        //FIXME: find a way to check if the node is the last in the cluster to 
avoid removing the only member and getting "last node error"
-
+    public void leaveCluster() {
         if (hasJoinedCluster()) {
             ScriptHelper leaveClusterScript = newScript("leaveCluster")
-                    .body.append(sudo(format("%s cluster leave %s", 
getRiakAdminCmd(), nodeName)))
+                    .body.append(sudo(format("%s cluster leave", 
getRiakAdminCmd())))
                     .body.append(sudo(format("%s cluster plan", 
getRiakAdminCmd())))
                     .body.append(sudo(format("%s cluster commit", 
getRiakAdminCmd())))
                     .failOnNonZeroResultCode();
@@ -436,11 +431,28 @@ public class RiakNodeSshDriver extends 
AbstractSoftwareProcessSshDriver implemen
 
             entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, 
Boolean.FALSE);
         } else {
-            log.warn("entity {}: is not in the riak cluster", entity.getId());
+            log.warn("entity {}: has already left the riak cluster", 
entity.getId());
         }
     }
 
     @Override
+    public void removeNode(String nodeName) {
+        ScriptHelper removeNodeScript = newScript("removeNode")
+                .body.append(sudo(format("%s cluster force-remove %s", 
getRiakAdminCmd(), nodeName)))
+                .body.append(sudo(format("%s cluster plan", 
getRiakAdminCmd())))
+                .body.append(sudo(format("%s cluster commit", 
getRiakAdminCmd())))
+                .failOnNonZeroResultCode();
+
+        if (!isRiakOnPath()) {
+            Map<String, String> newPathVariable = ImmutableMap.of("PATH", 
sbinPath);
+            log.warn("riak command not found on PATH. Altering future 
commands' environment variables from {} to {}", getShellEnvironment(), 
newPathVariable);
+            removeNodeScript.environmentVariablesReset(newPathVariable);
+        }
+
+        removeNodeScript.execute();
+    }
+
+    @Override
     public void commitCluster() {
         if (hasJoinedCluster()) {
             ScriptHelper commitClusterScript = newScript("commitCluster")

Reply via email to