Fix timeout in ReplicationAwareTokenAllocatorTest

patch by Stefania Alborghetti; reviewed by Branimir Lambov for CASSANDRA-12784


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6ec31ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6ec31ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6ec31ba

Branch: refs/heads/trunk
Commit: c6ec31bada33b9b803d09a863414ea9cad72752e
Parents: 7a5118c
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Thu Oct 13 16:32:58 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Tue Oct 18 09:08:56 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/dht/RandomPartitioner.java |   2 +-
 .../apache/cassandra/utils/GuidGenerator.java   |   5 +-
 ...tractReplicationAwareTokenAllocatorTest.java | 716 ++++++++++++++++++
 ...rmur3ReplicationAwareTokenAllocatorTest.java |  48 ++
 ...andomReplicationAwareTokenAllocatorTest.java |  54 ++
 .../ReplicationAwareTokenAllocatorTest.java     | 755 -------------------
 test/unit/org/apache/cassandra/Util.java        |  20 +-
 8 files changed, 835 insertions(+), 766 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d230462..32a2dfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
  * Improve sum aggregate functions (CASSANDRA-12417)
  * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes 
in CASSANDRA-10876 (CASSANDRA-12761)
  * cqlsh fails to format collections when using aliases (CASSANDRA-11534)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java 
b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 7c8f6ac..ee3b862 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -93,7 +93,7 @@ public class RandomPartitioner implements IPartitioner
 
     public BigIntegerToken getRandomToken(Random random)
     {
-        BigInteger token = 
FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes(random));
+        BigInteger token = 
FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes(random, 0));
         if ( token.signum() == -1 )
             token = token.multiply(BigInteger.valueOf(-1L));
         return new BigIntegerToken(token);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/src/java/org/apache/cassandra/utils/GuidGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/GuidGenerator.java 
b/src/java/org/apache/cassandra/utils/GuidGenerator.java
index 0843344..c5ed7a7 100644
--- a/src/java/org/apache/cassandra/utils/GuidGenerator.java
+++ b/src/java/org/apache/cassandra/utils/GuidGenerator.java
@@ -76,10 +76,9 @@ public class GuidGenerator
         return convertToStandardFormat( sb.toString() );
     }
 
