add Topology to TokenMetadata and clean up thread safety design
patch by Sam Overton; reviewed by jbellis for CASSANDRA-3881


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/893d1da9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/893d1da9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/893d1da9

Branch: refs/heads/trunk
Commit: 893d1da990b4f31462ad241dc0c4b6a91cf3dbee
Parents: 602e383
Author: Jonathan Ellis <[email protected]>
Authored: Tue Jul 3 11:55:54 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Tue Jul 3 11:55:54 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/dht/RangeStreamer.java    |    2 +-
 .../locator/AbstractReplicationStrategy.java       |    2 +-
 .../apache/cassandra/locator/TokenMetadata.java    |  175 ++++++++++++---
 .../apache/cassandra/service/StorageService.java   |   31 ++--
 .../service/AntiEntropyServiceTestAbstract.java    |    2 +-
 .../cassandra/service/LeaveAndBootstrapTest.java   |    2 +-
 .../org/apache/cassandra/service/MoveTest.java     |    2 +-
 7 files changed, 164 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 33777f0..ce82319 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -149,7 +149,7 @@ public class RangeStreamer implements IEndpointStateChangeSubscriber, IFailureDe
     private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String table, Collection<Range<Token>> desiredRanges)
     {
         AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy();
-        Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata);
+        Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap());
 
         Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
         for (Range<Token> desiredRange : desiredRanges)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index b654fe2..7fa431a 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -176,7 +176,7 @@ public abstract class AbstractReplicationStrategy
 
     public Multimap<InetAddress, Range<Token>> getAddressRanges()
     {
-        return getAddressRanges(tokenMetadata);
+        return getAddressRanges(tokenMetadata.cloneOnlyTokenMap());
     }
 
     public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b78a748..3340b2b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -26,11 +26,13 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.*;
+
 import org.apache.cassandra.utils.Pair;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
@@ -70,7 +72,7 @@ public class TokenMetadata
     // Finally, note that recording the tokens of joining nodes in bootstrapTokens also
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
-    private final BiMap<Token, InetAddress> bootstrapTokens = Maps.synchronizedBiMap(HashBiMap.<Token, InetAddress>create());
+    private final BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.<Token, InetAddress>create();
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
     private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
@@ -82,21 +84,21 @@ public class TokenMetadata
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
-    private ArrayList<Token> sortedTokens;
+    private volatile ArrayList<Token> sortedTokens;
 
+    private final Topology topology;
     /* list of subscribers that are notified when the tokenToEndpointMap changed */
     private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
 
     public TokenMetadata()
     {
-        this(null);
+        this(HashBiMap.<Token, InetAddress>create(), new Topology());
     }
 
-    public TokenMetadata(BiMap<Token, InetAddress> tokenToEndpointMap)
+    public TokenMetadata(BiMap<Token, InetAddress> tokenToEndpointMap, Topology topology)
     {
-        if (tokenToEndpointMap == null)
-            tokenToEndpointMap = HashBiMap.create();
         this.tokenToEndpointMap = tokenToEndpointMap;
+        this.topology = topology;
         endpointToHostIdMap = HashBiMap.create();
         sortedTokens = sortTokens();
     }
@@ -113,12 +115,17 @@ public class TokenMetadata
     {
         int n = 0;
         Range<Token> sourceRange = getPrimaryRangeFor(getToken(source));
-        synchronized (bootstrapTokens)
+        lock.readLock().lock();
+        try
         {
             for (Token token : bootstrapTokens.keySet())
                 if (sourceRange.contains(token))
                     n++;
         }
+        finally
+        {
+            lock.readLock().unlock();
+        }
         return n;
     }
 
@@ -161,9 +168,13 @@ public class TokenMetadata
                 if (!endpoint.equals(prev))
                 {
                     if (prev != null)
+                    {
                         logger.warn("Token " + token + " changing ownership from " + prev + " to " + endpoint);
+                        topology.removeEndpoint(prev);
+                    }
                     shouldSortTokens = true;
                 }
+                topology.addEndpoint(endpoint);
                 leavingEndpoints.remove(endpoint);
                 removeFromMoving(endpoint); // also removing this endpoint from moving
             }
@@ -315,6 +326,7 @@ public class TokenMetadata
         {
             bootstrapTokens.inverse().remove(endpoint);
             tokenToEndpointMap.inverse().remove(endpoint);
+            topology.removeEndpoint(endpoint);
             leavingEndpoints.remove(endpoint);
             endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
@@ -431,7 +443,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(HashBiMap.create(tokenToEndpointMap));
+            return new TokenMetadata(HashBiMap.create(tokenToEndpointMap), new Topology(topology));
         }
         finally
         {
@@ -512,15 +524,7 @@ public class TokenMetadata
 
     public ArrayList<Token> sortedTokens()
     {
-        lock.readLock().lock();
-        try
-        {
-            return sortedTokens;
-        }
-        finally
-        {
-            lock.readLock().unlock();
-        }
+        return sortedTokens;
     }
 
     private Multimap<Range<Token>, InetAddress> getPendingRangesMM(String table)
@@ -576,10 +580,18 @@ public class TokenMetadata
         return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
     }
 
-    /** caller should not modify bootstrapTokens */
+    /** @return a copy of the bootstrapping tokens map */
     public Map<Token, InetAddress> getBootstrapTokens()
     {
-        return bootstrapTokens;
+        lock.readLock().lock();
+        try
+        {
+            return ImmutableMap.copyOf(bootstrapTokens);
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
     }
 
     /** caller should not modify leavingEndpoints */
@@ -662,6 +674,7 @@ public class TokenMetadata
     {
         bootstrapTokens.clear();
         tokenToEndpointMap.clear();
+        topology.clear();
         leavingEndpoints.clear();
         pendingRanges.clear();
         endpointToHostIdMap.clear();
@@ -689,17 +702,14 @@ public class TokenMetadata
                 }
             }
 
-            synchronized (bootstrapTokens)
+            if (!bootstrapTokens.isEmpty())
             {
-                if (!bootstrapTokens.isEmpty())
+                sb.append("Bootstrapping Tokens:" );
+                sb.append(System.getProperty("line.separator"));
+                for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
                 {
-                    sb.append("Bootstrapping Tokens:" );
+                    sb.append(entry.getValue() + ":" + entry.getKey());
                     sb.append(System.getProperty("line.separator"));
-                    for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
-                    {
-                        sb.append(entry.getValue() + ":" + entry.getKey());
-                        sb.append(System.getProperty("line.separator"));
-                    }
                 }
             }
 
@@ -821,10 +831,7 @@ public class TokenMetadata
         {
             Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size() + bootstrapTokens.size());
             map.putAll(tokenToEndpointMap);
-            synchronized (bootstrapTokens)
-            {
-                map.putAll(bootstrapTokens);
-            }
+            map.putAll(bootstrapTokens);
             return map;
         }
         finally
@@ -832,4 +839,110 @@ public class TokenMetadata
             lock.readLock().unlock();
         }
     }
+
+    /**
+     * @return the Topology map of nodes to DCs + Racks
+     *
+     * This is only allowed when a copy has been made of TokenMetadata, to avoid concurrent modifications
+     * when Topology methods are subsequently used by the caller.
+     */
+    public Topology getTopology()
+    {
+        assert this != StorageService.instance.getTokenMetadata();
+        return topology;
+    }
+
+    /**
+     * Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
+     * in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.
+     */
+    public static class Topology
+    {
+        /** multi-map of DC to endpoints in that DC */
+        private final Multimap<String, InetAddress> dcEndpoints;
+        /** map of DC to multi-map of rack to endpoints in that rack */
+        private final Map<String, Multimap<String, InetAddress>> dcRacks;
+        /** reverse-lookup map for endpoint to current known dc/rack assignment */
+        private final Map<InetAddress, Pair<String, String>> currentLocations;
+
+        protected Topology()
+        {
+            dcEndpoints = HashMultimap.create();
+            dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
+            currentLocations = new HashMap<InetAddress, Pair<String, String>>();
+        }
+
+        protected void clear()
+        {
+            dcEndpoints.clear();
+            dcRacks.clear();
+            currentLocations.clear();
+        }
+
+        /**
+         * construct deep-copy of other
+         */
+        protected Topology(Topology other)
+        {
+            dcEndpoints = HashMultimap.create(other.dcEndpoints);
+            dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
+            for (String dc : other.dcRacks.keySet())
+                dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
+            currentLocations = new HashMap<InetAddress, Pair<String, String>>(other.currentLocations);
+        }
+
+        /**
+         * Stores current DC/rack assignment for ep
+         */
+        protected void addEndpoint(InetAddress ep)
+        {
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            String dc = snitch.getDatacenter(ep);
+            String rack = snitch.getRack(ep);
+            Pair<String, String> current = currentLocations.get(ep);
+            if (current != null)
+            {
+                if (current.left.equals(dc) && current.right.equals(rack))
+                    return;
+                dcRacks.get(current.left).remove(current.right, ep);
+                dcEndpoints.remove(current.left, ep);
+            }
+
+            dcEndpoints.put(dc, ep);
+
+            if (!dcRacks.containsKey(dc))
+                dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
+            dcRacks.get(dc).put(rack, ep);
+
+            currentLocations.put(ep, new Pair<String, String>(dc, rack));
+        }
+
+        /**
+         * Removes current DC/rack assignment for ep
+         */
+        protected void removeEndpoint(InetAddress ep)
+        {
+            if (!currentLocations.containsKey(ep))
+                return;
+            Pair<String, String> current = currentLocations.remove(ep);
+            dcEndpoints.remove(current.left, ep);
+            dcRacks.get(current.left).remove(current.right, ep);
+        }
+
+        /**
+         * @return multi-map of DC to endpoints in that DC
+         */
+        public Multimap<String, InetAddress> getDatacenterEndpoints()
+        {
+            return dcEndpoints;
+        }
+
+        /**
+         * @return map of DC to multi-map of rack to endpoints in that rack
+         */
+        public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
+        {
+            return dcRacks;
+        }
+    }
 }
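
The new Topology accessor is only meant to be read from a private copy of TokenMetadata; getTopology() asserts that it is not invoked on the live instance owned by StorageService. The following is a minimal usage sketch, not part of this commit: the class name and the printing loop are illustrative only, and it assumes a node where StorageService and the snitch are already initialized.

    import java.net.InetAddress;

    import com.google.common.collect.Multimap;

    import org.apache.cassandra.locator.TokenMetadata;
    import org.apache.cassandra.service.StorageService;

    public class TopologyUsageSketch
    {
        public static void printDatacenters()
        {
            // cloneOnlyTokenMap() copies both the token map and the Topology under
            // the read lock, so the result is a private, stable snapshot.
            TokenMetadata snapshot = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap();

            // Allowed here because 'snapshot' is not the live TokenMetadata.
            TokenMetadata.Topology topology = snapshot.getTopology();

            Multimap<String, InetAddress> dcEndpoints = topology.getDatacenterEndpoints();
            for (String dc : dcEndpoints.keySet())
                System.out.println(dc + " -> " + dcEndpoints.get(dc));
        }
    }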

http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index fba8d44..f72ea4b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1362,7 +1362,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         // all leaving nodes are gone.
         for (Range<Token> range : affectedRanges)
         {
-            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm));
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap()));
             Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
             pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
         }
@@ -1372,17 +1372,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
         // For each of the bootstrapping nodes, simply add and remove them one by one to
         // allLeftMetadata and check in between what their ranges would be.
-        synchronized (bootstrapTokens)
+        for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
         {
-            for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
-            {
-                InetAddress endpoint = entry.getValue();
+            InetAddress endpoint = entry.getValue();
 
-                allLeftMetadata.updateNormalToken(entry.getKey(), endpoint);
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                    pendingRanges.put(range, endpoint);
-                allLeftMetadata.removeEndpoint(endpoint);
-            }
+            allLeftMetadata.updateNormalToken(entry.getKey(), endpoint);
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+                pendingRanges.put(range, endpoint);
+            allLeftMetadata.removeEndpoint(endpoint);
         }
 
         // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
@@ -1421,7 +1418,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     private Multimap<InetAddress, Range<Token>> getNewSourceRanges(String table, Set<Range<Token>> ranges)
     {
         InetAddress myAddress = FBUtilities.getBroadcastAddress();
-        Multimap<Range<Token>, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata);
+        Multimap<Range<Token>, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap());
         Multimap<InetAddress, Range<Token>> sourceRanges = HashMultimap.create();
         IFailureDetector failureDetector = FailureDetector.instance;
 
@@ -1551,7 +1548,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
         // Find (for each range) all nodes that store replicas for these ranges as well
         for (Range<Token> range : ranges)
-            currentReplicaEndpoints.put(range, Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right, tokenMetadata));
+            currentReplicaEndpoints.put(range, Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right, tokenMetadata.cloneOnlyTokenMap()));
 
         TokenMetadata temp = tokenMetadata.cloneAfterAllLeft();
 
@@ -2373,7 +2370,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Map<String, Multimap<InetAddress, Range<Token>>> rangesToFetch = new HashMap<String, Multimap<InetAddress, Range<Token>>>();
         Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable = new HashMap<String, Multimap<Range<Token>, InetAddress>>();
 
-        TokenMetadata tokenMetaClone = tokenMetadata.cloneAfterAllSettled();
+        TokenMetadata tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled();
+        // clone to avoid concurrent modification in calculateNaturalEndpoints
+        TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
 
         // for each of the non system tables calculating new ranges
         // which current node will handle after move to the new token
@@ -2389,7 +2388,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
             // ring ranges and endpoints associated with them
             // this used to determine what nodes should we ping about range data
-            Multimap<Range<Token>, InetAddress> rangeAddresses = strategy.getRangeAddresses(tokenMetadata);
+            Multimap<Range<Token>, InetAddress> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
 
             // calculated parts of the ranges to request/stream from/to nodes in the ring
             Pair<Set<Range<Token>>, Set<Range<Token>>> rangesPerTable = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
@@ -2418,8 +2417,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
             for (Range<Token> toStream : rangesPerTable.left)
             {
-                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetadata));
-                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone));
+                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone));
+                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaCloneAllSettled));
                 rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints, currentEndpoints));
             }
 

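Several call sites above now pass a snapshot from cloneOnlyTokenMap() to the replication strategy instead of the live TokenMetadata, so calculateNaturalEndpoints() and getRangeAddresses() work against data that cannot change underneath them. A minimal sketch of that pattern follows; it is not part of this commit, and the "Keyspace1" name is a placeholder.

    import java.net.InetAddress;
    import java.util.List;

    import org.apache.cassandra.db.Table;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.locator.AbstractReplicationStrategy;
    import org.apache.cassandra.locator.TokenMetadata;
    import org.apache.cassandra.service.StorageService;

    public class SnapshotUsageSketch
    {
        public static List<InetAddress> naturalEndpointsFor(Token token)
        {
            // "Keyspace1" is a placeholder keyspace name for this sketch.
            AbstractReplicationStrategy strategy = Table.open("Keyspace1").getReplicationStrategy();

            // Clone once, outside any loop; subsequent reads hit the immutable copy
            // rather than taking the shared TokenMetadata read lock per call.
            TokenMetadata snapshot = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap();

            return strategy.calculateNaturalEndpoints(token, snapshot);
        }
    }
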
http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 17146f4..c373fa8 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -185,7 +185,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> expected = new HashSet<InetAddress>();
         for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
         {
-            expected.addAll(ars.getRangeAddresses(tmd).get(replicaRange));
+            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
         }
         expected.remove(FBUtilities.getBroadcastAddress());
         Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(tablename);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 24df493..fc3d8a6 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -115,7 +115,7 @@ public class LeaveAndBootstrapTest
             {
                 int replicationFactor = strategy.getReplicationFactor();
 
-                HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd)));
+                HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap())));
                 HashSet<InetAddress> expected = new HashSet<InetAddress>();
 
                 for (int i = 0; i < replicationFactor; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/893d1da9/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index 38dc42a..42f20fc 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -123,7 +123,7 @@ public class MoveTest
             {
                 int replicationFactor = strategy.getReplicationFactor();
 
-                HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd)));
+                HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap())));
                 HashSet<InetAddress> expected = new HashSet<InetAddress>();
 
                 for (int i = 0; i < replicationFactor; i++)
