Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 9dafa438a -> e0c1b0bb7
  refs/heads/cassandra-3.0 1d7bacc45 -> a4e3f0847
  refs/heads/cassandra-3.3 dbd5ce104 -> afcb167b9
  refs/heads/trunk 5dc53e98c -> 0bc2744e4
Optimize pending ranges computation patch by dikanggu; reviewed by blambov for CASSANDRA-9258 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6ff1cbb3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6ff1cbb3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6ff1cbb3 Branch: refs/heads/cassandra-2.2 Commit: 6ff1cbb3ee1b7e6f261aeb454854dd249ab605df Parents: 9dafa43 Author: Dikang Gu <[email protected]> Authored: Tue Jan 5 15:22:06 2016 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Tue Jan 5 15:22:06 2016 +0100 ---------------------------------------------------------------------- .../apache/cassandra/locator/TokenMetadata.java | 61 ++++++++++---------- .../cassandra/service/StorageService.java | 2 +- 2 files changed, 32 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ff1cbb3/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index db0b609..00d8ee9 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -82,7 +82,7 @@ public class TokenMetadata // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving) private final Set<InetAddress> leavingEndpoints = new HashSet<>(); // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints} - private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<String, PendingRangeMaps>(); // nodes which 
are migrating to the new tokens in the ring private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>(); @@ -673,23 +673,30 @@ public class TokenMetadata return sortedTokens; } - private Multimap<Range<Token>, InetAddress> getPendingRangesMM(String keyspaceName) + public Multimap<Range<Token>, InetAddress> getPendingRangesMM(String keyspaceName) { - Multimap<Range<Token>, InetAddress> map = pendingRanges.get(keyspaceName); - if (map == null) + Multimap<Range<Token>, InetAddress> map = HashMultimap.create(); + PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); + + if (pendingRangeMaps != null) { - map = HashMultimap.create(); - Multimap<Range<Token>, InetAddress> priorMap = pendingRanges.putIfAbsent(keyspaceName, map); - if (priorMap != null) - map = priorMap; + for (Map.Entry<Range<Token>, List<InetAddress>> entry : pendingRangeMaps) + { + Range<Token> range = entry.getKey(); + for (InetAddress address : entry.getValue()) + { + map.put(range, address); + } + } } + return map; } /** a mutable map may be returned but caller should not modify it */ - public Map<Range<Token>, Collection<InetAddress>> getPendingRanges(String keyspaceName) + public PendingRangeMaps getPendingRanges(String keyspaceName) { - return getPendingRangesMM(keyspaceName).asMap(); + return this.pendingRanges.get(keyspaceName); } public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint) @@ -733,7 +740,7 @@ public class TokenMetadata lock.readLock().lock(); try { - Multimap<Range<Token>, InetAddress> newPendingRanges = HashMultimap.create(); + PendingRangeMaps newPendingRanges = new PendingRangeMaps(); if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) { @@ -761,7 +768,10 @@ public class TokenMetadata { Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); Set<InetAddress> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - newPendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints)); + for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints)) + { + newPendingRanges.addPendingRange(range, address); + } } // At this stage newPendingRanges has been updated according to leave operations. We can @@ -776,7 +786,9 @@ public class TokenMetadata allLeftMetadata.updateNormalTokens(tokens, endpoint); for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - newPendingRanges.put(range, endpoint); + { + newPendingRanges.addPendingRange(range, endpoint); + } allLeftMetadata.removeEndpoint(endpoint); } @@ -794,7 +806,7 @@ public class TokenMetadata for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) { - newPendingRanges.put(range, endpoint); + newPendingRanges.addPendingRange(range, endpoint); } allLeftMetadata.removeEndpoint(endpoint); @@ -1029,13 +1041,9 @@ public class TokenMetadata { StringBuilder sb = new StringBuilder(); - for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : pendingRanges.entrySet()) + for (PendingRangeMaps pendingRangeMaps : pendingRanges.values()) { - for (Map.Entry<Range<Token>, InetAddress> rmap : entry.getValue().entries()) - { - sb.append(rmap.getValue()).append(':').append(rmap.getKey()); - sb.append(System.getProperty("line.separator")); - } + sb.append(pendingRangeMaps.printPendingRanges()); } return sb.toString(); @@ -1043,18 +1051,11 @@ public class TokenMetadata public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName) { - Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(keyspaceName); - if (ranges.isEmpty()) + PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); + if (pendingRangeMaps == null) return Collections.emptyList(); - Set<InetAddress> endpoints = new HashSet<>(); - for 
(Map.Entry<Range<Token>, Collection<InetAddress>> entry : ranges.entrySet()) - { - if (entry.getKey().contains(token)) - endpoints.addAll(entry.getValue()); - } - - return endpoints; + return pendingRangeMaps.pendingEndpointsFor(token); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ff1cbb3/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index e8e7daf..84ebd9a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1378,7 +1378,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE keyspace = Schema.instance.getNonSystemKeyspaces().get(0); Map<List<String>, List<String>> map = new HashMap<>(); - for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRanges(keyspace).entrySet()) + for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet()) { List<InetAddress> l = new ArrayList<>(entry.getValue()); map.put(entry.getKey().asList(), stringify(l));