-    public static ByteBuffer guidAsBytes(Random random)
+    public static ByteBuffer guidAsBytes(Random random, long time)
     {
         StringBuilder sbValueBeforeMD5 = new StringBuilder();
-        long time = System.currentTimeMillis();
         long rand = 0;
         rand = random.nextLong();
         sbValueBeforeMD5.append(s_id)
@@ -94,7 +93,7 @@ public class GuidGenerator
 
     public static ByteBuffer guidAsBytes()
     {
-        return guidAsBytes(myRand);
+        return guidAsBytes(myRand, System.currentTimeMillis());
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
 
b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
new file mode 100644
index 0000000..80e980a
--- /dev/null
+++ 
b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Base class for {@link Murmur3ReplicationAwareTokenAllocatorTest} and {@link 
RandomReplicationAwareTokenAllocatorTest},
+ * we need to separate classes to avoid timeous in case flaky tests need to be 
repeated, see CASSANDRA-12784.
+ */
+@Ignore
+abstract class AbstractReplicationAwareTokenAllocatorTest
+{
+    private static final int TARGET_CLUSTER_SIZE = 250;
+
+    interface TestReplicationStrategy extends ReplicationStrategy<Unit>
+    {
+        void addUnit(Unit n);
+
+        void removeUnit(Unit n);
+
+        /**
+         * Returns a list of all replica units for given token.
+         */
+        List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens);
+
+        /**
+         * Returns the start of the token span that is replicated in this 
token.
+         * Note: Though this is not trivial to see, the replicated span is 
always contiguous. A token in the same
+         * group acts as a barrier; if one is not found the token replicates 
everything up to the replica'th distinct
+         * group seen in front of it.
+         */
+        Token replicationStart(Token token, Unit unit, NavigableMap<Token, 
Unit> sortedTokens);
+
+        /**
+         * Multiplier for the acceptable disbalance in the cluster. With some 
strategies it is harder to achieve good
+         * results.
+         */
+        public double spreadExpectation();
+    }
+
+    static class NoReplicationStrategy implements TestReplicationStrategy
+    {
+        public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens)
+        {
+            return 
Collections.singletonList(sortedTokens.ceilingEntry(token).getValue());
+        }
+
+        public Token replicationStart(Token token, Unit unit, 
NavigableMap<Token, Unit> sortedTokens)
+        {
+            return sortedTokens.lowerKey(token);
+        }
+
+        public String toString()
+        {
+            return "No replication";
+        }
+
+        public void addUnit(Unit n)
+        {
+        }
+
+        public void removeUnit(Unit n)
+        {
+        }
+
+        public int replicas()
+        {
+            return 1;
+        }
+
+        public boolean sameGroup(Unit n1, Unit n2)
+        {
+            return false;
+        }
+
+        public Object getGroup(Unit unit)
+        {
+            return unit;
+        }
+
+        public double spreadExpectation()
+        {
+            return 1;
+        }
+    }
+
+    static class SimpleReplicationStrategy implements TestReplicationStrategy
+    {
+        int replicas;
+
+        public SimpleReplicationStrategy(int replicas)
+        {
+            super();
+            this.replicas = replicas;
+        }
+
+        public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens)
+        {
+            List<Unit> endpoints = new ArrayList<Unit>(replicas);
+
+            token = sortedTokens.ceilingKey(token);
+            if (token == null)
+                token = sortedTokens.firstKey();
+            Iterator<Unit> iter = Iterables.concat(sortedTokens.tailMap(token, 
true).values(), sortedTokens.values()).iterator();
+            while (endpoints.size() < replicas)
+            {
+                if (!iter.hasNext())
+                    return endpoints;
+                Unit ep = iter.next();
+                if (!endpoints.contains(ep))
+                    endpoints.add(ep);
+            }
+            return endpoints;
+        }
+
+        public Token replicationStart(Token token, Unit unit, 
NavigableMap<Token, Unit> sortedTokens)
+        {
+            Set<Unit> seenUnits = Sets.newHashSet();
+            int unitsFound = 0;
+
+            for (Map.Entry<Token, Unit> en : Iterables.concat(
+                                                             
sortedTokens.headMap(token, false).descendingMap().entrySet(),
+                                                             
sortedTokens.descendingMap().entrySet()))
+            {
+                Unit n = en.getValue();
+                // Same group as investigated unit is a break; anything that 
could replicate in it replicates there.
+                if (n == unit)
+                    break;
+
+                if (seenUnits.add(n))
+                {
+                    if (++unitsFound == replicas)
+                        break;
+                }
+                token = en.getKey();
+            }
+            return token;
+        }
+
+        public void addUnit(Unit n)
+        {
+        }
+
+        public void removeUnit(Unit n)
+        {
+        }
+
+        public String toString()
+        {
+            return String.format("Simple %d replicas", replicas);
+        }
+
+        public int replicas()
+        {
+            return replicas;
+        }
+
+        public boolean sameGroup(Unit n1, Unit n2)
+        {
+            return false;
+        }
+
+        public Unit getGroup(Unit unit)
+        {
+            // The unit is the group.
+            return unit;
+        }
+
+        public double spreadExpectation()
+        {
+            return 1;
+        }
+    }
+
+    static abstract class GroupReplicationStrategy implements 
TestReplicationStrategy
+    {
+        final int replicas;
+        final Map<Unit, Integer> groupMap;
+
+        public GroupReplicationStrategy(int replicas)
+        {
+            this.replicas = replicas;
+            this.groupMap = Maps.newHashMap();
+        }
+
+        public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens)
+        {
+            List<Unit> endpoints = new ArrayList<Unit>(replicas);
+            BitSet usedGroups = new BitSet();
+
+            if (sortedTokens.isEmpty())
+                return endpoints;
+
+            token = sortedTokens.ceilingKey(token);
+            if (token == null)
+                token = sortedTokens.firstKey();
+            Iterator<Unit> iter = Iterables.concat(sortedTokens.tailMap(token, 
true).values(), sortedTokens.values()).iterator();
+            while (endpoints.size() < replicas)
+            {
+                // For simlicity assuming list can't be exhausted before 
finding all replicas.
+                Unit ep = iter.next();
+                int group = groupMap.get(ep);
+                if (!usedGroups.get(group))
+                {
+                    endpoints.add(ep);
+                    usedGroups.set(group);
+                }
+            }
+            return endpoints;
+        }
+
+        public Token lastReplicaToken(Token token, NavigableMap<Token, Unit> 
sortedTokens)
+        {
+            BitSet usedGroups = new BitSet();
+            int groupsFound = 0;
+
+            token = sortedTokens.ceilingKey(token);
+            if (token == null)
+                token = sortedTokens.firstKey();
+            for (Map.Entry<Token, Unit> en :
+            Iterables.concat(sortedTokens.tailMap(token, true).entrySet(),
+                             sortedTokens.entrySet()))
+            {
+                Unit ep = en.getValue();
+                int group = groupMap.get(ep);
+                if (!usedGroups.get(group))
+                {
+                    usedGroups.set(group);
+                    if (++groupsFound >= replicas)
+                        return en.getKey();
+                }
+            }
+            return token;
+        }
+
+        public Token replicationStart(Token token, Unit unit, 
NavigableMap<Token, Unit> sortedTokens)
+        {
+            // replicated ownership
+            int unitGroup = groupMap.get(unit);   // unit must be already added
+            BitSet seenGroups = new BitSet();
+            int groupsFound = 0;
+
+            for (Map.Entry<Token, Unit> en : Iterables.concat(
+                                                             
sortedTokens.headMap(token, false).descendingMap().entrySet(),
+                                                             
sortedTokens.descendingMap().entrySet()))
+            {
+                Unit n = en.getValue();
+                int ngroup = groupMap.get(n);
+                // Same group as investigated unit is a break; anything that 
could replicate in it replicates there.
+                if (ngroup == unitGroup)
+                    break;
+
+                if (!seenGroups.get(ngroup))
+                {
+                    if (++groupsFound == replicas)
+                        break;
+                    seenGroups.set(ngroup);
+                }
+                token = en.getKey();
+            }
+            return token;
+        }
+
+        public String toString()
+        {
+            Map<Integer, Integer> idToSize = instanceToCount(groupMap);
+            Map<Integer, Integer> sizeToCount = Maps.newTreeMap();
+            sizeToCount.putAll(instanceToCount(idToSize));
+            return String.format("%s strategy, %d replicas, group size to 
count %s", getClass().getSimpleName(), replicas, sizeToCount);
+        }
+
+        @Override
+        public int replicas()
+        {
+            return replicas;
+        }
+
+        public boolean sameGroup(Unit n1, Unit n2)
+        {
+            return groupMap.get(n1).equals(groupMap.get(n2));
+        }
+
+        public void removeUnit(Unit n)
+        {
+            groupMap.remove(n);
+        }
+
+        public Integer getGroup(Unit unit)
+        {
+            return groupMap.get(unit);
+        }
+
+        public double spreadExpectation()
+        {
+            return 1.5;   // Even balanced racks get disbalanced when they 
lose nodes.
+        }
+    }
+
+    private static <T> Map<T, Integer> instanceToCount(Map<?, T> map)
+    {
+        Map<T, Integer> idToCount = Maps.newHashMap();
+        for (Map.Entry<?, T> en : map.entrySet())
+        {
+            Integer old = idToCount.get(en.getValue());
+            idToCount.put(en.getValue(), old != null ? old + 1 : 1);
+        }
+        return idToCount;
+    }
+
+    /**
+     * Group strategy spreading units into a fixed number of groups.
+     */
+    static class FixedGroupCountReplicationStrategy extends 
GroupReplicationStrategy
+    {
+        int groupId;
+        int groupCount;
+
+        public FixedGroupCountReplicationStrategy(int replicas, int groupCount)
+        {
+            super(replicas);
+            assert groupCount >= replicas;
+            groupId = 0;
+            this.groupCount = groupCount;
+        }
+
+        public void addUnit(Unit n)
+        {
+            groupMap.put(n, groupId++ % groupCount);
+        }
+    }
+
+    /**
+     * Group strategy with a fixed number of units per group.
+     */
+    static class BalancedGroupReplicationStrategy extends 
GroupReplicationStrategy
+    {
+        int groupId;
+        int groupSize;
+
+        public BalancedGroupReplicationStrategy(int replicas, int groupSize)
+        {
+            super(replicas);
+            groupId = 0;
+            this.groupSize = groupSize;
+        }
+
+        public void addUnit(Unit n)
+        {
+            groupMap.put(n, groupId++ / groupSize);
+        }
+    }
+
+    static class UnbalancedGroupReplicationStrategy extends 
GroupReplicationStrategy
+    {
+        int groupId;
+        int nextSize;
+        int num;
+        int minGroupSize;
+        int maxGroupSize;
+        Random rand;
+
+        public UnbalancedGroupReplicationStrategy(int replicas, int 
minGroupSize, int maxGroupSize, Random rand)
+        {
+            super(replicas);
+            groupId = -1;
+            nextSize = 0;
+            num = 0;
+            this.maxGroupSize = maxGroupSize;
+            this.minGroupSize = minGroupSize;
+            this.rand = rand;
+        }
+
+        public void addUnit(Unit n)
+        {
+            if (++num > nextSize)
+            {
+                nextSize = minGroupSize + rand.nextInt(maxGroupSize - 
minGroupSize + 1);
+                ++groupId;
+                num = 0;
+            }
+            groupMap.put(n, groupId);
+        }
+
+        public double spreadExpectation()
+        {
+            return 2;
+        }
+    }
+
+    static Map<Unit, Double> 
evaluateReplicatedOwnership(ReplicationAwareTokenAllocator<Unit> t)
+    {
+        Map<Unit, Double> ownership = Maps.newHashMap();
+        Iterator<Token> it = t.sortedTokens.keySet().iterator();
+        if (!it.hasNext())
+            return ownership;
+
+        Token current = it.next();
+        while (it.hasNext())
+        {
+            Token next = it.next();
+            addOwnership(t, current, next, ownership);
+            current = next;
+        }
+        addOwnership(t, current, t.sortedTokens.firstKey(), ownership);
+
+        return ownership;
+    }
+
+    private static void addOwnership(ReplicationAwareTokenAllocator<Unit> t, 
Token current, Token next, Map<Unit, Double> ownership)
+    {
+        TestReplicationStrategy ts = (TestReplicationStrategy) t.strategy;
+        double size = current.size(next);
+        Token representative = t.partitioner.midpoint(current, next);
+        for (Unit n : ts.getReplicas(representative, t.sortedTokens))
+        {
+            Double v = ownership.get(n);
+            ownership.put(n, v != null ? v + size : size);
+        }
+    }
+
+    private static double replicatedTokenOwnership(Token token, 
NavigableMap<Token, Unit> sortedTokens, ReplicationStrategy<Unit> strategy)
+    {
+        TestReplicationStrategy ts = (TestReplicationStrategy) strategy;
+        Token next = sortedTokens.higherKey(token);
+        if (next == null)
+            next = sortedTokens.firstKey();
+        return ts.replicationStart(token, sortedTokens.get(token), 
sortedTokens).size(next);
+    }
+
+    static interface TokenCount
+    {
+        int tokenCount(int perUnitCount, Random rand);
+
+        double spreadExpectation();
+    }
+
+    static TokenCount fixedTokenCount = new TokenCount()
+    {
+        public int tokenCount(int perUnitCount, Random rand)
+        {
+            return perUnitCount;
+        }
+
+        public double spreadExpectation()
+        {
+            return 4;  // High tolerance to avoid flakiness.
+        }
+    };
+
+    static TokenCount varyingTokenCount = new TokenCount()
+    {
+        public int tokenCount(int perUnitCount, Random rand)
+        {
+            if (perUnitCount == 1) return 1;
+            // 25 to 175%
+            return rand.nextInt(perUnitCount * 3 / 2) + (perUnitCount + 3) / 4;
+        }
+
+        public double spreadExpectation()
+        {
+            return 8;  // High tolerance to avoid flakiness.
+        }
+    };
+
+    Random seededRand = new Random(2);
+
+    private void random(Map<Token, Unit> map, TestReplicationStrategy rs,
+                        int unitCount, TokenCount tc, int perUnitCount, 
IPartitioner partitioner)
+    {
+        System.out.format("\nRandom generation of %d units with %d tokens 
each\n", unitCount, perUnitCount);
+        Random rand = seededRand;
+        for (int i = 0; i < unitCount; i++)
+        {
+            Unit unit = new Unit();
+            rs.addUnit(unit);
+            int tokens = tc.tokenCount(perUnitCount, rand);
+            for (int j = 0; j < tokens; j++)
+            {
+                map.put(partitioner.getRandomToken(rand), unit);
+            }
+        }
+    }
+
+    protected void testExistingCluster(IPartitioner partitioner, int 
maxVNodeCount)
+    {
+        for (int rf = 1; rf <= 5; ++rf)
+        {
+            for (int perUnitCount = 1; perUnitCount <= maxVNodeCount; 
perUnitCount *= 4)
+            {
+                testExistingCluster(perUnitCount, fixedTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
+                testExistingCluster(perUnitCount, varyingTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
+                if (rf == 1) continue;  // Replication strategy doesn't matter 
for RF = 1.
+                for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 4 
< TARGET_CLUSTER_SIZE; groupSize *= 4)
+                {
+                    testExistingCluster(perUnitCount, fixedTokenCount,
+                                        new 
BalancedGroupReplicationStrategy(rf, groupSize), partitioner);
+                    testExistingCluster(perUnitCount, varyingTokenCount,
+                                        new 
UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, 
seededRand),
+                                        partitioner);
+                }
+                testExistingCluster(perUnitCount, fixedTokenCount,
+                                    new FixedGroupCountReplicationStrategy(rf, 
rf * 2), partitioner);
+            }
+        }
+    }
+
+    private void testExistingCluster(int perUnitCount, TokenCount tc, 
TestReplicationStrategy rs, IPartitioner partitioner)
+    {
+        System.out.println("Testing existing cluster, target " + perUnitCount 
+ " vnodes, replication " + rs);
+        final int targetClusterSize = TARGET_CLUSTER_SIZE;
+        NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap();
+
+        random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, 
partitioner);
+
+        ReplicationAwareTokenAllocator<Unit> t = new 
ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner);
+        grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false);
+        grow(t, targetClusterSize, tc, perUnitCount, true);
+        loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, 
partitioner);
+        System.out.println();
+    }
+
+    protected void testNewCluster(IPartitioner partitioner, int maxVNodeCount)
+    {
+        // This test is flaky because the selection of the tokens for the 
first RF nodes (which is random, with an
+        // uncontrolled seed) can sometimes cause a pathological situation 
where the algorithm will find a (close to)
+        // ideal distribution of tokens for some number of nodes, which in 
turn will inevitably cause it to go into a
+        // bad (unacceptable to the test criteria) distribution after adding 
one more node.
+
+        // This should happen very rarely, unless something is broken in the 
token allocation code.
+
+        for (int rf = 2; rf <= 5; ++rf)
+        {
+            for (int perUnitCount = 1; perUnitCount <= maxVNodeCount; 
perUnitCount *= 4)
+            {
+                testNewCluster(perUnitCount, fixedTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
+                testNewCluster(perUnitCount, varyingTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
+                if (rf == 1) continue;  // Replication strategy doesn't matter 
for RF = 1.
+                for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 8 
< TARGET_CLUSTER_SIZE; groupSize *= 4)
+                {
+                    testNewCluster(perUnitCount, fixedTokenCount,
+                                   new BalancedGroupReplicationStrategy(rf, 
groupSize), partitioner);
+                    testNewCluster(perUnitCount, varyingTokenCount,
+                                   new UnbalancedGroupReplicationStrategy(rf, 
groupSize / 2, groupSize * 2, seededRand),
+                                   partitioner);
+                }
+                testNewCluster(perUnitCount, fixedTokenCount,
+                               new FixedGroupCountReplicationStrategy(rf, rf * 
2), partitioner);
+            }
+        }
+    }
+
+    private void testNewCluster(int perUnitCount, TokenCount tc, 
TestReplicationStrategy rs, IPartitioner partitioner)
+    {
+        System.out.println("Testing new cluster, target " + perUnitCount + " 
vnodes, replication " + rs);
+        final int targetClusterSize = TARGET_CLUSTER_SIZE;
+        NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap();
+
+        ReplicationAwareTokenAllocator<Unit> t = new 
ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner);
+        grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false);
+        grow(t, targetClusterSize, tc, perUnitCount, true);
+        loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount, 
partitioner);
+        System.out.println();
+    }
+
+    private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int 
howMany,
+                                TokenCount tc, int perUnitCount, IPartitioner 
partitioner)
+    {
+        int fullCount = t.unitCount();
+        System.out.format("Losing %d units. ", howMany);
+        for (int i = 0; i < howMany; ++i)
+        {
+            Unit u = t.unitFor(partitioner.getRandomToken(seededRand));
+            t.removeUnit(u);
+            ((TestReplicationStrategy) t.strategy).removeUnit(u);
+        }
+        // Grow half without verifying.
+        grow(t, (t.unitCount() + fullCount * 3) / 4, tc, perUnitCount, false);
+        // Metrics should be back to normal by now. Check that they remain so.
+        grow(t, fullCount, tc, perUnitCount, true);
+    }
+
+    static class Summary
+    {
+        double min = 1;
+        double max = 1;
+        double stddev = 0;
+
+        void update(SummaryStatistics stat)
+        {
+            min = Math.min(min, stat.getMin());
+            max = Math.max(max, stat.getMax());
+            stddev = Math.max(stddev, stat.getStandardDeviation());
+        }
+
+        public String toString()
+        {
+            return String.format("max %.2f min %.2f stddev %.4f", max, min, 
stddev);
+        }
+    }
+
+    public void grow(ReplicationAwareTokenAllocator<Unit> t, int 
targetClusterSize, TokenCount tc, int perUnitCount, boolean verifyMetrics)
+    {
+        int size = t.unitCount();
+        Summary su = new Summary();
+        Summary st = new Summary();
+        Random rand = new Random(targetClusterSize + perUnitCount);
+        TestReplicationStrategy strategy = (TestReplicationStrategy) 
t.strategy;
+        if (size < targetClusterSize)
+        {
+            System.out.format("Adding %d unit(s) using %s...", 
targetClusterSize - size, t.toString());
+            long time = System.currentTimeMillis();
+            while (size < targetClusterSize)
+            {
+                int tokens = tc.tokenCount(perUnitCount, rand);
+                Unit unit = new Unit();
+                strategy.addUnit(unit);
+                t.addUnit(unit, tokens);
+                ++size;
+                if (verifyMetrics)
+                    updateSummary(t, su, st, false);
+            }
+            System.out.format(" Done in %.3fs\n", (System.currentTimeMillis() 
- time) / 1000.0);
+            if (verifyMetrics)
+            {
+                updateSummary(t, su, st, true);
+                double maxExpected = 1.0 + tc.spreadExpectation() * 
strategy.spreadExpectation() / (perUnitCount * t.replicas);
+                if (su.max > maxExpected)
+                {
+                    Assert.fail(String.format("Expected max unit size below 
%.4f, was %.4f", maxExpected, su.max));
+                }
+                // We can't verify lower side range as small loads can't 
always be fixed.
+            }
+        }
+    }
+
+
+    private void updateSummary(ReplicationAwareTokenAllocator<Unit> t, Summary 
su, Summary st, boolean print)
+    {
+        int size = t.sortedTokens.size();
+        double inverseAverage = 1.0 * size / t.strategy.replicas();
+
+        Map<Unit, Double> ownership = evaluateReplicatedOwnership(t);
+        SummaryStatistics unitStat = new SummaryStatistics();
+        for (Map.Entry<Unit, Double> en : ownership.entrySet())
+            unitStat.addValue(en.getValue() * inverseAverage / 
t.unitToTokens.get(en.getKey()).size());
+        su.update(unitStat);
+
+        SummaryStatistics tokenStat = new SummaryStatistics();
+        for (Token tok : t.sortedTokens.keySet())
+            tokenStat.addValue(replicatedTokenOwnership(tok, t.sortedTokens, 
t.strategy) * inverseAverage);
+        st.update(tokenStat);
+
+        if (print)
+        {
+            System.out.format("Size %d(%d)   \tunit %s  token %s   %s\n",
+                              t.unitCount(), size,
+                              mms(unitStat),
+                              mms(tokenStat),
+                              t.strategy);
+            System.out.format("Worst intermediate unit\t%s  token %s\n", su, 
st);
+        }
+    }
+
+
+    private static String mms(SummaryStatistics s)
+    {
+        return String.format("max %.2f min %.2f stddev %.4f", s.getMax(), 
s.getMin(), s.getStandardDeviation());
+    }
+
+
+    int nextUnitId = 0;
+
+    final class Unit implements Comparable<Unit>
+    {
+        int unitId = nextUnitId++;
+
+        public String toString()
+        {
+            return Integer.toString(unitId);
+        }
+
+        @Override
+        public int compareTo(Unit o)
+        {
+            return Integer.compare(unitId, o.unitId);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/long/org/apache/cassandra/dht/tokenallocator/Murmur3ReplicationAwareTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/dht/tokenallocator/Murmur3ReplicationAwareTokenAllocatorTest.java
 
b/test/long/org/apache/cassandra/dht/tokenallocator/Murmur3ReplicationAwareTokenAllocatorTest.java
new file mode 100644
index 0000000..e28ecfa
--- /dev/null
+++ 
b/test/long/org/apache/cassandra/dht/tokenallocator/Murmur3ReplicationAwareTokenAllocatorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht.tokenallocator;
+
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+
+public class Murmur3ReplicationAwareTokenAllocatorTest extends 
AbstractReplicationAwareTokenAllocatorTest
+{
+    private static final int MAX_VNODE_COUNT = 64;
+
+    @Test
+    public void testExistingCluster()
+    {
+        super.testExistingCluster(new Murmur3Partitioner(), MAX_VNODE_COUNT);
+    }
+
+    @Test
+    public void testNewCluster()
+    {
+        Util.flakyTest(this::flakyTestNewCluster,
+                       2,
+                       "It tends to fail sometimes due to the random selection 
of the tokens in the first few nodes.");
+    }
+
+    private void flakyTestNewCluster()
+    {
+        testNewCluster(new Murmur3Partitioner(), MAX_VNODE_COUNT);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java
 
b/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java
new file mode 100644
index 0000000..bd94442
--- /dev/null
+++ 
b/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht.tokenallocator;
+
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.dht.RandomPartitioner;
+
+public class RandomReplicationAwareTokenAllocatorTest extends 
AbstractReplicationAwareTokenAllocatorTest
+{
+    /** The maximum number of vnodes to use in the tests.
+     *  For RandomPartitioner we use a smaller number because
+     *  the tests take much longer and would otherwise timeout,
+     *  see CASSANDRA-12784.
+     * */
+    private static final int MAX_VNODE_COUNT = 16;
+
+    @Test
+    public void testExistingCluster()
+    {
+        testExistingCluster(new RandomPartitioner(), MAX_VNODE_COUNT);
+    }
+
+    @Test
+    public void testNewClusterr()
+    {
+        Util.flakyTest(this::flakyTestNewCluster,
+                       3,
+                       "It tends to fail sometimes due to the random selection 
of the tokens in the first few nodes.");
+    }
+
+    private void flakyTestNewCluster()
+    {
+        testNewCluster(new RandomPartitioner(), MAX_VNODE_COUNT);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
 
b/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
deleted file mode 100644
index 482e2ac..0000000
--- 
a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
+++ /dev/null
@@ -1,755 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.dht.tokenallocator;
-
-import java.util.*;
-
-import junit.framework.Assert;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
-
-import org.junit.Test;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.Token;
-
-public class ReplicationAwareTokenAllocatorTest
-{
-    private static final int MAX_VNODE_COUNT = 64;
-
-    private static final int TARGET_CLUSTER_SIZE = 250;
-
-    interface TestReplicationStrategy extends ReplicationStrategy<Unit>
-    {
-        void addUnit(Unit n);
-
-        void removeUnit(Unit n);
-
-        /**
-         * Returns a list of all replica units for given token.
-         */
-        List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens);
-
-        /**
-         * Returns the start of the token span that is replicated in this 
token.
-         * Note: Though this is not trivial to see, the replicated span is 
always contiguous. A token in the same
-         * group acts as a barrier; if one is not found the token replicates 
everything up to the replica'th distinct
-         * group seen in front of it.
-         */
-        Token replicationStart(Token token, Unit unit, NavigableMap<Token, 
Unit> sortedTokens);
-
-        /**
-         * Multiplier for the acceptable disbalance in the cluster. With some 
strategies it is harder to achieve good
-         * results.
-         */
-        public double spreadExpectation();
-    }
-
-    static class NoReplicationStrategy implements TestReplicationStrategy
-    {
-        public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens)
-        {
-            return 
Collections.singletonList(sortedTokens.ceilingEntry(token).getValue());
-        }
-
-        public Token replicationStart(Token token, Unit unit, 
NavigableMap<Token, Unit> sortedTokens)
-        {
-            return sortedTokens.lowerKey(token);
-        }
-
-        public String toString()
-        {
-            return "No replication";
-        }
-
-        public void addUnit(Unit n)
-        {
-        }
-
-        public void removeUnit(Unit n)
-        {
-        }
-
-        public int replicas()
-        {
-            return 1;
-        }
-
-        public boolean sameGroup(Unit n1, Unit n2)
-        {
-            return false;
-        }
-
-        public Object getGroup(Unit unit)
-        {
-            return unit;
-        }
-
-        public double spreadExpectation()
-        {
-            return 1;
-        }
-    }
-
-    static class SimpleReplicationStrategy implements TestReplicationStrategy
-    {
-        int replicas;
-
-        public SimpleReplicationStrategy(int replicas)
-        {
-            super();
-            this.replicas = replicas;
-        }
-
-        public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens)
-        {
-            List<Unit> endpoints = new ArrayList<Unit>(replicas);
-
-            token = sortedTokens.ceilingKey(token);
-            if (token == null)
-                token = sortedTokens.firstKey();
-            Iterator<Unit> iter = Iterables.concat(sortedTokens.tailMap(token, 
true).values(), sortedTokens.values()).iterator();
-            while (endpoints.size() < replicas)
-            {
-                if (!iter.hasNext())
-                    return endpoints;
-                Unit ep = iter.next();
-                if (!endpoints.contains(ep))
-                    endpoints.add(ep);
-            }
-            return endpoints;
-        }
-
-        public Token replicationStart(Token token, Unit unit, 
NavigableMap<Token, Unit> sortedTokens)
-        {
-            Set<Unit> seenUnits = Sets.newHashSet();
-            int unitsFound = 0;
-
-            for (Map.Entry<Token, Unit> en : Iterables.concat(
-                                                             
sortedTokens.headMap(token, false).descendingMap().entrySet(),
-                                                             
sortedTokens.descendingMap().entrySet()))
-            {
-                Unit n = en.getValue();
-                // Same group as investigated unit is a break; anything that 
could replicate in it replicates there.
-                if (n == unit)
-                    break;
-
-                if (seenUnits.add(n))
-                {
-                    if (++unitsFound == replicas)
-                        break;
-                }
-                token = en.getKey();
-            }
-            return token;
-        }
-
-        public void addUnit(Unit n)
-        {
-        }
-
-        public void removeUnit(Unit n)
-        {
-        }
-
-        public String toString()
-        {
-            return String.format("Simple %d replicas", replicas);
-        }
-
-        public int replicas()
-        {
-            return replicas;
-        }
-
-        public boolean sameGroup(Unit n1, Unit n2)
-        {
-            return false;
-        }
-
-        public Unit getGroup(Unit unit)
-        {
-            // The unit is the group.
-            return unit;
-        }
-
-        public double spreadExpectation()
-        {
-            return 1;
-        }
-    }
-
-    static abstract class GroupReplicationStrategy implements 
TestReplicationStrategy
-    {
-        final int replicas;
-        final Map<Unit, Integer> groupMap;
-
-        public GroupReplicationStrategy(int replicas)
-        {
-            this.replicas = replicas;
-            this.groupMap = Maps.newHashMap();
-        }
-
-        public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens)
-        {
-            List<Unit> endpoints = new ArrayList<Unit>(replicas);
-            BitSet usedGroups = new BitSet();
-
-            if (sortedTokens.isEmpty())
-                return endpoints;
-
-            token = sortedTokens.ceilingKey(token);
-            if (token == null)
-                token = sortedTokens.firstKey();
-            Iterator<Unit> iter = Iterables.concat(sortedTokens.tailMap(token, 
true).values(), sortedTokens.values()).iterator();
-            while (endpoints.size() < replicas)
-            {
-                // For simlicity assuming list can't be exhausted before 
finding all replicas.
-                Unit ep = iter.next();
-                int group = groupMap.get(ep);
-                if (!usedGroups.get(group))
-                {
-                    endpoints.add(ep);
-                    usedGroups.set(group);
-                }
-            }
-            return endpoints;
-        }
-
-        public Token lastReplicaToken(Token token, NavigableMap<Token, Unit> 
sortedTokens)
-        {
-            BitSet usedGroups = new BitSet();
-            int groupsFound = 0;
-
-            token = sortedTokens.ceilingKey(token);
-            if (token == null)
-                token = sortedTokens.firstKey();
-            for (Map.Entry<Token, Unit> en :
-            Iterables.concat(sortedTokens.tailMap(token, true).entrySet(),
-                             sortedTokens.entrySet()))
-            {
-                Unit ep = en.getValue();
-                int group = groupMap.get(ep);
-                if (!usedGroups.get(group))
-                {
-                    usedGroups.set(group);
-                    if (++groupsFound >= replicas)
-                        return en.getKey();
-                }
-            }
-            return token;
-        }
-
-        public Token replicationStart(Token token, Unit unit, 
NavigableMap<Token, Unit> sortedTokens)
-        {
-            // replicated ownership
-            int unitGroup = groupMap.get(unit);   // unit must be already added
-            BitSet seenGroups = new BitSet();
-            int groupsFound = 0;
-
-            for (Map.Entry<Token, Unit> en : Iterables.concat(
-                                                             
sortedTokens.headMap(token, false).descendingMap().entrySet(),
-                                                             
sortedTokens.descendingMap().entrySet()))
-            {
-                Unit n = en.getValue();
-                int ngroup = groupMap.get(n);
-                // Same group as investigated unit is a break; anything that 
could replicate in it replicates there.
-                if (ngroup == unitGroup)
-                    break;
-
-                if (!seenGroups.get(ngroup))
-                {
-                    if (++groupsFound == replicas)
-                        break;
-                    seenGroups.set(ngroup);
-                }
-                token = en.getKey();
-            }
-            return token;
-        }
-
-        public String toString()
-        {
-            Map<Integer, Integer> idToSize = instanceToCount(groupMap);
-            Map<Integer, Integer> sizeToCount = Maps.newTreeMap();
-            sizeToCount.putAll(instanceToCount(idToSize));
-            return String.format("%s strategy, %d replicas, group size to 
count %s", getClass().getSimpleName(), replicas, sizeToCount);
-        }
-
-        @Override
-        public int replicas()
-        {
-            return replicas;
-        }
-
-        public boolean sameGroup(Unit n1, Unit n2)
-        {
-            return groupMap.get(n1).equals(groupMap.get(n2));
-        }
-
-        public void removeUnit(Unit n)
-        {
-            groupMap.remove(n);
-        }
-
-        public Integer getGroup(Unit unit)
-        {
-            return groupMap.get(unit);
-        }
-
-        public double spreadExpectation()
-        {
-            return 1.5;   // Even balanced racks get disbalanced when they 
lose nodes.
-        }
-    }
-
-    private static <T> Map<T, Integer> instanceToCount(Map<?, T> map)
-    {
-        Map<T, Integer> idToCount = Maps.newHashMap();
-        for (Map.Entry<?, T> en : map.entrySet())
-        {
-            Integer old = idToCount.get(en.getValue());
-            idToCount.put(en.getValue(), old != null ? old + 1 : 1);
-        }
-        return idToCount;
-    }
-
-    /**
-     * Group strategy spreading units into a fixed number of groups.
-     */
-    static class FixedGroupCountReplicationStrategy extends 
GroupReplicationStrategy
-    {
-        int groupId;
-        int groupCount;
-
-        public FixedGroupCountReplicationStrategy(int replicas, int groupCount)
-        {
-            super(replicas);
-            assert groupCount >= replicas;
-            groupId = 0;
-            this.groupCount = groupCount;
-        }
-
-        public void addUnit(Unit n)
-        {
-            groupMap.put(n, groupId++ % groupCount);
-        }
-    }
-
-    /**
-     * Group strategy with a fixed number of units per group.
-     */
-    static class BalancedGroupReplicationStrategy extends 
GroupReplicationStrategy
-    {
-        int groupId;
-        int groupSize;
-
-        public BalancedGroupReplicationStrategy(int replicas, int groupSize)
-        {
-            super(replicas);
-            groupId = 0;
-            this.groupSize = groupSize;
-        }
-
-        public void addUnit(Unit n)
-        {
-            groupMap.put(n, groupId++ / groupSize);
-        }
-    }
-
-    static class UnbalancedGroupReplicationStrategy extends 
GroupReplicationStrategy
-    {
-        int groupId;
-        int nextSize;
-        int num;
-        int minGroupSize;
-        int maxGroupSize;
-        Random rand;
-
-        public UnbalancedGroupReplicationStrategy(int replicas, int 
minGroupSize, int maxGroupSize, Random rand)
-        {
-            super(replicas);
-            groupId = -1;
-            nextSize = 0;
-            num = 0;
-            this.maxGroupSize = maxGroupSize;
-            this.minGroupSize = minGroupSize;
-            this.rand = rand;
-        }
-
-        public void addUnit(Unit n)
-        {
-            if (++num > nextSize)
-            {
-                nextSize = minGroupSize + rand.nextInt(maxGroupSize - 
minGroupSize + 1);
-                ++groupId;
-                num = 0;
-            }
-            groupMap.put(n, groupId);
-        }
-
-        public double spreadExpectation()
-        {
-            return 2;
-        }
-    }
-
-    static Map<Unit, Double> 
evaluateReplicatedOwnership(ReplicationAwareTokenAllocator<Unit> t)
-    {
-        Map<Unit, Double> ownership = Maps.newHashMap();
-        Iterator<Token> it = t.sortedTokens.keySet().iterator();
-        if (!it.hasNext())
-            return ownership;
-
-        Token current = it.next();
-        while (it.hasNext())
-        {
-            Token next = it.next();
-            addOwnership(t, current, next, ownership);
-            current = next;
-        }
-        addOwnership(t, current, t.sortedTokens.firstKey(), ownership);
-
-        return ownership;
-    }
-
-    private static void addOwnership(ReplicationAwareTokenAllocator<Unit> t, 
Token current, Token next, Map<Unit, Double> ownership)
-    {
-        TestReplicationStrategy ts = (TestReplicationStrategy) t.strategy;
-        double size = current.size(next);
-        Token representative = t.partitioner.midpoint(current, next);
-        for (Unit n : ts.getReplicas(representative, t.sortedTokens))
-        {
-            Double v = ownership.get(n);
-            ownership.put(n, v != null ? v + size : size);
-        }
-    }
-
-    private static double replicatedTokenOwnership(Token token, 
NavigableMap<Token, Unit> sortedTokens, ReplicationStrategy<Unit> strategy)
-    {
-        TestReplicationStrategy ts = (TestReplicationStrategy) strategy;
-        Token next = sortedTokens.higherKey(token);
-        if (next == null)
-            next = sortedTokens.firstKey();
-        return ts.replicationStart(token, sortedTokens.get(token), 
sortedTokens).size(next);
-    }
-
-    static interface TokenCount
-    {
-        int tokenCount(int perUnitCount, Random rand);
-
-        double spreadExpectation();
-    }
-
-    static TokenCount fixedTokenCount = new TokenCount()
-    {
-        public int tokenCount(int perUnitCount, Random rand)
-        {
-            return perUnitCount;
-        }
-
-        public double spreadExpectation()
-        {
-            return 4;  // High tolerance to avoid flakiness.
-        }
-    };
-
-    static TokenCount varyingTokenCount = new TokenCount()
-    {
-        public int tokenCount(int perUnitCount, Random rand)
-        {
-            if (perUnitCount == 1) return 1;
-            // 25 to 175%
-            return rand.nextInt(perUnitCount * 3 / 2) + (perUnitCount + 3) / 4;
-        }
-
-        public double spreadExpectation()
-        {
-            return 8;  // High tolerance to avoid flakiness.
-        }
-    };
-
-    Random seededRand = new Random(2);
-
-    private void random(Map<Token, Unit> map, TestReplicationStrategy rs,
-                        int unitCount, TokenCount tc, int perUnitCount, 
IPartitioner partitioner)
-    {
-        System.out.format("\nRandom generation of %d units with %d tokens 
each\n", unitCount, perUnitCount);
-        Random rand = seededRand;
-        for (int i = 0; i < unitCount; i++)
-        {
-            Unit unit = new Unit();
-            rs.addUnit(unit);
-            int tokens = tc.tokenCount(perUnitCount, rand);
-            for (int j = 0; j < tokens; j++)
-            {
-                map.put(partitioner.getRandomToken(rand), unit);
-            }
-        }
-    }
-
-    @Test
-    public void testExistingClusterWithRandomPartitioner()
-    {
-        testExistingCluster(new RandomPartitioner());
-    }
-
-    @Test
-    public void testExistingClusterWithMurmur3Partitioner()
-    {
-        testExistingCluster(new Murmur3Partitioner());
-    }
-
-    public void testExistingCluster(IPartitioner partitioner)
-    {
-        for (int rf = 1; rf <= 5; ++rf)
-        {
-            for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; 
perUnitCount *= 4)
-            {
-                testExistingCluster(perUnitCount, fixedTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
-                testExistingCluster(perUnitCount, varyingTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
-                if (rf == 1) continue;  // Replication strategy doesn't matter 
for RF = 1.
-                for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 4 
< TARGET_CLUSTER_SIZE; groupSize *= 4)
-                {
-                    testExistingCluster(perUnitCount, fixedTokenCount,
-                                        new 
BalancedGroupReplicationStrategy(rf, groupSize), partitioner);
-                    testExistingCluster(perUnitCount, varyingTokenCount,
-                                        new 
UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, 
seededRand),
-                                        partitioner);
-                }
-                testExistingCluster(perUnitCount, fixedTokenCount,
-                                    new FixedGroupCountReplicationStrategy(rf, 
rf * 2), partitioner);
-            }
-        }
-    }
-
-    public void testExistingCluster(int perUnitCount, TokenCount tc, 
TestReplicationStrategy rs, IPartitioner partitioner)
-    {
-        System.out.println("Testing existing cluster, target " + perUnitCount 
+ " vnodes, replication " + rs);
-        final int targetClusterSize = TARGET_CLUSTER_SIZE;
-        NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap();
-
-        random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, 
partitioner);
-
-        ReplicationAwareTokenAllocator<Unit> t = new 
ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner);
-        grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false);
-        grow(t, targetClusterSize, tc, perUnitCount, true);
-        loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, 
partitioner);
-        System.out.println();
-    }
-
-    @Test
-    public void testNewClusterWithRandomPartitioner()
-    {
-        Util.flakyTest(this::flakyTestNewClusterWithRandomPartitioner,
-                       5,
-                       "It tends to fail sometimes due to the random selection 
of the tokens in the first few nodes.");
-    }
-
-    @Test
-    public void testNewClusterWithMurmur3Partitioner()
-    {
-        Util.flakyTest(this::flakyTestNewClusterWithMurmur3Partitioner,
-                       5,
-                       "It tends to fail sometimes due to the random selection 
of the tokens in the first few nodes.");
-    }
-
-    public void flakyTestNewClusterWithRandomPartitioner()
-    {
-        flakyTestNewCluster(new RandomPartitioner());
-    }
-
-    public void flakyTestNewClusterWithMurmur3Partitioner()
-    {
-        flakyTestNewCluster(new Murmur3Partitioner());
-    }
-
-    public void flakyTestNewCluster(IPartitioner partitioner)
-    {
-        // This test is flaky because the selection of the tokens for the 
first RF nodes (which is random, with an
-        // uncontrolled seed) can sometimes cause a pathological situation 
where the algorithm will find a (close to)
-        // ideal distribution of tokens for some number of nodes, which in 
turn will inevitably cause it to go into a
-        // bad (unacceptable to the test criteria) distribution after adding 
one more node.
-
-        // This should happen very rarely, unless something is broken in the 
token allocation code.
-
-        for (int rf = 2; rf <= 5; ++rf)
-        {
-            for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; 
perUnitCount *= 4)
-            {
-                testNewCluster(perUnitCount, fixedTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
-                testNewCluster(perUnitCount, varyingTokenCount, new 
SimpleReplicationStrategy(rf), partitioner);
-                if (rf == 1) continue;  // Replication strategy doesn't matter 
for RF = 1.
-                for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 8 
< TARGET_CLUSTER_SIZE; groupSize *= 4)
-                {
-                    testNewCluster(perUnitCount, fixedTokenCount,
-                                   new BalancedGroupReplicationStrategy(rf, 
groupSize), partitioner);
-                    testNewCluster(perUnitCount, varyingTokenCount,
-                                   new UnbalancedGroupReplicationStrategy(rf, 
groupSize / 2, groupSize * 2, seededRand),
-                                   partitioner);
-                }
-                testNewCluster(perUnitCount, fixedTokenCount,
-                               new FixedGroupCountReplicationStrategy(rf, rf * 
2), partitioner);
-            }
-        }
-    }
-
-    public void testNewCluster(int perUnitCount, TokenCount tc, 
TestReplicationStrategy rs, IPartitioner partitioner)
-    {
-        System.out.println("Testing new cluster, target " + perUnitCount + " 
vnodes, replication " + rs);
-        final int targetClusterSize = TARGET_CLUSTER_SIZE;
-        NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap();
-
-        ReplicationAwareTokenAllocator<Unit> t = new 
ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner);
-        grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false);
-        grow(t, targetClusterSize, tc, perUnitCount, true);
-        loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount, 
partitioner);
-        System.out.println();
-    }
-
-    private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int 
howMany,
-                                TokenCount tc, int perUnitCount, IPartitioner 
partitioner)
-    {
-        int fullCount = t.unitCount();
-        System.out.format("Losing %d units. ", howMany);
-        for (int i = 0; i < howMany; ++i)
-        {
-            Unit u = t.unitFor(partitioner.getRandomToken(seededRand));
-            t.removeUnit(u);
-            ((TestReplicationStrategy) t.strategy).removeUnit(u);
-        }
-        // Grow half without verifying.
-        grow(t, (t.unitCount() + fullCount * 3) / 4, tc, perUnitCount, false);
-        // Metrics should be back to normal by now. Check that they remain so.
-        grow(t, fullCount, tc, perUnitCount, true);
-    }
-
-    static class Summary
-    {
-        double min = 1;
-        double max = 1;
-        double stddev = 0;
-
-        void update(SummaryStatistics stat)
-        {
-            min = Math.min(min, stat.getMin());
-            max = Math.max(max, stat.getMax());
-            stddev = Math.max(stddev, stat.getStandardDeviation());
-        }
-
-        public String toString()
-        {
-            return String.format("max %.2f min %.2f stddev %.4f", max, min, 
stddev);
-        }
-    }
-
-    public void grow(ReplicationAwareTokenAllocator<Unit> t, int 
targetClusterSize, TokenCount tc, int perUnitCount, boolean verifyMetrics)
-    {
-        int size = t.unitCount();
-        Summary su = new Summary();
-        Summary st = new Summary();
-        Random rand = new Random(targetClusterSize + perUnitCount);
-        TestReplicationStrategy strategy = (TestReplicationStrategy) 
t.strategy;
-        if (size < targetClusterSize)
-        {
-            System.out.format("Adding %d unit(s) using %s...", 
targetClusterSize - size, t.toString());
-            long time = System.currentTimeMillis();
-            while (size < targetClusterSize)
-            {
-                int tokens = tc.tokenCount(perUnitCount, rand);
-                Unit unit = new Unit();
-                strategy.addUnit(unit);
-                t.addUnit(unit, tokens);
-                ++size;
-                if (verifyMetrics)
-                    updateSummary(t, su, st, false);
-            }
-            System.out.format(" Done in %.3fs\n", (System.currentTimeMillis() 
- time) / 1000.0);
-            if (verifyMetrics)
-            {
-                updateSummary(t, su, st, true);
-                double maxExpected = 1.0 + tc.spreadExpectation() * 
strategy.spreadExpectation() / (perUnitCount * t.replicas);
-                if (su.max > maxExpected)
-                {
-                    Assert.fail(String.format("Expected max unit size below 
%.4f, was %.4f", maxExpected, su.max));
-                }
-                // We can't verify lower side range as small loads can't 
always be fixed.
-            }
-        }
-    }
-
-
-    private void updateSummary(ReplicationAwareTokenAllocator<Unit> t, Summary 
su, Summary st, boolean print)
-    {
-        int size = t.sortedTokens.size();
-        double inverseAverage = 1.0 * size / t.strategy.replicas();
-
-        Map<Unit, Double> ownership = evaluateReplicatedOwnership(t);
-        SummaryStatistics unitStat = new SummaryStatistics();
-        for (Map.Entry<Unit, Double> en : ownership.entrySet())
-            unitStat.addValue(en.getValue() * inverseAverage / 
t.unitToTokens.get(en.getKey()).size());
-        su.update(unitStat);
-
-        SummaryStatistics tokenStat = new SummaryStatistics();
-        for (Token tok : t.sortedTokens.keySet())
-            tokenStat.addValue(replicatedTokenOwnership(tok, t.sortedTokens, 
t.strategy) * inverseAverage);
-        st.update(tokenStat);
-
-        if (print)
-        {
-            System.out.format("Size %d(%d)   \tunit %s  token %s   %s\n",
-                              t.unitCount(), size,
-                              mms(unitStat),
-                              mms(tokenStat),
-                              t.strategy);
-            System.out.format("Worst intermediate unit\t%s  token %s\n", su, 
st);
-        }
-    }
-
-
-    private static String mms(SummaryStatistics s)
-    {
-        return String.format("max %.2f min %.2f stddev %.4f", s.getMax(), 
s.getMin(), s.getStandardDeviation());
-    }
-
-
-    int nextUnitId = 0;
-
-    final class Unit implements Comparable<Unit>
-    {
-        int unitId = nextUnitId++;
-
-        public String toString()
-        {
-            return Integer.toString(unitId);
-        }
-
-        @Override
-        public int compareTo(Unit o)
-        {
-            return Integer.compare(unitId, o.unitId);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java 
b/test/unit/org/apache/cassandra/Util.java
index 2f083c9..f93ce5c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -36,6 +36,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import org.apache.commons.lang3.StringUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -67,7 +70,9 @@ import static org.junit.Assert.assertTrue;
 
 public class Util
 {
-    private static List<UUID> hostIdPool = new ArrayList<UUID>();
+    private static final Logger logger = LoggerFactory.getLogger(Util.class);
+
+    private static List<UUID> hostIdPool = new ArrayList<>();
 
     public static IPartitioner testPartitioner()
     {
@@ -601,10 +606,9 @@ public class Util
         AssertionError e = runCatchingAssertionError(test);
         if (e == null)
             return;     // success
-        System.err.format("Test failed. %s%n"
-                        + "Re-running %d times to verify it isn't failing more 
often than it should.%n"
-                        + "Failure was: %s%n", message, rerunsOnFailure, e);
-        e.printStackTrace();
+
+        logger.info("Test failed. {}", message, e);
+        logger.info("Re-running {} times to verify it isn't failing more often 
than it should.", rerunsOnFailure);
 
         int rerunsFailed = 0;
         for (int i = 0; i < rerunsOnFailure; ++i)
@@ -614,15 +618,17 @@ public class Util
             {
                 ++rerunsFailed;
                 e.addSuppressed(t);
+
+                logger.debug("Test failed again, total num failures: {}", 
rerunsFailed, t);
             }
         }
         if (rerunsFailed > 0)
         {
-            System.err.format("Test failed in %d of the %d reruns.%n", 
rerunsFailed, rerunsOnFailure);
+            logger.error("Test failed in {} of the {} reruns.", rerunsFailed, 
rerunsOnFailure);
             throw e;
         }
 
-        System.err.println("All reruns succeeded. Failure treated as flake.");
+        logger.info("All reruns succeeded. Failure treated as flake.");
     }
 
     // for use with Optional in tests, can be used as an argument to 
orElseThrow

Reply via email to