Updated Branches: refs/heads/trunk c22dd0821 -> e85afdc5b
update TokenMetadata in support of many tokens per node Patch by Sam Overton; reviewed by jbellis for CASSANDRA-4121 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e85afdc5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e85afdc5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e85afdc5 Branch: refs/heads/trunk Commit: e85afdc5b6a691591834bd32f766087560c60a39 Parents: c22dd08 Author: Eric Evans <[email protected]> Authored: Mon Jul 9 16:07:34 2012 -0600 Committer: Eric Evans <[email protected]> Committed: Mon Jul 9 16:07:34 2012 -0600 ---------------------------------------------------------------------- .../apache/cassandra/locator/SimpleStrategy.java | 4 +- .../apache/cassandra/locator/TokenMetadata.java | 157 +++++++++------ .../apache/cassandra/service/StorageService.java | 111 ++++++----- .../locator/NetworkTopologyStrategyTest.java | 7 +- 4 files changed, 160 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/locator/SimpleStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java index 11c2aa8..50d470f 100644 --- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java +++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java @@ -54,7 +54,9 @@ public class SimpleStrategy extends AbstractReplicationStrategy Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false); while (endpoints.size() < replicas && iter.hasNext()) { - endpoints.add(metadata.getEndpoint(iter.next())); + InetAddress ep = metadata.getEndpoint(iter.next()); + if (!endpoints.contains(ep)) + endpoints.add(ep); } return endpoints; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 3340b2b..8fb63d5 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -18,6 +18,7 @@ package org.apache.cassandra.locator; import java.net.InetAddress; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -27,7 +28,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.collect.*; +import org.apache.cassandra.utils.BiMultiValMap; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.SortedBiMultiValMap; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +46,7 @@ public class TokenMetadata private static final Logger logger = LoggerFactory.getLogger(TokenMetadata.class); /* Maintains token to endpoint map of every node in the cluster. */ - private final BiMap<Token, InetAddress> tokenToEndpointMap; + private final BiMultiValMap<Token, InetAddress> tokenToEndpointMap; /* Maintains endpoint to host ID map of every node in the cluster */ private final BiMap<InetAddress, UUID> endpointToHostIdMap; @@ -72,7 +75,7 @@ public class TokenMetadata // Finally, note that recording the tokens of joining nodes in bootstrapTokens also // means we can detect and reject the addition of multiple nodes at the same token // before one becomes part of the ring. - private final BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.<Token, InetAddress>create(); + private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<Token, InetAddress>(); // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving) private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>(); // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints} @@ -90,12 +93,21 @@ public class TokenMetadata /* list of subscribers that are notified when the tokenToEndpointMap changed */ private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>(); + private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>() + { + @Override + public int compare(InetAddress o1, InetAddress o2) + { + return ByteBuffer.wrap(o1.getAddress()).compareTo(ByteBuffer.wrap(o2.getAddress())); + } + }; + public TokenMetadata() { - this(HashBiMap.<Token, InetAddress>create(), new Topology()); + this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), new Topology()); } - public TokenMetadata(BiMap<Token, InetAddress> tokenToEndpointMap, Topology topology) + public TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, Topology topology) { this.tokenToEndpointMap = tokenToEndpointMap; this.topology = topology; @@ -105,22 +117,21 @@ public class TokenMetadata private ArrayList<Token> sortTokens() { - ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndpointMap.keySet()); - Collections.sort(tokens); - return tokens; + return new ArrayList<Token>(tokenToEndpointMap.keySet()); } /** @return the number of nodes bootstrapping into source's primary range */ public int pendingRangeChanges(InetAddress source) { int n = 0; - Range<Token> sourceRange = getPrimaryRangeFor(getToken(source)); + Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source)); lock.readLock().lock(); try { for (Token token : bootstrapTokens.keySet()) - if (sourceRange.contains(token)) - n++; + for (Range<Token> range : sourceRanges) + if (range.contains(token)) + n++; } finally { @@ -134,7 +145,15 @@ public class TokenMetadata */ public void updateNormalToken(Token token, InetAddress endpoint) { - updateNormalTokens(Collections.singleton(Pair.create(token, endpoint))); + updateNormalTokens(Collections.singleton(token), endpoint); + } + + public void updateNormalTokens(Collection<Token> tokens, InetAddress endpoint) + { + Multimap<InetAddress, Token> endpointTokens = HashMultimap.create(); + for (Token token : tokens) + endpointTokens.put(endpoint, token); + updateNormalTokens(endpointTokens); } /** @@ -143,40 +162,39 @@ public class TokenMetadata * Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple) * is expensive (CASSANDRA-3831). * - * @param tokenPairs + * @param endpointTokens */ - public void updateNormalTokens(Set<Pair<Token, InetAddress>> tokenPairs) + public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens) { - if (tokenPairs.isEmpty()) + if (endpointTokens.isEmpty()) return; lock.writeLock().lock(); try { boolean shouldSortTokens = false; - for (Pair<Token, InetAddress> tokenEndpointPair : tokenPairs) + for (InetAddress endpoint : endpointTokens.keySet()) { - Token token = tokenEndpointPair.left; - InetAddress endpoint = tokenEndpointPair.right; + Collection<Token> tokens = endpointTokens.get(endpoint); - assert token != null; - assert endpoint != null; + assert tokens != null && !tokens.isEmpty(); - bootstrapTokens.inverse().remove(endpoint); - tokenToEndpointMap.inverse().remove(endpoint); - InetAddress prev = tokenToEndpointMap.put(token, endpoint); - if (!endpoint.equals(prev)) + bootstrapTokens.removeValue(endpoint); + tokenToEndpointMap.removeValue(endpoint); + topology.addEndpoint(endpoint); + leavingEndpoints.remove(endpoint); + removeFromMoving(endpoint); // also removing this endpoint from moving + + for (Token token : tokens) { - if (prev != null) + InetAddress prev = tokenToEndpointMap.put(token, endpoint); + if (!endpoint.equals(prev)) { - logger.warn("Token " + token + " changing ownership from " + prev + " to " + endpoint); - topology.removeEndpoint(prev); + if (prev != null) + logger.warn("Token " + token + " changing ownership from " + prev + " to " + endpoint); + shouldSortTokens = true; } - shouldSortTokens = true; } - topology.addEndpoint(endpoint); - leavingEndpoints.remove(endpoint); - removeFromMoving(endpoint); // also removing this endpoint from moving } if (shouldSortTokens) @@ -239,26 +257,38 @@ public class TokenMetadata return readMap; } + @Deprecated public void addBootstrapToken(Token token, InetAddress endpoint) { - assert token != null; + addBootstrapTokens(Collections.singleton(token), endpoint); + } + + public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint) + { + assert tokens != null && !tokens.isEmpty(); assert endpoint != null; lock.writeLock().lock(); try { + InetAddress oldEndpoint; - oldEndpoint = bootstrapTokens.get(token); - if (oldEndpoint != null && !oldEndpoint.equals(endpoint)) - throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); + for (Token token : tokens) + { + oldEndpoint = bootstrapTokens.get(token); + if (oldEndpoint != null && !oldEndpoint.equals(endpoint)) + throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); + + oldEndpoint = tokenToEndpointMap.get(token); + if (oldEndpoint != null && !oldEndpoint.equals(endpoint)) + throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); + } - oldEndpoint = tokenToEndpointMap.get(token); - if (oldEndpoint != null && !oldEndpoint.equals(endpoint)) - throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); + bootstrapTokens.removeValue(endpoint); - bootstrapTokens.inverse().remove(endpoint); - bootstrapTokens.put(token, endpoint); + for (Token token : tokens) + bootstrapTokens.put(token, endpoint); } finally { @@ -324,8 +354,8 @@ public class TokenMetadata lock.writeLock().lock(); try { - bootstrapTokens.inverse().remove(endpoint); - tokenToEndpointMap.inverse().remove(endpoint); + bootstrapTokens.removeValue(endpoint); + tokenToEndpointMap.removeValue(endpoint); topology.removeEndpoint(endpoint); leavingEndpoints.remove(endpoint); endpointToHostIdMap.remove(endpoint); @@ -366,7 +396,7 @@ public class TokenMetadata } } - public Token getToken(InetAddress endpoint) + public Collection<Token> getTokens(InetAddress endpoint) { assert endpoint != null; assert isMember(endpoint); // don't want to return nulls @@ -374,7 +404,7 @@ public class TokenMetadata lock.readLock().lock(); try { - return tokenToEndpointMap.inverse().get(endpoint); + return new ArrayList<Token>(tokenToEndpointMap.inverse().get(endpoint)); } finally { @@ -382,6 +412,12 @@ public class TokenMetadata } } + @Deprecated + public Token getToken(InetAddress endpoint) + { + return getTokens(endpoint).iterator().next(); + } + public boolean isMember(InetAddress endpoint) { assert endpoint != null; @@ -443,7 +479,7 @@ public class TokenMetadata lock.readLock().lock(); try { - return new TokenMetadata(HashBiMap.create(tokenToEndpointMap), new Topology(topology)); + return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), new Topology(topology)); } finally { @@ -517,9 +553,18 @@ public class TokenMetadata } } + public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens) + { + Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(tokens.size()); + for (Token right : tokens) + ranges.add(new Range<Token>(getPredecessor(right), right)); + return ranges; + } + + @Deprecated public Range<Token> getPrimaryRangeFor(Token right) { - return new Range<Token>(getPredecessor(right), right); + return getPrimaryRangesFor(Arrays.asList(right)).iterator().next(); } public ArrayList<Token> sortedTokens() @@ -581,12 +626,12 @@ public class TokenMetadata } /** @return a copy of the bootstrapping tokens map */ - public Map<Token, InetAddress> getBootstrapTokens() + public BiMultiValMap<Token, InetAddress> getBootstrapTokens() { lock.readLock().lock(); try { - return ImmutableMap.copyOf(bootstrapTokens); + return new BiMultiValMap<Token, InetAddress>(bootstrapTokens); } finally { @@ -803,24 +848,6 @@ public class TokenMetadata } /** - * @return a token to endpoint map to consider for read operations on the cluster. - */ - public Map<Token, InetAddress> getTokenToEndpointMapForReading() - { - lock.readLock().lock(); - try - { - Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size()); - map.putAll(tokenToEndpointMap); - return map; - } - finally - { - lock.readLock().unlock(); - } - } - - /** * @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes * in the cluster. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ce14352..583b3a7 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.google.common.base.Function; import com.google.common.collect.*; import org.apache.log4j.Level; import org.apache.commons.lang.StringUtils; @@ -56,6 +55,7 @@ import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.ResponseVerbHandler; +import org.apache.cassandra.service.AntiEntropyService.RepairFuture; import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler; import org.apache.cassandra.streaming.*; import org.apache.cassandra.thrift.*; @@ -120,6 +120,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe return getRangesForEndpoint(table, FBUtilities.getBroadcastAddress()); } + public Collection<Range<Token>> getLocalPrimaryRanges() + { + return getPrimaryRangesForEndpoint(FBUtilities.getBroadcastAddress()); + } + + @Deprecated public Range<Token> getLocalPrimaryRange() { return getPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddress()); @@ -1168,7 +1174,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger.info("Node " + endpoint + " state jump to leaving"); tokenMetadata.updateNormalToken(token, endpoint); } - else if (!tokenMetadata.getToken(endpoint).equals(token)) + else if (!tokenMetadata.getTokens(endpoint).contains(token)) { logger.warn("Node " + endpoint + " 'leaving' token mismatch. Long network partition?"); tokenMetadata.updateNormalToken(token, endpoint); @@ -1338,7 +1344,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { TokenMetadata tm = StorageService.instance.getTokenMetadata(); Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create(); - Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens(); + BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens(); Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints(); if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty()) @@ -1373,11 +1379,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // For each of the bootstrapping nodes, simply add and remove them one by one to // allLeftMetadata and check in between what their ranges would be. - for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet()) + for (InetAddress endpoint : bootstrapTokens.inverse().keySet()) { - InetAddress endpoint = entry.getValue(); - - allLeftMetadata.updateNormalToken(entry.getKey(), endpoint); + Collection<Token> tokens = bootstrapTokens.inverse().get(endpoint); + + allLeftMetadata.updateNormalTokens(tokens, endpoint); for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) pendingRanges.put(range, endpoint); allLeftMetadata.removeEndpoint(endpoint); @@ -1977,18 +1983,17 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe if (Table.SYSTEM_TABLE.equals(tableName)) return; - AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, isSequential, columnFamilies); - if (future == null) - return; - try - { - future.get(); - } - catch (Exception e) + List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(); + for (Range<Token> range : getLocalPrimaryRanges()) { - logger.error("Repair session " + future.session.getName() + " failed.", e); - throw new IOException("Some repair session(s) failed (see log for details)."); + RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies); + if (future != null) + futures.add(future); } + if (futures.isEmpty()) + return; + for (AntiEntropyService.RepairFuture future : futures) + FBUtilities.waitOnFuture(future); } public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException @@ -2041,9 +2046,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe * This method returns the predecessor of the endpoint ep on the identifier * space. */ - InetAddress getPredecessor(InetAddress ep) + InetAddress getPredecessor(Token token) { - Token token = tokenMetadata.getToken(ep); return tokenMetadata.getEndpoint(tokenMetadata.getPredecessor(token)); } @@ -2051,17 +2055,27 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe * This method returns the successor of the endpoint ep on the identifier * space. */ - public InetAddress getSuccessor(InetAddress ep) + public InetAddress getSuccessor(Token token) { - Token token = tokenMetadata.getToken(ep); return tokenMetadata.getEndpoint(tokenMetadata.getSuccessor(token)); } /** + * Get the primary ranges for the specified endpoint. + * @param ep endpoint we are interested in. + * @return collection of ranges for the specified endpoint. + */ + public Collection<Range<Token>> getPrimaryRangesForEndpoint(InetAddress ep) + { + return tokenMetadata.getPrimaryRangesFor(tokenMetadata.getTokens(ep)); + } + + /** * Get the primary range for the specified endpoint. * @param ep endpoint we are interested in. * @return range for the specified endpoint. */ + @Deprecated public Range<Token> getPrimaryRangeForEndpoint(InetAddress ep) { return tokenMetadata.getPrimaryRangeFor(tokenMetadata.getToken(ep)); @@ -2741,14 +2755,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe public Map<InetAddress, Float> getOwnership() { - Map<Token, InetAddress> tokensToEndpoints = tokenMetadata.getTokenToEndpointMapForReading(); - List<Token> sortedTokens = new ArrayList<Token>(tokensToEndpoints.keySet()); - Collections.sort(sortedTokens); + List<Token> sortedTokens = tokenMetadata.sortedTokens(); // describeOwnership returns tokens in an unspecified order, let's re-order them Map<Token, Float> tokenMap = new TreeMap<Token, Float>(getPartitioner().describeOwnership(sortedTokens)); Map<InetAddress, Float> stringMap = new LinkedHashMap<InetAddress, Float>(); for (Map.Entry<Token, Float> entry : tokenMap.entrySet()) - stringMap.put(tokensToEndpoints.get(entry.getKey()), entry.getValue()); + stringMap.put(tokenMetadata.getEndpoint(entry.getKey()), entry.getValue()); return stringMap; } @@ -2773,22 +2785,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe if (keyspace == null) keyspace = Schema.instance.getNonSystemTables().get(0); - final BiMap<InetAddress, Token> endpointsToTokens = ImmutableBiMap.copyOf(metadata.getTokenToEndpointMapForReading()).inverse(); - Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<Collection<InetAddress>>(); - if (isDcAwareReplicationStrategy(keyspace)) - { - // mapping of dc's to nodes, use sorted map so that we get dcs sorted - SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>(); - sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap()); - for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values()) - endpointsGroupedByDc.add(endpoints); - } - else - { - endpointsGroupedByDc.add(endpointsToTokens.keySet()); - } + // mapping of dc's to nodes, use sorted map so that we get dcs sorted + SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>(); + sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap()); + for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values()) + endpointsGroupedByDc.add(endpoints); + Map<Token, Float> tokenOwnership = getPartitioner().describeOwnership(tokenMetadata.sortedTokens()); LinkedHashMap<InetAddress, Float> finalOwnership = Maps.newLinkedHashMap(); // calculate ownership per dc @@ -2802,19 +2806,22 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { public int compare(InetAddress o1, InetAddress o2) { - return endpointsToTokens.get(o1).compareTo(endpointsToTokens.get(o2)); - } - }); + byte[] b1 = o1.getAddress(); + byte[] b2 = o2.getAddress(); - // calculate the ownership without replication - Function<InetAddress, Token> f = new Function<InetAddress, Token>() - { - public Token apply(InetAddress arg0) - { - return endpointsToTokens.get(arg0); + if(b1.length < b2.length) return -1; + if(b1.length > b2.length) return 1; + + for(int i = 0; i < b1.length; i++) + { + int left = (int)b1[i] & 0xFF; + int right = (int)b2[i] & 0xFF; + if (left < right) return -1; + else if (left > right) return 1; + } + return 0; } - }; - Map<Token, Float> tokenOwnership = getPartitioner().describeOwnership(Lists.transform(sortedEndpoints, f)); + }); // calculate the ownership with replication and add the endpoint to the final ownership map for (InetAddress endpoint : endpoints) @@ -3144,7 +3151,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe */ public List<String> getRangeKeySample() { - List<DecoratedKey> keys = keySamples(ColumnFamilyStore.allUserDefined(), getLocalPrimaryRange()); + List<DecoratedKey> keys = new ArrayList<DecoratedKey>(); + for (Range<Token> range : getLocalPrimaryRanges()) + keys.addAll(keySamples(ColumnFamilyStore.allUserDefined(), range)); List<String> sampledKeys = new ArrayList<String>(keys.size()); for (DecoratedKey key : keys) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java index 9e3d684..bd03766 100644 --- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java @@ -41,6 +41,9 @@ import org.apache.cassandra.dht.StringToken; import org.apache.cassandra.dht.Token; import org.apache.cassandra.utils.Pair; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + public class NetworkTopologyStrategyTest { private String table = "Keyspace1"; @@ -105,7 +108,7 @@ public class NetworkTopologyStrategyTest DatabaseDescriptor.setEndpointSnitch(snitch); TokenMetadata metadata = new TokenMetadata(); Map<String, String> configOptions = new HashMap<String, String>(); - Set<Pair<Token, InetAddress>> tokens = new HashSet<Pair<Token, InetAddress>>(); + Multimap<InetAddress, Token> tokens = HashMultimap.create(); int totalRF = 0; for (int dc = 0; dc < dcRacks.length; ++dc) @@ -120,7 +123,7 @@ public class NetworkTopologyStrategyTest InetAddress address = InetAddress.getByAddress(ipBytes); StringToken token = new StringToken(String.format("%02x%02x%02x", ep, rack, dc)); logger.debug("adding node " + address + " at " + token); - tokens.add(new Pair<Token, InetAddress>(token, address)); + tokens.put(address, token); } } }
