Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.X 599dbbc9f -> 4901e4b1e
  refs/heads/trunk 066ba25c5 -> d8049ae10
add method to get size of endpoints to TokenMetadata Patch by Dikang Gu; reviewed by Jason Brown for CASSANDRA-12999 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4901e4b1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4901e4b1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4901e4b1 Branch: refs/heads/cassandra-3.X Commit: 4901e4b1e97975a9fbc57d004bd8fe668ebc5d57 Parents: 599dbbc Author: Dikang Gu <[email protected]> Authored: Mon Dec 5 12:19:30 2016 -0800 Committer: Dikang Gu <[email protected]> Committed: Fri Dec 9 16:17:47 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/batchlog/BatchlogManager.java | 2 +- .../org/apache/cassandra/dht/RangeStreamer.java | 2 +- .../cassandra/hints/HintsDispatchExecutor.java | 2 +- .../apache/cassandra/locator/TokenMetadata.java | 32 +++++++++++++++- .../cassandra/service/StorageService.java | 6 +-- .../cassandra/locator/TokenMetadataTest.java | 39 ++++++++++++++++++++ .../service/LeaveAndBootstrapTest.java | 4 +- .../org/apache/cassandra/service/MoveTest.java | 4 +- .../apache/cassandra/service/RemoveTest.java | 4 +- 10 files changed, 83 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4aac593..3c49a8a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.12 + * add method to get size of endpoints to TokenMetadata (CASSANDRA-12999) * Fix primary index calculation for SASI (CASSANDRA-12910) * Expose time spent waiting in thread pool queue (CASSANDRA-8398) * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 9cb3b10..d23103c 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -187,7 +187,7 @@ public class BatchlogManager implements BatchlogManagerMBean // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). - int endpointsCount = StorageService.instance.getTokenMetadata().getAllEndpoints().size(); + int endpointsCount = StorageService.instance.getTokenMetadata().getSizeOfAllEndpoints(); if (endpointsCount <= 0) { logger.trace("Replay cancelled as there are no peers in the ring."); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index fd33d19..504ef7e 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -202,7 +202,7 @@ public class RangeStreamer AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy(); return useStrictConsistency && tokens != null - && metadata.getAllEndpoints().size() != strat.getReplicationFactor(); + && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java index d7ccf81..afe7c08 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java @@ -199,7 +199,7 @@ final class HintsDispatchExecutor // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster, // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time). - int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1); + int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getSizeOfAllEndpoints() - 1); int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount; this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? 
Double.MAX_VALUE : throttleInKB * 1024); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 8712916..50d9744 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -254,7 +254,7 @@ public class TokenMetadata UUID storedId = endpointToHostIdMap.get(endpoint); if ((storedId != null) && (!storedId.equals(hostId))) logger.warn("Changing {}'s host ID from {} to {}", endpoint, storedId, hostId); - + endpointToHostIdMap.forcePut(endpoint, hostId); } finally @@ -999,6 +999,16 @@ public class TokenMetadata } } + /** + * We think the size() operation is safe enough, so we call it without the read lock on purpose. + * + * see CASSANDRA-12999 + */ + public int getSizeOfAllEndpoints() + { + return endpointToHostIdMap.size(); + } + /** caller should not modify leavingEndpoints */ public Set<InetAddress> getLeavingEndpoints() { @@ -1014,6 +1024,16 @@ public class TokenMetadata } /** + * We think the size() operation is safe enough, so we call it without the read lock on purpose. + * + * see CASSANDRA-12999 + */ + public int getSizeOfLeavingEndpoints() + { + return leavingEndpoints.size(); + } + + /** * Endpoints which are migrating to the new tokens * @return set of addresses of moving endpoints */ @@ -1030,6 +1050,16 @@ public class TokenMetadata } } + /** + * We think the size() operation is safe enough, so we call it without the read lock on purpose. 
+ * + * see CASSANDRA-12999 + */ + public int getSizeOfMovingEndpoints() + { + return movingEndpoints.size(); + } + public static int firstTokenIndex(final ArrayList<Token> ring, Token start, boolean insertMin) { assert ring.size() > 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a53187f..9bf5679 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -881,8 +881,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (useStrictConsistency && !allowSimultaneousMoves() && ( tokenMetadata.getBootstrapTokens().valueSet().size() > 0 || - tokenMetadata.getLeavingEndpoints().size() > 0 || - tokenMetadata.getMovingEndpoints().size() > 0 + tokenMetadata.getSizeOfLeavingEndpoints() > 0 || + tokenMetadata.getSizeOfMovingEndpoints() > 0 )) { throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true"); @@ -4209,7 +4209,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public void forceRemoveCompletion() { - if (!replicatingNodes.isEmpty() || !tokenMetadata.getLeavingEndpoints().isEmpty()) + if (!replicatingNodes.isEmpty() || tokenMetadata.getSizeOfLeavingEndpoints() > 0) { logger.warn("Removal not confirmed for for {}", StringUtils.join(this.replicatingNodes, ",")); for (InetAddress endpoint : tokenMetadata.getLeavingEndpoints()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java index 91c83bd..e5a86fd 100644 --- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java +++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Map; +import java.util.UUID; import com.google.common.collect.Iterators; import com.google.common.collect.Multimap; @@ -288,4 +289,42 @@ public class TokenMetadataTest assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first)); assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second)); } + + @Test + public void testEndpointSizes() throws UnknownHostException + { + final InetAddress first = InetAddress.getByName("127.0.0.1"); + final InetAddress second = InetAddress.getByName("127.0.0.6"); + + tmd.updateNormalToken(token(ONE), first); + tmd.updateNormalToken(token(SIX), second); + + TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap(); + assertNotNull(tokenMetadata); + + tokenMetadata.updateHostId(UUID.randomUUID(), first); + tokenMetadata.updateHostId(UUID.randomUUID(), second); + + assertEquals(2, tokenMetadata.getSizeOfAllEndpoints()); + assertEquals(0, tokenMetadata.getSizeOfLeavingEndpoints()); + assertEquals(0, tokenMetadata.getSizeOfMovingEndpoints()); + + tokenMetadata.addLeavingEndpoint(first); + assertEquals(1, tokenMetadata.getSizeOfLeavingEndpoints()); + + tokenMetadata.removeEndpoint(first); + assertEquals(0, tokenMetadata.getSizeOfLeavingEndpoints()); + assertEquals(1, tokenMetadata.getSizeOfAllEndpoints()); + + tokenMetadata.addMovingEndpoint(token(SIX), second); + assertEquals(1, tokenMetadata.getSizeOfMovingEndpoints()); + + tokenMetadata.removeFromMoving(second); + assertEquals(0, tokenMetadata.getSizeOfMovingEndpoints()); + + tokenMetadata.removeEndpoint(second); + assertEquals(0, tokenMetadata.getSizeOfAllEndpoints()); + 
assertEquals(0, tokenMetadata.getSizeOfLeavingEndpoints()); + assertEquals(0, tokenMetadata.getSizeOfMovingEndpoints()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java index 19f0b7a..754def9 100644 --- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java +++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java @@ -556,7 +556,7 @@ public class LeaveAndBootstrapTest Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(2)))); ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)))); - assertTrue(tmd.getLeavingEndpoints().isEmpty()); + assertTrue(tmd.getSizeOfLeavingEndpoints() == 0); assertEquals(keyTokens.get(2), tmd.getToken(hosts.get(2))); // node 3 goes through leave and left and then jumps to normal at its new token @@ -567,7 +567,7 @@ public class LeaveAndBootstrapTest ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(4)))); assertTrue(tmd.getBootstrapTokens().isEmpty()); - assertTrue(tmd.getLeavingEndpoints().isEmpty()); + assertTrue(tmd.getSizeOfLeavingEndpoints() == 0); assertEquals(keyTokens.get(4), tmd.getToken(hosts.get(2))); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index 05757c0..c6dce70 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ 
b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -978,7 +978,7 @@ public class MoveTest Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(newToken))); ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken))); - assertTrue(tmd.getMovingEndpoints().isEmpty()); + assertTrue(tmd.getSizeOfMovingEndpoints() == 0); assertEquals(newToken, tmd.getToken(hosts.get(2))); newToken = positionToken(8); @@ -988,7 +988,7 @@ public class MoveTest ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken))); assertTrue(tmd.getBootstrapTokens().isEmpty()); - assertTrue(tmd.getMovingEndpoints().isEmpty()); + assertTrue(tmd.getSizeOfMovingEndpoints() == 0); assertEquals(newToken, tmd.getToken(hosts.get(2))); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4901e4b1/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index 0ef9b9c..f43c4f4 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -163,7 +163,7 @@ public class RemoveTest Thread.sleep(1000); // make sure removal is waiting for confirmation assertTrue(tmd.isLeaving(removalhost)); - assertEquals(1, tmd.getLeavingEndpoints().size()); + assertEquals(1, tmd.getSizeOfLeavingEndpoints()); for (InetAddress host : hosts) { @@ -174,6 +174,6 @@ public class RemoveTest remover.join(); assertTrue(success.get()); - assertTrue(tmd.getLeavingEndpoints().isEmpty()); + assertTrue(tmd.getSizeOfLeavingEndpoints() == 0); } }
