Add Topology to TokenMetadata and clean up thread-safety design. Patch by Sam Overton; reviewed by jbellis for CASSANDRA-3881
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/893d1da9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/893d1da9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/893d1da9 Branch: refs/heads/trunk Commit: 893d1da990b4f31462ad241dc0c4b6a91cf3dbee Parents: 602e383 Author: Jonathan Ellis <[email protected]> Authored: Tue Jul 3 11:55:54 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Jul 3 11:55:54 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/dht/RangeStreamer.java | 2 +- .../locator/AbstractReplicationStrategy.java | 2 +- .../apache/cassandra/locator/TokenMetadata.java | 175 ++++++++++++--- .../apache/cassandra/service/StorageService.java | 31 ++-- .../service/AntiEntropyServiceTestAbstract.java | 2 +- .../cassandra/service/LeaveAndBootstrapTest.java | 2 +- .../org/apache/cassandra/service/MoveTest.java | 2 +- 7 files changed, 164 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 33777f0..ce82319 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -149,7 +149,7 @@ public class RangeStreamer implements IEndpointStateChangeSubscriber, IFailureDe private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String table, Collection<Range<Token>> desiredRanges) { AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy(); - Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata); + 
Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap()); Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create(); for (Range<Token> desiredRange : desiredRanges) http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index b654fe2..7fa431a 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -176,7 +176,7 @@ public abstract class AbstractReplicationStrategy public Multimap<InetAddress, Range<Token>> getAddressRanges() { - return getAddressRanges(tokenMetadata); + return getAddressRanges(tokenMetadata.cloneOnlyTokenMap()); } public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress) http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index b78a748..3340b2b 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -26,11 +26,13 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.collect.*; + import org.apache.cassandra.utils.Pair; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import 
org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.FailureDetector; @@ -70,7 +72,7 @@ public class TokenMetadata // Finally, note that recording the tokens of joining nodes in bootstrapTokens also // means we can detect and reject the addition of multiple nodes at the same token // before one becomes part of the ring. - private final BiMap<Token, InetAddress> bootstrapTokens = Maps.synchronizedBiMap(HashBiMap.<Token, InetAddress>create()); + private final BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.<Token, InetAddress>create(); // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving) private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>(); // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints} @@ -82,21 +84,21 @@ public class TokenMetadata /* Use this lock for manipulating the token map */ private final ReadWriteLock lock = new ReentrantReadWriteLock(true); - private ArrayList<Token> sortedTokens; + private volatile ArrayList<Token> sortedTokens; + private final Topology topology; /* list of subscribers that are notified when the tokenToEndpointMap changed */ private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>(); public TokenMetadata() { - this(null); + this(HashBiMap.<Token, InetAddress>create(), new Topology()); } - public TokenMetadata(BiMap<Token, InetAddress> tokenToEndpointMap) + public TokenMetadata(BiMap<Token, InetAddress> tokenToEndpointMap, Topology topology) { - if (tokenToEndpointMap == null) - tokenToEndpointMap = HashBiMap.create(); this.tokenToEndpointMap = tokenToEndpointMap; + this.topology = topology; endpointToHostIdMap = HashBiMap.create(); sortedTokens = sortTokens(); } @@ -113,12 +115,17 @@ public class TokenMetadata { int n = 0; Range<Token> sourceRange = 
getPrimaryRangeFor(getToken(source)); - synchronized (bootstrapTokens) + lock.readLock().lock(); + try { for (Token token : bootstrapTokens.keySet()) if (sourceRange.contains(token)) n++; } + finally + { + lock.readLock().unlock(); + } return n; } @@ -161,9 +168,13 @@ public class TokenMetadata if (!endpoint.equals(prev)) { if (prev != null) + { logger.warn("Token " + token + " changing ownership from " + prev + " to " + endpoint); + topology.removeEndpoint(prev); + } shouldSortTokens = true; } + topology.addEndpoint(endpoint); leavingEndpoints.remove(endpoint); removeFromMoving(endpoint); // also removing this endpoint from moving } @@ -315,6 +326,7 @@ public class TokenMetadata { bootstrapTokens.inverse().remove(endpoint); tokenToEndpointMap.inverse().remove(endpoint); + topology.removeEndpoint(endpoint); leavingEndpoints.remove(endpoint); endpointToHostIdMap.remove(endpoint); sortedTokens = sortTokens(); @@ -431,7 +443,7 @@ public class TokenMetadata lock.readLock().lock(); try { - return new TokenMetadata(HashBiMap.create(tokenToEndpointMap)); + return new TokenMetadata(HashBiMap.create(tokenToEndpointMap), new Topology(topology)); } finally { @@ -512,15 +524,7 @@ public class TokenMetadata public ArrayList<Token> sortedTokens() { - lock.readLock().lock(); - try - { - return sortedTokens; - } - finally - { - lock.readLock().unlock(); - } + return sortedTokens; } private Multimap<Range<Token>, InetAddress> getPendingRangesMM(String table) @@ -576,10 +580,18 @@ public class TokenMetadata return (Token) ((index == (tokens.size() - 1)) ? 
tokens.get(0) : tokens.get(index + 1)); } - /** caller should not modify bootstrapTokens */ + /** @return a copy of the bootstrapping tokens map */ public Map<Token, InetAddress> getBootstrapTokens() { - return bootstrapTokens; + lock.readLock().lock(); + try + { + return ImmutableMap.copyOf(bootstrapTokens); + } + finally + { + lock.readLock().unlock(); + } } /** caller should not modify leavingEndpoints */ @@ -662,6 +674,7 @@ public class TokenMetadata { bootstrapTokens.clear(); tokenToEndpointMap.clear(); + topology.clear(); leavingEndpoints.clear(); pendingRanges.clear(); endpointToHostIdMap.clear(); @@ -689,17 +702,14 @@ public class TokenMetadata } } - synchronized (bootstrapTokens) + if (!bootstrapTokens.isEmpty()) { - if (!bootstrapTokens.isEmpty()) + sb.append("Bootstrapping Tokens:" ); + sb.append(System.getProperty("line.separator")); + for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet()) { - sb.append("Bootstrapping Tokens:" ); + sb.append(entry.getValue() + ":" + entry.getKey()); sb.append(System.getProperty("line.separator")); - for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet()) - { - sb.append(entry.getValue() + ":" + entry.getKey()); - sb.append(System.getProperty("line.separator")); - } } } @@ -821,10 +831,7 @@ public class TokenMetadata { Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size() + bootstrapTokens.size()); map.putAll(tokenToEndpointMap); - synchronized (bootstrapTokens) - { - map.putAll(bootstrapTokens); - } + map.putAll(bootstrapTokens); return map; } finally @@ -832,4 +839,110 @@ public class TokenMetadata lock.readLock().unlock(); } } + + /** + * @return the Topology map of nodes to DCs + Racks + * + * This is only allowed when a copy has been made of TokenMetadata, to avoid concurrent modifications + * when Topology methods are subsequently used by the caller. 
+ */ + public Topology getTopology() + { + assert this != StorageService.instance.getTokenMetadata(); + return topology; + } + + /** + * Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints + * in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy. + */ + public static class Topology + { + /** multi-map of DC to endpoints in that DC */ + private final Multimap<String, InetAddress> dcEndpoints; + /** map of DC to multi-map of rack to endpoints in that rack */ + private final Map<String, Multimap<String, InetAddress>> dcRacks; + /** reverse-lookup map for endpoint to current known dc/rack assignment */ + private final Map<InetAddress, Pair<String, String>> currentLocations; + + protected Topology() + { + dcEndpoints = HashMultimap.create(); + dcRacks = new HashMap<String, Multimap<String, InetAddress>>(); + currentLocations = new HashMap<InetAddress, Pair<String, String>>(); + } + + protected void clear() + { + dcEndpoints.clear(); + dcRacks.clear(); + currentLocations.clear(); + } + + /** + * construct deep-copy of other + */ + protected Topology(Topology other) + { + dcEndpoints = HashMultimap.create(other.dcEndpoints); + dcRacks = new HashMap<String, Multimap<String, InetAddress>>(); + for (String dc : other.dcRacks.keySet()) + dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc))); + currentLocations = new HashMap<InetAddress, Pair<String, String>>(other.currentLocations); + } + + /** + * Stores current DC/rack assignment for ep + */ + protected void addEndpoint(InetAddress ep) + { + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + String dc = snitch.getDatacenter(ep); + String rack = snitch.getRack(ep); + Pair<String, String> current = currentLocations.get(ep); + if (current != null) + { + if (current.left.equals(dc) && current.right.equals(rack)) + return; + dcRacks.get(current.left).remove(current.right, ep); + dcEndpoints.remove(current.left, ep); + } + + 
dcEndpoints.put(dc, ep); + + if (!dcRacks.containsKey(dc)) + dcRacks.put(dc, HashMultimap.<String, InetAddress>create()); + dcRacks.get(dc).put(rack, ep); + + currentLocations.put(ep, new Pair<String, String>(dc, rack)); + } + + /** + * Removes current DC/rack assignment for ep + */ + protected void removeEndpoint(InetAddress ep) + { + if (!currentLocations.containsKey(ep)) + return; + Pair<String, String> current = currentLocations.remove(ep); + dcEndpoints.remove(current.left, ep); + dcRacks.get(current.left).remove(current.right, ep); + } + + /** + * @return multi-map of DC to endpoints in that DC + */ + public Multimap<String, InetAddress> getDatacenterEndpoints() + { + return dcEndpoints; + } + + /** + * @return map of DC to multi-map of rack to endpoints in that rack + */ + public Map<String, Multimap<String, InetAddress>> getDatacenterRacks() + { + return dcRacks; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index fba8d44..f72ea4b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1362,7 +1362,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // all leaving nodes are gone. 
for (Range<Token> range : affectedRanges) { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm)); + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap())); Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints)); } @@ -1372,17 +1372,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // For each of the bootstrapping nodes, simply add and remove them one by one to // allLeftMetadata and check in between what their ranges would be. - synchronized (bootstrapTokens) + for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet()) { - for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet()) - { - InetAddress endpoint = entry.getValue(); + InetAddress endpoint = entry.getValue(); - allLeftMetadata.updateNormalToken(entry.getKey(), endpoint); - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - pendingRanges.put(range, endpoint); - allLeftMetadata.removeEndpoint(endpoint); - } + allLeftMetadata.updateNormalToken(entry.getKey(), endpoint); + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + pendingRanges.put(range, endpoint); + allLeftMetadata.removeEndpoint(endpoint); } // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes. 
@@ -1421,7 +1418,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe private Multimap<InetAddress, Range<Token>> getNewSourceRanges(String table, Set<Range<Token>> ranges) { InetAddress myAddress = FBUtilities.getBroadcastAddress(); - Multimap<Range<Token>, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata); + Multimap<Range<Token>, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap()); Multimap<InetAddress, Range<Token>> sourceRanges = HashMultimap.create(); IFailureDetector failureDetector = FailureDetector.instance; @@ -1551,7 +1548,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // Find (for each range) all nodes that store replicas for these ranges as well for (Range<Token> range : ranges) - currentReplicaEndpoints.put(range, Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right, tokenMetadata)); + currentReplicaEndpoints.put(range, Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right, tokenMetadata.cloneOnlyTokenMap())); TokenMetadata temp = tokenMetadata.cloneAfterAllLeft(); @@ -2373,7 +2370,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe Map<String, Multimap<InetAddress, Range<Token>>> rangesToFetch = new HashMap<String, Multimap<InetAddress, Range<Token>>>(); Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable = new HashMap<String, Multimap<Range<Token>, InetAddress>>(); - TokenMetadata tokenMetaClone = tokenMetadata.cloneAfterAllSettled(); + TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled(); + // clone to avoid concurrent modification in calculateNaturalEndpoints + TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap(); // for each of the non system tables calculating new ranges // which current node will handle 
after move to the new token @@ -2389,7 +2388,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // ring ranges and endpoints associated with them // this used to determine what nodes should we ping about range data - Multimap<Range<Token>, InetAddress> rangeAddresses = strategy.getRangeAddresses(tokenMetadata); + Multimap<Range<Token>, InetAddress> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone); // calculated parts of the ranges to request/stream from/to nodes in the ring Pair<Set<Range<Token>>, Set<Range<Token>>> rangesPerTable = calculateStreamAndFetchRanges(currentRanges, updatedRanges); @@ -2418,8 +2417,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe for (Range<Token> toStream : rangesPerTable.left) { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetadata)); - Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone)); + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone)); + Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaCloneAllSettled)); rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints, currentEndpoints)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java index 17146f4..c373fa8 100644 --- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java +++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java @@ -185,7 +185,7 @@ public abstract 
class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> expected = new HashSet<InetAddress>(); for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) { - expected.addAll(ars.getRangeAddresses(tmd).get(replicaRange)); + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); } expected.remove(FBUtilities.getBroadcastAddress()); Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(tablename); http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java index 24df493..fc3d8a6 100644 --- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java +++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java @@ -115,7 +115,7 @@ public class LeaveAndBootstrapTest { int replicationFactor = strategy.getReplicationFactor(); - HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd))); + HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap()))); HashSet<InetAddress> expected = new HashSet<InetAddress>(); for (int i = 0; i < replicationFactor; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index 38dc42a..42f20fc 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ 
b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -123,7 +123,7 @@ public class MoveTest { int replicationFactor = strategy.getReplicationFactor(); - HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd))); + HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap()))); HashSet<InetAddress> expected = new HashSet<InetAddress>(); for (int i = 0; i < replicationFactor; i++)
