Fix issue when Riak nodes do not leave cluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/af5b64f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/af5b64f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/af5b64f3 Branch: refs/heads/master Commit: af5b64f32131882c5617294184121ca741e181d6 Parents: 508f956 Author: Andrew Kennedy <[email protected]> Authored: Thu Mar 26 13:31:24 2015 +0000 Committer: Andrew Kennedy <[email protected]> Committed: Thu Mar 26 13:31:24 2015 +0000 ---------------------------------------------------------------------- .../entity/nosql/riak/RiakClusterImpl.java | 7 ++- .../brooklyn/entity/nosql/riak/RiakNode.java | 14 ++++-- .../entity/nosql/riak/RiakNodeDriver.java | 4 +- .../entity/nosql/riak/RiakNodeImpl.java | 9 +++- .../entity/nosql/riak/RiakNodeSshDriver.java | 48 ++++++++++++-------- 5 files changed, 54 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java index fcdb79c..9fff98d 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java @@ -41,9 +41,11 @@ import brooklyn.entity.group.AbstractMembershipTrackingPolicy; import brooklyn.entity.group.DynamicClusterImpl; import brooklyn.entity.proxying.EntitySpec; import brooklyn.entity.trait.Startable; +import brooklyn.event.basic.DependentConfiguration; import brooklyn.location.Location; import brooklyn.policy.EnricherSpec; import brooklyn.policy.PolicySpec; +import brooklyn.util.time.Duration; import brooklyn.util.time.Time; import com.google.common.base.Function; @@ -163,12 +165,13 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { } } else { if (nodes != null && nodes.containsKey(member)) { + boolean timeout = DependentConfiguration.attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Predicates.equalTo(false)).blockUntilEnded(Duration.TWO_MINUTES); Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( Predicates.instanceOf(RiakNode.class), EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), Predicates.not(Predicates.equalTo(member)))); - if (anyNodeInCluster.isPresent()) { - Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(member)).blockUntilEnded(); + if (timeout && anyNodeInCluster.isPresent()) { + Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.REMOVE_FROM_CLUSTER, getRiakName(member)).blockUntilEnded(); } nodes.remove(member); setAttribute(RIAK_CLUSTER_NODES, nodes); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java index 6b76a81..ad1a736 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java @@ -140,6 +140,7 @@ public interface RiakNode extends SoftwareProcess { MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster"); MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster"); + MethodEffector<Void> REMOVE_FROM_CLUSTER = new MethodEffector<Void>(RiakNode.class, "removeNode"); MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "commitCluster"); AttributeSensor<Integer> RIAK_NODE_GET_FSM_TIME_MEAN = Sensors.newIntegerSensor("riak.node_get_fsm_time_mean", "Time between reception of client read request and subsequent response to client"); @@ -180,16 +181,19 @@ public interface RiakNode extends SoftwareProcess { String getOsMajorVersion(); - @Effector(description = "Add this riak node to the Riak cluster") + @Effector(description = "Join the Riak cluster on the given node") public void joinCluster(@EffectorParam(name = "nodeName") String nodeName); - @Effector(description = "Remove this Riak node from the cluster") - public void leaveCluster(@EffectorParam(name = "nodeName") String nodeName); + @Effector(description = "Leave the Riak cluster") + public void leaveCluster(); - @Effector(description = "Recover a failed Riak node and join it back to the cluster (by passing it a working node on the cluster 'node')") + @Effector(description = "Remove the given node from the Riak cluster") + public void removeNode(@EffectorParam(name = "nodeName") String nodeName); + + @Effector(description = "Recover and join the Riak cluster on the given node") public void recoverFailedNode(@EffectorParam(name = "nodeName") String nodeName); - @Effector(description = "Commit changes made to a Riak cluster") + @Effector(description = "Commit changes made to the Riak cluster") public void commitCluster(); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java index b81b7fc..7da618a 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java @@ -26,7 +26,9 @@ public interface RiakNodeDriver extends SoftwareProcessDriver { public void joinCluster(String nodeName); - public void leaveCluster(String nodeName); + public void leaveCluster(); + + public void removeNode(String nodeName); public void recoverFailedNode(String nodeName); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java index 954d54f..bcb45f1 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java @@ -191,8 +191,13 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { } @Override - public void leaveCluster(String nodeName) { - getDriver().leaveCluster(nodeName); + public void leaveCluster() { + getDriver().leaveCluster(); + } + + @Override + public void removeNode(String nodeName) { + getDriver().removeNode(nodeName); } @Override http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/af5b64f3/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java index cb032b7..034b31e 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java @@ -37,12 +37,6 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.Entities; @@ -58,6 +52,12 @@ import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.ssh.SshTasks; import brooklyn.util.text.Strings; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + // TODO: Alter -env ERL_CRASH_DUMP path in vm.args public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implements RiakNodeDriver { @@ -331,7 +331,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen @Override public void stop() { - leaveCluster(""); // No node name means this node will leave + leaveCluster(); String command = format("%s stop", getRiakCmd()); command = isPackageInstall() ? sudo(command) : command; @@ -385,15 +385,14 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen return isPackageInstall() ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin"); } + // TODO find a way to batch commit the changes, instead of committing for every operation. + @Override public void joinCluster(String nodeName) { - //FIXME: find a way to batch commit the changes, instead of committing for every operation. - if (getRiakName().equals(nodeName)) { - log.warn("cannot join riak node: {} to itself", nodeName); + log.warn("Cannot join Riak node: {} to itself", nodeName); } else { if (!hasJoinedCluster()) { - ScriptHelper joinClusterScript = newScript("joinCluster") .body.append(sudo(format("%s cluster join %s", getRiakAdminCmd(), nodeName))) .failOnNonZeroResultCode(); @@ -414,14 +413,10 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen } @Override - public void leaveCluster(String nodeName) { - //TODO: add 'riak-admin cluster force-remove' for erroneous and unrecoverable nodes. - //FIXME: find a way to batch commit the changes, instead of committing for every operation. - //FIXME: find a way to check if the node is the last in the cluster to avoid removing the only member and getting "last node error" - + public void leaveCluster() { if (hasJoinedCluster()) { ScriptHelper leaveClusterScript = newScript("leaveCluster") - .body.append(sudo(format("%s cluster leave %s", getRiakAdminCmd(), nodeName))) + .body.append(sudo(format("%s cluster leave", getRiakAdminCmd()))) .body.append(sudo(format("%s cluster plan", getRiakAdminCmd()))) .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) .failOnNonZeroResultCode(); @@ -436,11 +431,28 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.FALSE); } else { - log.warn("entity {}: is not in the riak cluster", entity.getId()); + log.warn("entity {}: has already left the riak cluster", entity.getId()); } } @Override + public void removeNode(String nodeName) { + ScriptHelper removeNodeScript = newScript("removeNode") + .body.append(sudo(format("%s cluster force-remove %s", getRiakAdminCmd(), nodeName))) + .body.append(sudo(format("%s cluster plan", getRiakAdminCmd()))) + .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) + .failOnNonZeroResultCode(); + + if (!isRiakOnPath()) { + Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); + log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); + removeNodeScript.environmentVariablesReset(newPathVariable); + } + + removeNodeScript.execute(); + } + + @Override public void commitCluster() { if (hasJoinedCluster()) { ScriptHelper commitClusterScript = newScript("commitCluster")
