Don't update system table for unknown nodes/dead states
Patch by Tyler Hobbs, reviewed by brandonwilliams for CASSANDRA-6053
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1c3d8f3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1c3d8f3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1c3d8f3 Branch: refs/heads/trunk Commit: c1c3d8f3f351a6834e5b02c12790d0d1163107bf Parents: 15cd55f Author: Brandon Williams <[email protected]> Authored: Fri Jan 10 12:49:38 2014 -0600 Committer: Brandon Williams <[email protected]> Committed: Fri Jan 10 12:49:38 2014 -0600 ---------------------------------------------------------------------- .../cassandra/service/StorageService.java | 95 +++++++++++--------- .../service/LeaveAndBootstrapTest.java | 31 +++++++ 2 files changed, 84 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1c3d8f3/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 102e0d8..1ab97b2 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1307,48 +1307,59 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) { - switch (state) - { - case STATUS: - String apStateValue = value.value; - String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1); - assert (pieces.length > 0); - - String moveName = pieces[0]; - - if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING)) - handleStateBootstrap(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_NORMAL)) - handleStateNormal(endpoint, pieces); - else if 
(moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN)) - handleStateRemoving(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_LEAVING)) - handleStateLeaving(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_LEFT)) - handleStateLeft(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_MOVING)) - handleStateMoving(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_RELOCATING)) - handleStateRelocating(endpoint, pieces); - break; - case RELEASE_VERSION: - SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value)); - break; - case DC: - SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value)); - break; - case RACK: - SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value)); - break; - case RPC_ADDRESS: - SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(value.value)); - break; - case SCHEMA: - SystemKeyspace.updatePeerInfo(endpoint, "schema_version", value.value); - break; - case HOST_ID: - SystemKeyspace.updatePeerInfo(endpoint, "host_id", value.value); - break; + if (state.equals(ApplicationState.STATUS)) + { + String apStateValue = value.value; + String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1); + assert (pieces.length > 0); + + String moveName = pieces[0]; + + if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING)) + handleStateBootstrap(endpoint, pieces); + else if (moveName.equals(VersionedValue.STATUS_NORMAL)) + handleStateNormal(endpoint, pieces); + else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN)) + handleStateRemoving(endpoint, pieces); + else if (moveName.equals(VersionedValue.STATUS_LEAVING)) + handleStateLeaving(endpoint, pieces); + else if (moveName.equals(VersionedValue.STATUS_LEFT)) + handleStateLeft(endpoint, pieces); + else if (moveName.equals(VersionedValue.STATUS_MOVING)) + handleStateMoving(endpoint, 
pieces); + else if (moveName.equals(VersionedValue.STATUS_RELOCATING)) + handleStateRelocating(endpoint, pieces); + } + else + { + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (epState == null || Gossiper.instance.isDeadState(epState)) + { + logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); + return; + } + + switch (state) + { + case RELEASE_VERSION: + SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value)); + break; + case DC: + SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value)); + break; + case RACK: + SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value)); + break; + case RPC_ADDRESS: + SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(value.value)); + break; + case SCHEMA: + SystemKeyspace.updatePeerInfo(endpoint, "schema_version", value.value); + break; + case HOST_ID: + SystemKeyspace.updatePeerInfo(endpoint, "host_id", value.value); + break; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1c3d8f3/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java index 44b3400..a9d8057 100644 --- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java +++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.config.Schema; import org.junit.Test; @@ -650,6 +651,36 @@ public class LeaveAndBootstrapTest assertFalse(tmd.isLeaving(hosts.get(2))); } + /** + * Tests that the system.peers table is not updated after a node has been 
removed. (See CASSANDRA-6053) + */ + @Test + public void testStateChangeOnRemovedNode() throws UnknownHostException + { + StorageService ss = StorageService.instance; + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + + // create a ring of 2 nodes + ArrayList<Token> endpointTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + Util.createInitialRing(ss, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2); + + InetAddress toRemove = hosts.get(1); + SystemKeyspace.updatePeerInfo(toRemove, "data_center", "'dc42'"); + SystemKeyspace.updatePeerInfo(toRemove, "rack", "'rack42'"); + assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack")); + + // mark the node as removed + Gossiper.instance.injectApplicationState(toRemove, ApplicationState.STATUS, + valueFactory.left(Collections.singleton(endpointTokens.get(1)), Gossiper.computeExpireTime())); + assertTrue(Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(hosts.get(1)))); + + // state changes made after the endpoint has left should be ignored + ss.onChange(hosts.get(1), ApplicationState.RACK, + valueFactory.rack("rack9999")); + assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack")); + } + private static Collection<InetAddress> makeAddrs(String... hosts) throws UnknownHostException { ArrayList<InetAddress> addrs = new ArrayList<InetAddress>(hosts.length);
