Repository: cassandra

Updated Branches:
  refs/heads/cassandra-2.2 e7c2952d3 -> 5b05b6826
Fix consolidating racks violating the RF contract

patch by Stefania Alborghetti; reviewed by Blake Eggleston for
CASSANDRA-10238


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/257cdaa0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/257cdaa0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/257cdaa0

Branch: refs/heads/cassandra-2.2
Commit: 257cdaa08dc12f747a25c03b9b0ad3ffc76ace9b
Parents: 0bb32f0
Author: Stefania Alborghetti <[email protected]>
Authored: Fri Sep 11 16:31:40 2015 +0800
Committer: Aleksey Yeschenko <[email protected]>
Committed: Wed Sep 16 11:40:31 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/locator/PropertyFileSnitch.java   |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 142 +++++++++----
 .../locator/YamlFileNetworkTopologySnitch.java  |   1 +
 .../cassandra/service/StorageService.java       |  16 ++
 .../cassandra/locator/TokenMetadataTest.java    | 209 ++++++++++++++++++-
 6 files changed, 332 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d4cc15f..3c47427 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.17
+ * Fix consolidating racks violating the RF contract (CASSANDRA-10238)
  * Disallow decommission when node is in drained state (CASSANDRA-8741)
  * Backport CASSANDRA-8013 to 2.0 (CASSANDRA-10144)
  * Make getFullyExpiredSSTables less expensive (CASSANDRA-9882)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 4f822c6..745eeb8 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -69,6 +69,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
             protected void runMayThrow() throws ConfigurationException
             {
                 reloadConfiguration();
+                StorageService.instance.updateTopology();
             }
         };
         ResourceWatcher.watch(SNITCH_PROPERTIES_FILENAME, runnable, 60 * 1000);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index a673c94..b1b25e8 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -78,14 +78,14 @@ public class TokenMetadata
     // Finally, note that recording the tokens of joining nodes in bootstrapTokens also
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
-    private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<Token, InetAddress>();
+    private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
 
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
-    private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
+    private final Set<InetAddress> leavingEndpoints = new HashSet<>();
 
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
-    private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range<Token>, InetAddress>>();
+    private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<>();
 
     // nodes which are migrating to the new tokens in the ring
-    private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
+    private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -121,7 +121,7 @@ public class TokenMetadata
 
     private ArrayList<Token> sortTokens()
     {
-        return new ArrayList<Token>(tokenToEndpointMap.keySet());
+        return new ArrayList<>(tokenToEndpointMap.keySet());
     }
 
     /** @return the number of nodes bootstrapping into source's primary range */
@@ -165,8 +165,6 @@ public class TokenMetadata
      *
      * Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
      * is expensive (CASSANDRA-3831).
-     *
-     * @param endpointTokens
      */
     public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
     {
@@ -213,9 +211,6 @@ public class TokenMetadata
     /**
      * Store an end-point to host ID mapping. Each ID must be unique, and
      * cannot be changed after the fact.
-     *
-     * @param hostId
-     * @param endpoint
      */
     public void updateHostId(UUID hostId, InetAddress endpoint)
     {
@@ -284,7 +279,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            Map<InetAddress, UUID> readMap = new HashMap<InetAddress, UUID>();
+            Map<InetAddress, UUID> readMap = new HashMap<>();
             readMap.putAll(endpointToHostIdMap);
             return readMap;
         }
@@ -407,6 +402,43 @@ public class TokenMetadata
     }
 
     /**
+     * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
+     */
+    public void updateTopology(InetAddress endpoint)
+    {
+        assert endpoint != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            logger.info("Updating topology for {}", endpoint);
+            topology.updateEndpoint(endpoint);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * This is called when the snitch properties for many endpoints are updated, it will update
+     * the topology mappings of any endpoints whose snitch has changed, see CASSANDRA-10238.
+     */
+    public void updateTopology()
+    {
+        lock.writeLock().lock();
+        try
+        {
+            logger.info("Updating topology for all endpoints that have changed");
+            topology.updateEndpoints();
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
      * Remove pair of token/address from moving endpoints
      * @param endpoint address of the moving node
      */
@@ -442,7 +474,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new ArrayList<Token>(tokenToEndpointMap.inverse().get(endpoint));
+            return new ArrayList<>(tokenToEndpointMap.inverse().get(endpoint));
         }
         finally
         {
@@ -508,7 +540,7 @@ public class TokenMetadata
         }
     }
 
-    private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
+    private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<>();
 
     /**
      * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
@@ -519,7 +551,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
                                      HashBiMap.create(endpointToHostIdMap),
                                      new Topology(topology));
         }
@@ -622,9 +654,9 @@ public class TokenMetadata
 
     public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens)
     {
-        Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(tokens.size());
+        Collection<Range<Token>> ranges = new ArrayList<>(tokens.size());
         for (Token right : tokens)
-            ranges.add(new Range<Token>(getPredecessor(right), right));
+            ranges.add(new Range<>(getPredecessor(right), right));
         return ranges;
     }
@@ -660,7 +692,7 @@ public class TokenMetadata
 
     public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint)
     {
-        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+        List<Range<Token>> ranges = new ArrayList<>();
         for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(keyspaceName).entries())
         {
             if (entry.getValue().equals(endpoint))
@@ -845,7 +877,7 @@ public class TokenMetadata
             for (InetAddress ep : eps)
             {
                 sb.append(ep);
-                sb.append(":");
+                sb.append(':');
                 sb.append(tokenToEndpointMap.inverse().get(ep));
                 sb.append(System.getProperty("line.separator"));
             }
@@ -857,7 +889,7 @@ public class TokenMetadata
                 sb.append(System.getProperty("line.separator"));
                 for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
                 {
-                    sb.append(entry.getValue()).append(":").append(entry.getKey());
+                    sb.append(entry.getValue()).append(':').append(entry.getKey());
                     sb.append(System.getProperty("line.separator"));
                 }
             }
@@ -896,7 +928,7 @@ public class TokenMetadata
             {
                 for (Map.Entry<Range<Token>, InetAddress> rmap : entry.getValue().entries())
                 {
-                    sb.append(rmap.getValue()).append(":").append(rmap.getKey());
+                    sb.append(rmap.getValue()).append(':').append(rmap.getKey());
                     sb.append(System.getProperty("line.separator"));
                 }
             }
@@ -910,7 +942,7 @@ public class TokenMetadata
         if (ranges.isEmpty())
             return Collections.emptyList();
 
-        Set<InetAddress> endpoints = new HashSet<InetAddress>();
+        Set<InetAddress> endpoints = new HashSet<>();
         for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : ranges.entrySet())
         {
             if (entry.getKey().contains(token))
@@ -954,7 +986,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
        {
-            Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size() + bootstrapTokens.size());
+            Map<Token, InetAddress> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
             map.putAll(tokenToEndpointMap);
             map.putAll(bootstrapTokens);
             return map;
@@ -1001,14 +1033,14 @@ public class TokenMetadata
         /** reverse-lookup map for endpoint to current known dc/rack assignment */
         private final Map<InetAddress, Pair<String, String>> currentLocations;
 
-        protected Topology()
+        Topology()
        {
             dcEndpoints = HashMultimap.create();
-            dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
-            currentLocations = new HashMap<InetAddress, Pair<String, String>>();
+            dcRacks = new HashMap<>();
+            currentLocations = new HashMap<>();
         }
 
-        protected void clear()
+        void clear()
         {
             dcEndpoints.clear();
             dcRacks.clear();
@@ -1018,19 +1050,19 @@ public class TokenMetadata
         /**
          * construct deep-copy of other
          */
-        protected Topology(Topology other)
+        Topology(Topology other)
         {
             dcEndpoints = HashMultimap.create(other.dcEndpoints);
-            dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
+            dcRacks = new HashMap<>();
             for (String dc : other.dcRacks.keySet())
                 dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
-            currentLocations = new HashMap<InetAddress, Pair<String, String>>(other.currentLocations);
+            currentLocations = new HashMap<>(other.currentLocations);
         }
 
         /**
          * Stores current DC/rack assignment for ep
          */
-        protected void addEndpoint(InetAddress ep)
+        void addEndpoint(InetAddress ep)
         {
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
             String dc = snitch.getDatacenter(ep);
@@ -1040,10 +1072,14 @@ public class TokenMetadata
             {
                 if (current.left.equals(dc) && current.right.equals(rack))
                     return;
-                dcRacks.get(current.left).remove(current.right, ep);
-                dcEndpoints.remove(current.left, ep);
+                doRemoveEndpoint(ep, current);
             }
 
+            doAddEndpoint(ep, dc, rack);
+        }
+
+        private void doAddEndpoint(InetAddress ep, String dc, String rack)
+        {
             dcEndpoints.put(dc, ep);
 
             if (!dcRacks.containsKey(dc))
@@ -1056,13 +1092,49 @@ public class TokenMetadata
         /**
          * Removes current DC/rack assignment for ep
         */
-        protected void removeEndpoint(InetAddress ep)
+        void removeEndpoint(InetAddress ep)
         {
             if (!currentLocations.containsKey(ep))
                 return;
-            Pair<String, String> current = currentLocations.remove(ep);
-            dcEndpoints.remove(current.left, ep);
+
+            doRemoveEndpoint(ep, currentLocations.remove(ep));
+        }
+
+        private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+        {
             dcRacks.get(current.left).remove(current.right, ep);
+            dcEndpoints.remove(current.left, ep);
+        }
+
+        void updateEndpoint(InetAddress ep)
+        {
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            if (snitch == null || !currentLocations.containsKey(ep))
+                return;
+
+            updateEndpoint(ep, snitch);
+        }
+
+        void updateEndpoints()
+        {
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            if (snitch == null)
+                return;
+
+            for (InetAddress ep : currentLocations.keySet())
+                updateEndpoint(ep, snitch);
+        }
+
+        private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+        {
+            Pair<String, String> current = currentLocations.get(ep);
+            String dc = snitch.getDatacenter(ep);
+            String rack = snitch.getRack(ep);
+            if (dc.equals(current.left) && rack.equals(current.right))
+                return;
+
+            doRemoveEndpoint(ep, current);
+            doAddEndpoint(ep, dc, rack);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
index 3237979..e6691c4 100644
--- a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
@@ -120,6 +120,7 @@ public class YamlFileNetworkTopologySnitch
             protected void runMayThrow() throws ConfigurationException
             {
                 loadTopologyConfiguration();
+                StorageService.instance.updateTopology();
             }
         };
         ResourceWatcher.watch(topologyConfigFilename, runnable,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5ac4980..c5f159e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1378,9 +1378,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
                     break;
                 case DC:
+                    updateTopology(endpoint);
                     SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
                     break;
                 case RACK:
+                    updateTopology(endpoint);
                     SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
                     break;
                 case RPC_ADDRESS:
@@ -1398,6 +1400,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public void updateTopology(InetAddress endpoint)
+    {
+        if (getTokenMetadata().isMember(endpoint))
+        {
+            getTokenMetadata().updateTopology(endpoint);
+        }
+    }
+
+    public void updateTopology()
+    {
+        getTokenMetadata().updateTopology();
+
+    }
+
     private void updatePeerInfo(InetAddress endpoint)
     {
         EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index 95118dc..fc8095d 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -19,19 +19,27 @@ package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Map;
+
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Multimap;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
+import static junit.framework.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 
 import static org.apache.cassandra.Util.token;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -50,9 +58,9 @@ public class TokenMetadataTest
         tmd.updateNormalToken(token(SIX), InetAddress.getByName("127.0.0.6"));
     }
 
-    private void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
+    private static void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
     {
-        ArrayList<Token> actual = new ArrayList<Token>();
+        ArrayList<Token> actual = new ArrayList<>();
         Iterators.addAll(actual, TokenMetadata.ringIterator(ring, token(start), includeMin));
         assertEquals(actual.toString(), expected.length, actual.size());
         for (int i = 0; i < expected.length; i++)
@@ -84,4 +92,199 @@
     {
         testRingIterator(new ArrayList<Token>(), "2", false);
     }
+
+    @Test
+    public void testTopologyUpdate_RackConsolidation() throws UnknownHostException
+    {
+        final InetAddress first = InetAddress.getByName("127.0.0.1");
+        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final String DATA_CENTER = "datacenter1";
+        final String RACK1 = "rack1";
+        final String RACK2 = "rack2";
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return endpoint.equals(first) ? RACK1 : RACK2;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tmd.updateNormalToken(token(ONE), first);
+        tmd.updateNormalToken(token(SIX), second);
+
+        TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap();
+        assertNotNull(tokenMetadata);
+
+        TokenMetadata.Topology topology = tokenMetadata.getTopology();
+        assertNotNull(topology);
+
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second));
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return RACK1;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tokenMetadata.updateTopology(first);
+        tokenMetadata.updateTopology(second);
+
+        allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertFalse(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(second));
+    }
+
+    @Test
+    public void testTopologyUpdate_RackExpansion() throws UnknownHostException
+    {
+        final InetAddress first = InetAddress.getByName("127.0.0.1");
+        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final String DATA_CENTER = "datacenter1";
+        final String RACK1 = "rack1";
+        final String RACK2 = "rack2";
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return RACK1;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tmd.updateNormalToken(token(ONE), first);
+        tmd.updateNormalToken(token(SIX), second);
+
+        TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap();
+        assertNotNull(tokenMetadata);
+
+        TokenMetadata.Topology topology = tokenMetadata.getTopology();
+        assertNotNull(topology);
+
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertFalse(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(second));
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return endpoint.equals(first) ? RACK1 : RACK2;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tokenMetadata.updateTopology();
+
+        allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second));
+    }
 }
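For readers skimming the diff, the crux of the fix is that TokenMetadata.Topology now re-files an endpoint under its current DC/rack whenever the snitch's answer for that endpoint changes, instead of leaving the stale mapping in place; a stale rack mapping can let rack-aware replica placement (e.g. NetworkTopologyStrategy, which consults these DC/rack maps) behave as if the old rack layout still existed, which is how the RF contract was being violated. Below is a rough, self-contained sketch of that re-filing idea; the class and method names are invented for illustration and are not the Cassandra APIs touched by this commit.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Standalone sketch only: a toy rack map that re-files a node under its current rack
// when the configured rack changes, in the spirit of Topology.updateEndpoint() above.
final class RackMapSketch
{
    private final Map<String, Set<String>> rackToNodes = new HashMap<>(); // rack -> nodes
    private final Map<String, String> nodeToRack = new HashMap<>();       // node -> current rack

    void add(String node, String rack)
    {
        rackToNodes.computeIfAbsent(rack, r -> new HashSet<>()).add(node);
        nodeToRack.put(node, rack);
    }

    // Called after a snitch/topology reload reports a (possibly different) rack for the node.
    void update(String node, String newRack)
    {
        String oldRack = nodeToRack.get(node);
        if (oldRack == null || oldRack.equals(newRack))
            return;                                // unknown node, or nothing changed
        rackToNodes.get(oldRack).remove(node);     // drop the stale rack assignment first...
        add(node, newRack);                        // ...then file the node under its new rack
    }

    public static void main(String[] args)
    {
        RackMapSketch topology = new RackMapSketch();
        topology.add("127.0.0.1", "rack1");
        topology.add("127.0.0.6", "rack2");
        // rack2 is consolidated into rack1; without the update, placement decisions
        // would still see the two nodes as living in different racks.
        topology.update("127.0.0.6", "rack1");
        System.out.println(topology.rackToNodes); // e.g. {rack1=[127.0.0.1, 127.0.0.6], rack2=[]} (map order may vary)
    }
}

The committed change does the same swap under TokenMetadata's write lock (doRemoveEndpoint followed by doAddEndpoint), so readers of the topology never observe a half-updated mapping.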
