Repository: cassandra
Updated Branches:
  refs/heads/trunk d4fd04eb7 -> 55fe548f6
Require forceful decommission if number of nodes is less than replication factor patch by Kurt Greaves; Reviewed by Paulo Motta for CASSANDRA-12510 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cbb9d5d8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cbb9d5d8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cbb9d5d8 Branch: refs/heads/trunk Commit: cbb9d5d8284d91d84c331c90780a9c5ce2c22b75 Parents: cc16ff1 Author: Kurt <[email protected]> Authored: Fri Oct 28 04:22:24 2016 +0000 Committer: Paulo Motta <[email protected]> Committed: Fri Dec 23 21:25:59 2016 -0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/StorageService.java | 53 ++++++++++++++++---- .../cassandra/service/StorageServiceMBean.java | 3 +- .../org/apache/cassandra/tools/NodeProbe.java | 4 +- .../cassandra/tools/nodetool/Decommission.java | 9 +++- 5 files changed, 55 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbb9d5d8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4cb3c45..c5fcec8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.12 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510) * Allow IN restrictions on column families with collections (CASSANDRA-12654) * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034) * nodetool stopdaemon errors out (CASSANDRA-13030) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbb9d5d8/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java index 5dfac21..834008d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3864,14 +3864,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE PendingRangeCalculatorService.instance.update(); } - public void decommission() throws InterruptedException - { - if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddress())) - throw new UnsupportedOperationException("local node is not a member of the token ring yet"); - if (tokenMetadata.cloneAfterAllLeft().sortedTokens().size() < 2) - throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless"); - if (operationMode != Mode.LEAVING && operationMode != Mode.NORMAL) - throw new UnsupportedOperationException("Node in " + operationMode + " state; wait for status to become normal or restart"); + public void decommission(boolean force) throws InterruptedException + { + TokenMetadata metadata = tokenMetadata.cloneAfterAllLeft(); + if (operationMode != Mode.LEAVING) + { + if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddress())) + throw new UnsupportedOperationException("local node is not a member of the token ring yet"); + if (metadata.getAllEndpoints().size() < 2) + throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless"); + if (operationMode != Mode.NORMAL) + throw new UnsupportedOperationException("Node in " + operationMode + " state; wait for status to become normal or restart"); + } if (isDecommissioning.compareAndSet(true, true)) throw new IllegalStateException("Node is still decommissioning. 
Check nodetool netstats."); @@ -3881,10 +3885,37 @@ public class StorageService extends NotificationBroadcasterSupport implements IE try { PendingRangeCalculatorService.instance.blockUntilFinished(); - for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) + + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + + if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges { - if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0) - throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); + int rf, numNodes; + for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) + { + if (!force) + { + Keyspace keyspace = Keyspace.open(keyspaceName); + if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) + { + NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); + rf = strategy.getReplicationFactor(dc); + numNodes = metadata.getTopology().getDatacenterEndpoints().get(dc).size(); + } + else + { + numNodes = metadata.getAllEndpoints().size(); + rf = keyspace.getReplicationStrategy().getReplicationFactor(); + } + + if (numNodes <= rf) + throw new UnsupportedOperationException("Not enough live nodes to maintain replication factor in keyspace " + + keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")." 
+ + " Perform a forceful decommission to ignore."); + } + if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0) + throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); + } } startLeaving(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbb9d5d8/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 339b991..92a35e6 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -375,8 +375,9 @@ public interface StorageServiceMBean extends NotificationEmitter /** * transfer this node's data to other machines and remove it from service. + * @param force Decommission even if this will reduce N to be less than RF. */ - public void decommission() throws InterruptedException; + public void decommission(boolean force) throws InterruptedException; /** * @param newToken token to move this node to. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbb9d5d8/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index a48baf8..da438cb 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -640,9 +640,9 @@ public class NodeProbe implements AutoCloseable ssProxy.joinRing(); } - public void decommission() throws InterruptedException + public void decommission(boolean force) throws InterruptedException { - ssProxy.decommission(); + ssProxy.decommission(force); } public void move(String newToken) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/cbb9d5d8/src/java/org/apache/cassandra/tools/nodetool/Decommission.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Decommission.java b/src/java/org/apache/cassandra/tools/nodetool/Decommission.java index 34890e0..294fe07 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Decommission.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Decommission.java @@ -18,6 +18,7 @@ package org.apache.cassandra.tools.nodetool; import io.airlift.command.Command; +import io.airlift.command.Option; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; @@ -25,12 +26,18 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd; @Command(name = "decommission", description = "Decommission the *node I am connecting to*") public class Decommission extends NodeToolCmd { + + @Option(title = "force", + name = {"-f", "--force"}, + description = "Force decommission of this node even when it reduces the number of replicas to below configured RF") + private boolean force = false; + @Override public void execute(NodeProbe probe) { try { - 
probe.decommission(); + probe.decommission(force); } catch (InterruptedException e) { throw new RuntimeException("Error decommissioning node", e);
