Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.X b98a40605 -> e2a0d75b0
  refs/heads/trunk 250573302 -> 6d40809d8


Implement the NoReplicationTokenAllocator

Patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12777


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2a0d75b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2a0d75b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2a0d75b

Branch: refs/heads/cassandra-3.X
Commit: e2a0d75b024463ad481333bdae826928b55ac589
Parents: b98a406
Author: Dikang Gu <[email protected]>
Authored: Thu Oct 13 12:31:39 2016 -0700
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Oct 25 16:56:07 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/dht/ByteOrderedPartitioner.java   |   5 +
 .../org/apache/cassandra/dht/IPartitioner.java  |   5 +
 .../apache/cassandra/dht/LocalPartitioner.java  |   5 +
 .../cassandra/dht/Murmur3Partitioner.java       |  36 +++
 .../dht/OrderPreservingPartitioner.java         |   5 +
 .../apache/cassandra/dht/RandomPartitioner.java |  37 ++-
 .../NoReplicationTokenAllocator.java            | 266 ++++++++++++++++++
 .../ReplicationAwareTokenAllocator.java         | 250 +----------------
 .../dht/tokenallocator/TokenAllocation.java     |   2 +-
 .../dht/tokenallocator/TokenAllocator.java      |   2 +-
 .../dht/tokenallocator/TokenAllocatorBase.java  | 279 +++++++++++++++++++
 .../tokenallocator/TokenAllocatorFactory.java   |  37 +++
 ...tractReplicationAwareTokenAllocatorTest.java | 175 +-----------
 .../NoReplicationTokenAllocatorTest.java        | 249 +++++++++++++++++
 .../tokenallocator/TokenAllocatorTestBase.java  | 160 +++++++++++
 .../apache/cassandra/dht/LengthPartitioner.java |   5 +
 .../cassandra/dht/Murmur3PartitionerTest.java   |  23 ++
 .../cassandra/dht/PartitionerTestCase.java      |  32 +++
 .../cassandra/dht/RandomPartitionerTest.java    |  26 ++
 20 files changed, 1177 insertions(+), 423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dea2003..0f16a0e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
  * Use non-token restrictions for bounds when token restrictions are 
overridden (CASSANDRA-12419)
  * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
  * Use different build directories for Eclipse and Ant (CASSANDRA-12466)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java 
b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index d334604..1271a5a 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -158,6 +158,11 @@ public class ByteOrderedPartitioner implements IPartitioner
         return new BytesToken(bytesForBig(midpair.left, sigbytes, 
midpair.right));
     }
 
+    public Token split(Token left, Token right, double ratioToLeft)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Convert a byte array containing the most significant of 'sigbytes' bytes
      * representing a big-endian magnitude into a BigInteger.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java 
b/src/java/org/apache/cassandra/dht/IPartitioner.java
index eb4aafb..e342bd0 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -45,6 +45,11 @@ public interface IPartitioner
     public Token midpoint(Token left, Token right);
 
     /**
+     * Calculate a Token which take approximate 0 <= ratioToLeft <= 1 
ownership of the given range.
+     */
+    public Token split(Token left, Token right, double ratioToLeft);
+
+    /**
      * @return A Token smaller than all others in the range that is being 
partitioned.
      * Not legal to assign to a node or key.  (But legal to use in range 
scans.)
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java 
b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 9922eb0..0a9b7cb 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -50,6 +50,11 @@ public class LocalPartitioner implements IPartitioner
         throw new UnsupportedOperationException();
     }
 
+    public Token split(Token left, Token right, double ratioToLeft)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public LocalToken getMinimumToken()
     {
         return new LocalToken(ByteBufferUtil.EMPTY_BYTE_BUFFER);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java 
b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 9ed0cca..0f922e3 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -94,6 +94,42 @@ public class Murmur3Partitioner implements IPartitioner
         return new LongToken(midpoint.longValue());
     }
 
+    public Token split(Token lToken, Token rToken, double ratioToLeft)
+    {
+        BigDecimal l = BigDecimal.valueOf(((LongToken) lToken).token),
+                   r = BigDecimal.valueOf(((LongToken) rToken).token),
+                   ratio = BigDecimal.valueOf(ratioToLeft);
+        long newToken;
+
+        if (l.compareTo(r) < 0)
+        {
+            newToken = 
r.subtract(l).multiply(ratio).add(l).toBigInteger().longValue();
+        }
+        else
+        {
+            // wrapping case
+            // L + ((R - min) + (max - L)) * pct
+            BigDecimal max = BigDecimal.valueOf(MAXIMUM);
+            BigDecimal min = BigDecimal.valueOf(MINIMUM.token);
+
+            BigInteger token = 
max.subtract(min).add(r).subtract(l).multiply(ratio).add(l).toBigInteger();
+
+            BigInteger maxToken = BigInteger.valueOf(MAXIMUM);
+
+            if (token.compareTo(maxToken) <= 0)
+            {
+                newToken = token.longValue();
+            }
+            else
+            {
+                // if the value is above maximum
+                BigInteger minToken = BigInteger.valueOf(MINIMUM.token);
+                newToken = minToken.add(token.subtract(maxToken)).longValue();
+            }
+        }
+        return new LongToken(newToken);
+    }
+
     public LongToken getMinimumToken()
     {
         return MINIMUM;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java 
b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index ab552c4..954b0af 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -63,6 +63,11 @@ public class OrderPreservingPartitioner implements 
IPartitioner
         return new StringToken(stringForBig(midpair.left, sigchars, 
midpair.right));
     }
 
+    public Token split(Token left, Token right, double ratioToLeft)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Copies the characters of the given string into a BigInteger.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java 
b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index f6090e0..82c2493 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -78,6 +78,32 @@ public class RandomPartitioner implements IPartitioner
         return new BigIntegerToken(midpair.left);
     }
 
+    public Token split(Token ltoken, Token rtoken, double ratioToLeft)
+    {
+        BigDecimal left = ltoken.equals(MINIMUM) ? BigDecimal.ZERO : new 
BigDecimal(((BigIntegerToken)ltoken).token),
+                   right = rtoken.equals(MINIMUM) ? BigDecimal.ZERO : new 
BigDecimal(((BigIntegerToken)rtoken).token),
+                   ratio = BigDecimal.valueOf(ratioToLeft);
+
+        BigInteger newToken;
+
+        if (left.compareTo(right) < 0)
+        {
+            newToken = 
right.subtract(left).multiply(ratio).add(left).toBigInteger();
+        }
+        else
+        {
+            // wrapping case
+            // L + ((R - min) + (max - L)) * ratio
+            BigDecimal max = new BigDecimal(MAXIMUM);
+
+            newToken = 
max.add(right).subtract(left).multiply(ratio).add(left).toBigInteger().mod(MAXIMUM);
+        }
+
+        assert isValidToken(newToken) : "Invalid tokens from split";
+
+        return new BigIntegerToken(newToken);
+    }
+
     public BigIntegerToken getMinimumToken()
     {
         return MINIMUM;
@@ -99,6 +125,10 @@ public class RandomPartitioner implements IPartitioner
         return new BigIntegerToken(token);
     }
 
+    private boolean isValidToken(BigInteger token) {
+        return token.compareTo(ZERO) >= 0 && token.compareTo(MAXIMUM) <= 0;
+    }
+
     private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
     {
         public ByteBuffer toByteArray(Token token)
@@ -122,11 +152,8 @@ public class RandomPartitioner implements IPartitioner
         {
             try
             {
-                BigInteger i = new BigInteger(token);
-                if (i.compareTo(ZERO) < 0)
-                    throw new ConfigurationException("Token must be >= 0");
-                if (i.compareTo(MAXIMUM) > 0)
-                    throw new ConfigurationException("Token must be <= 
2**127");
+                if(!isValidToken(new BigInteger(token)))
+                    throw new ConfigurationException("Token must be >= 0 and 
<= 2**127");
             }
             catch (NumberFormatException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java
 
b/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java
new file mode 100644
index 0000000..54d80dc
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+
+public class NoReplicationTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
+{
+    PriorityQueue<Weighted<UnitInfo>> sortedUnits = Queues.newPriorityQueue();
+    Map<Unit, PriorityQueue<Weighted<TokenInfo>>> tokensInUnits = 
Maps.newHashMap();
+
+    private static final double MAX_TAKEOVER_RATIO = 0.90;
+    private static final double MIN_TAKEOVER_RATIO = 1.0 - MAX_TAKEOVER_RATIO;
+
+    public NoReplicationTokenAllocator(NavigableMap<Token, Unit> sortedTokens,
+                                       ReplicationStrategy<Unit> strategy,
+                                       IPartitioner partitioner)
+    {
+        super(sortedTokens, strategy, partitioner);
+    }
+
+    /**
+     * Construct the token ring as a CircularList of TokenInfo,
+     * and populate the ownership of the UnitInfo's provided
+     */
+    private TokenInfo<Unit> createTokenInfos(Map<Unit, UnitInfo<Unit>> units)
+    {
+        if (units.isEmpty())
+            return null;
+
+        // build the circular list
+        TokenInfo<Unit> prev = null;
+        TokenInfo<Unit> first = null;
+        for (Map.Entry<Token, Unit> en : sortedTokens.entrySet())
+        {
+            Token t = en.getKey();
+            UnitInfo<Unit> ni = units.get(en.getValue());
+            TokenInfo<Unit> ti = new TokenInfo<>(t, ni);
+            first = ti.insertAfter(first, prev);
+            prev = ti;
+        }
+
+        TokenInfo<Unit> curr = first;
+        tokensInUnits.clear();
+        sortedUnits.clear();
+        do
+        {
+            populateTokenInfoAndAdjustUnit(curr);
+            curr = curr.next;
+        } while (curr != first);
+
+        for (UnitInfo<Unit> unitInfo : units.values())
+        {
+            sortedUnits.add(new Weighted<UnitInfo>(unitInfo.ownership, 
unitInfo));
+        }
+
+        return first;
+    }
+
+    /**
+     * Used in tests.
+     */
+    protected void createTokenInfos()
+    {
+        createTokenInfos(createUnitInfos(Maps.newHashMap()));
+    }
+
+    private void populateTokenInfoAndAdjustUnit(TokenInfo<Unit> token)
+    {
+        token.replicationStart = token.prevInRing().token;
+        token.replicationThreshold = token.token;
+        token.replicatedOwnership = token.replicationStart.size(token.token);
+        token.owningUnit.ownership += token.replicatedOwnership;
+
+        PriorityQueue<Weighted<TokenInfo>> unitTokens = 
tokensInUnits.get(token.owningUnit.unit);
+        if (unitTokens == null)
+        {
+            unitTokens = Queues.newPriorityQueue();
+            tokensInUnits.put(token.owningUnit.unit, unitTokens);
+        }
+        unitTokens.add(new Weighted<TokenInfo>(token.replicatedOwnership, 
token));
+    }
+
+    private Collection<Token> generateRandomTokens(UnitInfo<Unit> newUnit, int 
numTokens, Map<Unit, UnitInfo<Unit>> unitInfos)
+    {
+        Set<Token> tokens = new HashSet<>(numTokens);
+        while (tokens.size() < numTokens)
+        {
+            Token token = partitioner.getRandomToken();
+            if (!sortedTokens.containsKey(token))
+            {
+                tokens.add(token);
+                sortedTokens.put(token, newUnit.unit);
+            }
+        }
+        unitInfos.put(newUnit.unit, newUnit);
+        createTokenInfos(unitInfos);
+        return tokens;
+    }
+
+    public Collection<Token> addUnit(Unit newUnit, int numTokens)
+    {
+        assert !tokensInUnits.containsKey(newUnit);
+
+        Map<Object, GroupInfo> groups = Maps.newHashMap();
+        UnitInfo<Unit> newUnitInfo = new UnitInfo<>(newUnit, 0, groups, 
strategy);
+        Map<Unit, UnitInfo<Unit>> unitInfos = createUnitInfos(groups);
+
+        if (unitInfos.isEmpty())
+            return generateRandomTokens(newUnitInfo, numTokens, unitInfos);
+
+        if (numTokens > sortedTokens.size())
+            return generateRandomTokens(newUnitInfo, numTokens, unitInfos);
+
+        TokenInfo<Unit> head = createTokenInfos(unitInfos);
+
+        // Select the nodes we will work with, extract them from sortedUnits 
and calculate targetAverage
+        double targetAverage = 0.0;
+        double sum = 0.0;
+        List<Weighted<UnitInfo>> unitsToChange = new ArrayList<>();
+
+        for (int i = 0; i < numTokens; i++)
+        {
+            Weighted<UnitInfo> unit = sortedUnits.peek();
+
+            if (unit == null)
+                break;
+
+            sum += unit.weight;
+            double average = sum / (unitsToChange.size() + 2); // unit and 
newUnit must be counted
+            if (unit.weight <= average)
+                // No point to include later nodes, target can only decrease 
from here.
+                break;
+
+            sortedUnits.remove();
+            unitsToChange.add(unit);
+            targetAverage = average;
+        }
+
+        List<Token> newTokens = Lists.newArrayListWithCapacity(numTokens);
+
+        int nr = 0;
+        // calculate the tokens
+        for (Weighted<UnitInfo> unit : unitsToChange)
+        {
+            // TODO: Any better ways to assign how many tokens to change in 
each node?
+            int tokensToChange = numTokens / unitsToChange.size() + (nr < 
numTokens % unitsToChange.size() ? 1 : 0);
+
+            Queue<Weighted<TokenInfo>> unitTokens = 
tokensInUnits.get(unit.value.unit);
+            List<Weighted<TokenInfo>> tokens = 
Lists.newArrayListWithCapacity(tokensToChange);
+
+            double workWeight = 0;
+            // Extract biggest vnodes and calculate how much weight we can 
work with.
+            for (int i = 0; i < tokensToChange; i++)
+            {
+                Weighted<TokenInfo> wt = unitTokens.remove();
+                tokens.add(wt);
+                workWeight += wt.weight;
+                unit.value.ownership -= wt.weight;
+            }
+
+            double toTakeOver = unit.weight - targetAverage;
+            // Split toTakeOver proportionally between the vnodes.
+            for (Weighted<TokenInfo> wt : tokens)
+            {
+                double slice;
+                Token token;
+
+                if (toTakeOver < workWeight)
+                {
+                    // Spread decrease.
+                    slice = toTakeOver / workWeight;
+
+                    if (slice < MIN_TAKEOVER_RATIO)
+                        slice = MIN_TAKEOVER_RATIO;
+                    if (slice > MAX_TAKEOVER_RATIO)
+                        slice = MAX_TAKEOVER_RATIO;
+                }
+                else
+                {
+                    slice = MAX_TAKEOVER_RATIO;
+                }
+                token = partitioner.split(wt.value.prevInRing().token, 
wt.value.token, slice);
+
+                //Token selected, now change all data
+                sortedTokens.put(token, newUnit);
+
+                TokenInfo<Unit> ti = new TokenInfo<>(token, newUnitInfo);
+
+                ti.insertAfter(head, wt.value.prevInRing());
+
+                populateTokenInfoAndAdjustUnit(ti);
+                populateTokenInfoAndAdjustUnit(wt.value);
+                newTokens.add(token);
+            }
+
+            // adjust the weight for current unit
+            sortedUnits.add(new Weighted<>(unit.value.ownership, unit.value));
+            ++nr;
+        }
+        sortedUnits.add(new Weighted<>(newUnitInfo.ownership, newUnitInfo));
+
+        return newTokens;
+    }
+
+    /**
+     * For testing, remove the given unit preserving correct state of the 
allocator.
+     */
+    void removeUnit(Unit n)
+    {
+        Iterator<Weighted<UnitInfo>> it = sortedUnits.iterator();
+        while (it.hasNext())
+        {
+            if (it.next().value.unit.equals(n))
+            {
+                it.remove();
+                break;
+            }
+        }
+
+        PriorityQueue<Weighted<TokenInfo>> tokenInfos = 
tokensInUnits.remove(n);
+        Collection<Token> tokens = 
Lists.newArrayListWithCapacity(tokenInfos.size());
+        for (Weighted<TokenInfo> tokenInfo : tokenInfos)
+        {
+            tokens.add(tokenInfo.value.token);
+        }
+        sortedTokens.keySet().removeAll(tokens);
+    }
+
+    public int getReplicas()
+    {
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
 
b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
index a60be94..87dba59 100644
--- 
a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
+++ 
b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
@@ -36,23 +36,23 @@ import org.apache.cassandra.dht.Token;
  * ownership needs to be evenly distributed. At the moment only nodes as a 
whole are treated as units, but that
  * will change with the introduction of token ranges per disk.
  */
-class ReplicationAwareTokenAllocator<Unit> implements TokenAllocator<Unit>
+class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
 {
-    final NavigableMap<Token, Unit> sortedTokens;
     final Multimap<Unit, Token> unitToTokens;
-    final ReplicationStrategy<Unit> strategy;
-    final IPartitioner partitioner;
     final int replicas;
 
     ReplicationAwareTokenAllocator(NavigableMap<Token, Unit> sortedTokens, 
ReplicationStrategy<Unit> strategy, IPartitioner partitioner)
     {
-        this.sortedTokens = sortedTokens;
+        super(sortedTokens, strategy, partitioner);
         unitToTokens = HashMultimap.create();
         for (Map.Entry<Token, Unit> en : sortedTokens.entrySet())
             unitToTokens.put(en.getValue(), en.getKey());
-        this.strategy = strategy;
         this.replicas = strategy.replicas();
-        this.partitioner = partitioner;
+    }
+
+    public int getReplicas()
+    {
+        return replicas;
     }
 
     public Collection<Token> addUnit(Unit newUnit, int numTokens)
@@ -151,19 +151,6 @@ class ReplicationAwareTokenAllocator<Unit> implements 
TokenAllocator<Unit>
         return tokens;
     }
 
-    private Map<Unit, UnitInfo<Unit>> createUnitInfos(Map<Object, GroupInfo> 
groups)
-    {
-        Map<Unit, UnitInfo<Unit>> map = Maps.newHashMap();
-        for (Unit n : sortedTokens.values())
-        {
-            UnitInfo<Unit> ni = map.get(n);
-            if (ni == null)
-                map.put(n, ni = new UnitInfo<>(n, 0, groups, strategy));
-            ni.tokenCount++;
-        }
-        return map;
-    }
-
     /**
      * Construct the token ring as a CircularList of TokenInfo,
      * and populate the ownership of the UnitInfo's provided
@@ -506,19 +493,6 @@ class ReplicationAwareTokenAllocator<Unit> implements 
TokenAllocator<Unit>
         }
     }
 
-    private Map.Entry<Token, Unit> mapEntryFor(Token t)
-    {
-        Map.Entry<Token, Unit> en = sortedTokens.floorEntry(t);
-        if (en == null)
-            en = sortedTokens.lastEntry();
-        return en;
-    }
-
-    Unit unitFor(Token t)
-    {
-        return mapEntryFor(t).getValue();
-    }
-
     private double optimalTokenOwnership(int tokensToAdd)
     {
         return 1.0 * replicas / (sortedTokens.size() + tokensToAdd);
@@ -554,7 +528,7 @@ class ReplicationAwareTokenAllocator<Unit> implements 
TokenAllocator<Unit>
         sortedTokens.keySet().removeAll(tokens);
     }
 
-    int unitCount()
+    public int unitCount()
     {
         return unitToTokens.asMap().size();
     }
@@ -564,189 +538,6 @@ class ReplicationAwareTokenAllocator<Unit> implements 
TokenAllocator<Unit>
         return getClass().getSimpleName();
     }
 
-    // get or initialise the shared GroupInfo associated with the unit
-    private static <Unit> GroupInfo getGroup(Unit unit, Map<Object, GroupInfo> 
groupMap, ReplicationStrategy<Unit> strategy)
-    {
-        Object groupClass = strategy.getGroup(unit);
-        GroupInfo group = groupMap.get(groupClass);
-        if (group == null)
-            groupMap.put(groupClass, group = new GroupInfo(groupClass));
-        return group;
-    }
-
-    /**
-     * Unique group object that one or more UnitInfo objects link to.
-     */
-    private static class GroupInfo
-    {
-        /**
-         * Group identifier given by ReplicationStrategy.getGroup(Unit).
-         */
-        final Object group;
-
-        /**
-         * Seen marker. When non-null, the group is already seen in 
replication walks.
-         * Also points to previous seen group to enable walking the seen 
groups and clearing the seen markers.
-         */
-        GroupInfo prevSeen = null;
-        /**
-         * Same marker/chain used by populateTokenInfo.
-         */
-        GroupInfo prevPopulate = null;
-
-        /**
-         * Value used as terminator for seen chains.
-         */
-        static GroupInfo TERMINATOR = new GroupInfo(null);
-
-        public GroupInfo(Object group)
-        {
-            this.group = group;
-        }
-
-        public String toString()
-        {
-            return group.toString() + (prevSeen != null ? "*" : "");
-        }
-    }
-
-    /**
-     * Unit information created and used by ReplicationAwareTokenDistributor. 
Contained vnodes all point to the same
-     * instance.
-     */
-    static class UnitInfo<Unit>
-    {
-        final Unit unit;
-        final GroupInfo group;
-        double ownership;
-        int tokenCount;
-
-        /**
-         * During evaluateImprovement this is used to form a chain of units 
affected by the candidate insertion.
-         */
-        UnitInfo<Unit> prevUsed;
-        /**
-         * During evaluateImprovement this holds the ownership after the 
candidate insertion.
-         */
-        double adjustedOwnership;
-
-        private UnitInfo(Unit unit, GroupInfo group)
-        {
-            this.unit = unit;
-            this.group = group;
-            this.tokenCount = 0;
-        }
-
-        public UnitInfo(Unit unit, double ownership, Map<Object, GroupInfo> 
groupMap, ReplicationStrategy<Unit> strategy)
-        {
-            this(unit, getGroup(unit, groupMap, strategy));
-            this.ownership = ownership;
-        }
-
-        public String toString()
-        {
-            return String.format("%s%s(%.2e)%s",
-                    unit, unit == group.group ? (group.prevSeen != null ? "*" 
: "") : ":" + group.toString(),
-                    ownership, prevUsed != null ? (prevUsed == this ? "#" : 
"->" + prevUsed.toString()) : "");
-        }
-    }
-
-    private static class CircularList<T extends CircularList<T>>
-    {
-        T prev;
-        T next;
-
-        /**
-         * Inserts this after unit in the circular list which starts at head. 
Returns the new head of the list, which
-         * only changes if head was null.
-         */
-        @SuppressWarnings("unchecked")
-        T insertAfter(T head, T unit)
-        {
-            if (head == null)
-            {
-                return prev = next = (T) this;
-            }
-            assert unit != null;
-            assert unit.next != null;
-            prev = unit;
-            next = unit.next;
-            prev.next = (T) this;
-            next.prev = (T) this;
-            return head;
-        }
-
-        /**
-         * Removes this from the list that starts at head. Returns the new 
head of the list, which only changes if the
-         * head was removed.
-         */
-        T removeFrom(T head)
-        {
-            next.prev = prev;
-            prev.next = next;
-            return this == head ? (this == next ? null : next) : head;
-        }
-    }
-
-    private static class BaseTokenInfo<Unit, T extends BaseTokenInfo<Unit, T>> 
extends CircularList<T>
-    {
-        final Token token;
-        final UnitInfo<Unit> owningUnit;
-
-        /**
-         * Start of the replication span for the vnode, i.e. the first token 
of the RF'th group seen before the token.
-         * The replicated ownership of the unit is the range between {@code 
replicationStart} and {@code token}.
-         */
-        Token replicationStart;
-        /**
-         * The closest position that the new candidate can take to become the 
new replication start. If candidate is
-         * closer, the start moves to this position. Used to determine 
replicationStart after insertion of new token.
-         *
-         * Usually the RF minus one boundary, i.e. the first token of the 
RF-1'th group seen before the token.
-         */
-        Token replicationThreshold;
-        /**
-         * Current replicated ownership. This number is reflected in the 
owning unit's ownership.
-         */
-        double replicatedOwnership = 0;
-
-        public BaseTokenInfo(Token token, UnitInfo<Unit> owningUnit)
-        {
-            this.token = token;
-            this.owningUnit = owningUnit;
-        }
-
-        public String toString()
-        {
-            return String.format("%s(%s)", token, owningUnit);
-        }
-
-        /**
-         * Previous unit in the token ring. For existing tokens this is prev,
-         * for candidates it's "split".
-         */
-        TokenInfo<Unit> prevInRing()
-        {
-            return null;
-        }
-    }
-
-    /**
-     * TokenInfo about existing tokens/vnodes.
-     */
-    private static class TokenInfo<Unit> extends BaseTokenInfo<Unit, 
TokenInfo<Unit>>
-    {
-        public TokenInfo(Token token, UnitInfo<Unit> owningUnit)
-        {
-            super(token, owningUnit);
-        }
-
-        TokenInfo<Unit> prevInRing()
-        {
-            return prev;
-        }
-    }
-
     /**
      * TokenInfo about candidate new tokens/vnodes.
      */
@@ -776,30 +567,5 @@ class ReplicationAwareTokenAllocator<Unit> implements 
TokenAllocator<Unit>
             token = token.next;
         } while (token != null && token != tokens);
     }
-
-    static class Weighted<T> implements Comparable<Weighted<T>>
-    {
-        final double weight;
-        final T value;
-
-        public Weighted(double weight, T value)
-        {
-            this.weight = weight;
-            this.value = value;
-        }
-
-        @Override
-        public int compareTo(Weighted<T> o)
-        {
-            int cmp = Double.compare(o.weight, this.weight);
-            return cmp;
-        }
-
-        @Override
-        public String toString()
-        {
-            return String.format("%s<%s>", value, weight);
-        }
-    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java 
b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index ebbd690..36824a1 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -147,7 +147,7 @@ public class TokenAllocation
             if (strategy.inAllocationRing(en.getValue()))
                 sortedTokens.put(en.getKey(), en.getValue());
         }
-        return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, 
tokenMetadata.partitioner);
+        return TokenAllocatorFactory.createTokenAllocator(sortedTokens, 
strategy, tokenMetadata.partitioner);
     }
 
     interface StrategyAdapter extends ReplicationStrategy<InetAddress>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java 
b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java
index 580f2ec..2eb9a4c 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java
@@ -23,5 +23,5 @@ import org.apache.cassandra.dht.Token;
 
 public interface TokenAllocator<Unit>
 {
-    public Collection<Token> addUnit(Unit newUnit, int numTokens);
+    Collection<Token> addUnit(Unit newUnit, int numTokens);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorBase.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorBase.java 
b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorBase.java
new file mode 100644
index 0000000..f59bfd4
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorBase.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.util.Map;
+import java.util.NavigableMap;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+
+public abstract class TokenAllocatorBase<Unit> implements TokenAllocator<Unit>
+{
+    final NavigableMap<Token, Unit> sortedTokens;
+    final ReplicationStrategy<Unit> strategy;
+    final IPartitioner partitioner;
+
+    protected TokenAllocatorBase(NavigableMap<Token, Unit> sortedTokens,
+                             ReplicationStrategy<Unit> strategy,
+                             IPartitioner partitioner)
+    {
+        this.sortedTokens = sortedTokens;
+        this.strategy = strategy;
+        this.partitioner = partitioner;
+    }
+
+    public abstract int getReplicas();
+
+    protected Map<Unit, UnitInfo<Unit>> createUnitInfos(Map<Object, GroupInfo> 
groups)
+    {
+        Map<Unit, UnitInfo<Unit>> map = Maps.newHashMap();
+        for (Unit n : sortedTokens.values())
+        {
+            UnitInfo<Unit> ni = map.get(n);
+            if (ni == null)
+                map.put(n, ni = new UnitInfo<>(n, 0, groups, strategy));
+            ni.tokenCount++;
+        }
+        return map;
+    }
+
+    private Map.Entry<Token, Unit> mapEntryFor(Token t)
+    {
+        Map.Entry<Token, Unit> en = sortedTokens.floorEntry(t);
+        if (en == null)
+            en = sortedTokens.lastEntry();
+        return en;
+    }
+
+    Unit unitFor(Token t)
+    {
+        return mapEntryFor(t).getValue();
+    }
+
+    // get or initialise the shared GroupInfo associated with the unit
+    private static <Unit> GroupInfo getGroup(Unit unit, Map<Object, GroupInfo> 
groupMap, ReplicationStrategy<Unit> strategy)
+    {
+        Object groupClass = strategy.getGroup(unit);
+        GroupInfo group = groupMap.get(groupClass);
+        if (group == null)
+            groupMap.put(groupClass, group = new GroupInfo(groupClass));
+        return group;
+    }
+
+    /**
+     * Unique group object that one or more UnitInfo objects link to.
+     */
+    static class GroupInfo
+    {
+        /**
+         * Group identifier given by ReplicationStrategy.getGroup(Unit).
+         */
+        final Object group;
+
+        /**
+         * Seen marker. When non-null, the group is already seen in 
replication walks.
+         * Also points to previous seen group to enable walking the seen 
groups and clearing the seen markers.
+         */
+        GroupInfo prevSeen = null;
+        /**
+         * Same marker/chain used by populateTokenInfo.
+         */
+        GroupInfo prevPopulate = null;
+
+        /**
+         * Value used as terminator for seen chains.
+         */
+        static GroupInfo TERMINATOR = new GroupInfo(null);
+
+        public GroupInfo(Object group)
+        {
+            this.group = group;
+        }
+
+        public String toString()
+        {
+            return group.toString() + (prevSeen != null ? "*" : "");
+        }
+    }
+
+    /**
+     * Unit information created and used by ReplicationAwareTokenDistributor. 
Contained vnodes all point to the same
+     * instance.
+     */
+    static class UnitInfo<Unit>
+    {
+        final Unit unit;
+        final GroupInfo group;
+        double ownership;
+        int tokenCount;
+
+        /**
+         * During evaluateImprovement this is used to form a chain of units 
affected by the candidate insertion.
+         */
+        UnitInfo<Unit> prevUsed;
+        /**
+         * During evaluateImprovement this holds the ownership after the 
candidate insertion.
+         */
+        double adjustedOwnership;
+
+        private UnitInfo(Unit unit, GroupInfo group)
+        {
+            this.unit = unit;
+            this.group = group;
+            this.tokenCount = 0;
+        }
+
+        public UnitInfo(Unit unit, double ownership, Map<Object, GroupInfo> 
groupMap, ReplicationStrategy<Unit> strategy)
+        {
+            this(unit, getGroup(unit, groupMap, strategy));
+            this.ownership = ownership;
+        }
+
+        public String toString()
+        {
+            return String.format("%s%s(%.2e)%s",
+                                 unit, unit == group.group ? (group.prevSeen 
!= null ? "*" : "") : ":" + group.toString(),
+                                 ownership, prevUsed != null ? (prevUsed == 
this ? "#" : "->" + prevUsed.toString()) : "");
+        }
+    }
+
+    private static class CircularList<T extends CircularList<T>>
+    {
+        T prev;
+        T next;
+
+        /**
+         * Inserts this after unit in the circular list which starts at head. 
Returns the new head of the list, which
+         * only changes if head was null.
+         */
+        @SuppressWarnings("unchecked")
+        T insertAfter(T head, T unit)
+        {
+            if (head == null)
+            {
+                return prev = next = (T) this;
+            }
+            assert unit != null;
+            assert unit.next != null;
+            prev = unit;
+            next = unit.next;
+            prev.next = (T) this;
+            next.prev = (T) this;
+            return head;
+        }
+
+        /**
+         * Removes this from the list that starts at head. Returns the new 
head of the list, which only changes if the
+         * head was removed.
+         */
+        T removeFrom(T head)
+        {
+            next.prev = prev;
+            prev.next = next;
+            return this == head ? (this == next ? null : next) : head;
+        }
+    }
+
+    static class BaseTokenInfo<Unit, T extends BaseTokenInfo<Unit, T>> extends 
CircularList<T>
+    {
+        final Token token;
+        final UnitInfo<Unit> owningUnit;
+
+        /**
+         * Start of the replication span for the vnode, i.e. the first token 
of the RF'th group seen before the token.
+         * The replicated ownership of the unit is the range between {@code 
replicationStart} and {@code token}.
+         */
+        Token replicationStart;
+        /**
+         * The closest position that the new candidate can take to become the 
new replication start. If candidate is
+         * closer, the start moves to this position. Used to determine 
replicationStart after insertion of new token.
+         *
+         * Usually the RF minus one boundary, i.e. the first token of the 
RF-1'th group seen before the token.
+         */
+        Token replicationThreshold;
+        /**
+         * Current replicated ownership. This number is reflected in the 
owning unit's ownership.
+         */
+        double replicatedOwnership = 0;
+
+        public BaseTokenInfo(Token token, UnitInfo<Unit> owningUnit)
+        {
+            this.token = token;
+            this.owningUnit = owningUnit;
+        }
+
+        public String toString()
+        {
+            return String.format("%s(%s)", token, owningUnit);
+        }
+
+        /**
+         * Previous unit in the token ring. For existing tokens this is prev,
+         * for candidates it's "split".
+         */
+        TokenInfo<Unit> prevInRing()
+        {
+            return null;
+        }
+    }
+
+    /**
+     * TokenInfo about existing tokens/vnodes.
+     */
+    static class TokenInfo<Unit> extends BaseTokenInfo<Unit, TokenInfo<Unit>>
+    {
+        public TokenInfo(Token token, UnitInfo<Unit> owningUnit)
+        {
+            super(token, owningUnit);
+        }
+
+        TokenInfo<Unit> prevInRing()
+        {
+            return prev;
+        }
+    }
+
+    static class Weighted<T> implements Comparable<Weighted<T>>
+    {
+        final double weight;
+        final T value;
+
+        public Weighted(double weight, T value)
+        {
+            this.weight = weight;
+            this.value = value;
+        }
+
+        @Override
+        public int compareTo(Weighted<T> o)
+        {
+            int cmp = Double.compare(o.weight, this.weight);
+            return cmp;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s<%s>", value, weight);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java 
b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
new file mode 100644
index 0000000..f8c972d
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.net.InetAddress;
+import java.util.NavigableMap;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+
+public class TokenAllocatorFactory
+{
+    public static TokenAllocator<InetAddress> 
createTokenAllocator(NavigableMap<Token, InetAddress> sortedTokens,
+                                                     
ReplicationStrategy<InetAddress> strategy,
+                                                     IPartitioner partitioner)
+    {
+        if(strategy.replicas() == 1)
+            return new NoReplicationTokenAllocator<>(sortedTokens, strategy, 
partitioner);
+        return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, 
partitioner);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
 
b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
index 80e980a..eb79f12 100644
--- 
a/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
+++ 
b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
@@ -36,82 +36,8 @@ import org.apache.cassandra.dht.Token;
  * we need to separate classes to avoid timeous in case flaky tests need to be 
repeated, see CASSANDRA-12784.
  */
 @Ignore
-abstract class AbstractReplicationAwareTokenAllocatorTest
+abstract class AbstractReplicationAwareTokenAllocatorTest extends 
TokenAllocatorTestBase
 {
-    private static final int TARGET_CLUSTER_SIZE = 250;
-
-    interface TestReplicationStrategy extends ReplicationStrategy<Unit>
-    {
-        void addUnit(Unit n);
-
-        void removeUnit(Unit n);
-
-        /**
-         * Returns a list of all replica units for given token.
-         */
-        List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens);
-
-        /**
-         * Returns the start of the token span that is replicated in this 
token.
-         * Note: Though this is not trivial to see, the replicated span is 
always contiguous. A token in the same
-         * group acts as a barrier; if one is not found the token replicates 
everything up to the replica'th distinct
-         * group seen in front of it.
-         */
-        Token replicationStart(Token token, Unit unit, NavigableMap<Token, 
Unit> sortedTokens);
-
-        /**
-         * Multiplier for the acceptable disbalance in the cluster. With some 
strategies it is harder to achieve good
-         * results.
-         */
-        public double spreadExpectation();
-    }
-
-    static class NoReplicationStrategy implements TestReplicationStrategy
-    {
-        public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens)
-        {
-            return 
Collections.singletonList(sortedTokens.ceilingEntry(token).getValue());
-        }
-
-        public Token replicationStart(Token token, Unit unit, 
NavigableMap<Token, Unit> sortedTokens)
-        {
-            return sortedTokens.lowerKey(token);
-        }
-
-        public String toString()
-        {
-            return "No replication";
-        }
-
-        public void addUnit(Unit n)
-        {
-        }
-
-        public void removeUnit(Unit n)
-        {
-        }
-
-        public int replicas()
-        {
-            return 1;
-        }
-
-        public boolean sameGroup(Unit n1, Unit n2)
-        {
-            return false;
-        }
-
-        public Object getGroup(Unit unit)
-        {
-            return unit;
-        }
-
-        public double spreadExpectation()
-        {
-            return 1;
-        }
-    }
-
     static class SimpleReplicationStrategy implements TestReplicationStrategy
     {
         int replicas;
@@ -455,60 +381,6 @@ abstract class AbstractReplicationAwareTokenAllocatorTest
         return ts.replicationStart(token, sortedTokens.get(token), 
sortedTokens).size(next);
     }
 
-    static interface TokenCount
-    {
-        int tokenCount(int perUnitCount, Random rand);
-
-        double spreadExpectation();
-    }
-
-    static TokenCount fixedTokenCount = new TokenCount()
-    {
-        public int tokenCount(int perUnitCount, Random rand)
-        {
-            return perUnitCount;
-        }
-
-        public double spreadExpectation()
-        {
-            return 4;  // High tolerance to avoid flakiness.
-        }
-    };
-
-    static TokenCount varyingTokenCount = new TokenCount()
-    {
-        public int tokenCount(int perUnitCount, Random rand)
-        {
-            if (perUnitCount == 1) return 1;
-            // 25 to 175%
-            return rand.nextInt(perUnitCount * 3 / 2) + (perUnitCount + 3) / 4;
-        }
-
-        public double spreadExpectation()
-        {
-            return 8;  // High tolerance to avoid flakiness.
-        }
-    };
-
-    Random seededRand = new Random(2);
-
-    private void random(Map<Token, Unit> map, TestReplicationStrategy rs,
-                        int unitCount, TokenCount tc, int perUnitCount, 
IPartitioner partitioner)
-    {
-        System.out.format("\nRandom generation of %d units with %d tokens 
each\n", unitCount, perUnitCount);
-        Random rand = seededRand;
-        for (int i = 0; i < unitCount; i++)
-        {
-            Unit unit = new Unit();
-            rs.addUnit(unit);
-            int tokens = tc.tokenCount(perUnitCount, rand);
-            for (int j = 0; j < tokens; j++)
-            {
-                map.put(partitioner.getRandomToken(rand), unit);
-            }
-        }
-    }
-
     protected void testExistingCluster(IPartitioner partitioner, int 
maxVNodeCount)
     {
         for (int rf = 1; rf <= 5; ++rf)
@@ -607,25 +479,6 @@ abstract class AbstractReplicationAwareTokenAllocatorTest
         grow(t, fullCount, tc, perUnitCount, true);
     }
 
-    static class Summary
-    {
-        double min = 1;
-        double max = 1;
-        double stddev = 0;
-
-        void update(SummaryStatistics stat)
-        {
-            min = Math.min(min, stat.getMin());
-            max = Math.max(max, stat.getMax());
-            stddev = Math.max(stddev, stat.getStandardDeviation());
-        }
-
-        public String toString()
-        {
-            return String.format("max %.2f min %.2f stddev %.4f", max, min, 
stddev);
-        }
-    }
-
     public void grow(ReplicationAwareTokenAllocator<Unit> t, int 
targetClusterSize, TokenCount tc, int perUnitCount, boolean verifyMetrics)
     {
         int size = t.unitCount();
@@ -661,7 +514,6 @@ abstract class AbstractReplicationAwareTokenAllocatorTest
         }
     }
 
-
     private void updateSummary(ReplicationAwareTokenAllocator<Unit> t, Summary 
su, Summary st, boolean print)
     {
         int size = t.sortedTokens.size();
@@ -688,29 +540,4 @@ abstract class AbstractReplicationAwareTokenAllocatorTest
             System.out.format("Worst intermediate unit\t%s  token %s\n", su, 
st);
         }
     }
-
-
-    private static String mms(SummaryStatistics s)
-    {
-        return String.format("max %.2f min %.2f stddev %.4f", s.getMax(), 
s.getMin(), s.getStandardDeviation());
-    }
-
-
-    int nextUnitId = 0;
-
-    final class Unit implements Comparable<Unit>
-    {
-        int unitId = nextUnitId++;
-
-        public String toString()
-        {
-            return Integer.toString(unitId);
-        }
-
-        @Override
-        public int compareTo(Unit o)
-        {
-            return Integer.compare(unitId, o.unitId);
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
 
b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
new file mode 100644
index 0000000..fdcc6b8
--- /dev/null
+++ 
b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+
+public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
+{
+
+    @Test
+    public void testNewClusterWithMurmur3Partitioner()
+    {
+        testNewCluster(new Murmur3Partitioner());
+    }
+
+    @Test
+    public void testNewClusterWithRandomPartitioner()
+    {
+        testNewCluster(new RandomPartitioner());
+    }
+
+    private void testNewCluster(IPartitioner partitioner)
+    {
+        for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; 
perUnitCount *= 4)
+        {
+            testNewCluster(perUnitCount, fixedTokenCount, new 
NoReplicationStrategy(), partitioner);
+        }
+    }
+
+    public void testNewCluster(int perUnitCount, TokenCount tc, 
NoReplicationStrategy rs, IPartitioner partitioner)
+    {
+        System.out.println("Testing new cluster, target " + perUnitCount + " 
vnodes, replication " + rs);
+        final int targetClusterSize = TARGET_CLUSTER_SIZE;
+        NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap();
+
+        NoReplicationTokenAllocator<Unit> t = new 
NoReplicationTokenAllocator<Unit>(tokenMap, rs, partitioner);
+        grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false);
+        grow(t, targetClusterSize, tc, perUnitCount, true);
+        System.out.println();
+    }
+
+    @Test
+    public void testExistingClusterWithMurmur3Partitioner()
+    {
+        testExistingCluster(new Murmur3Partitioner());
+    }
+
+    @Test
+    public void testExistingClusterWithRandomPartitioner()
+    {
+        testExistingCluster(new RandomPartitioner());
+    }
+
+    private void testExistingCluster(IPartitioner partitioner)
+    {
+        for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; 
perUnitCount *= 4)
+        {
+            testExistingCluster(perUnitCount, fixedTokenCount, new 
NoReplicationStrategy(), partitioner);
+        }
+    }
+
+    public NoReplicationTokenAllocator<Unit> 
randomWithTokenAllocator(NavigableMap<Token, Unit> map, NoReplicationStrategy 
rs,
+                                                                      int 
unitCount, TokenCount tc, int perUnitCount,
+                                                                      
IPartitioner partitioner)
+    {
+        super.random(map, rs, unitCount, tc, perUnitCount, partitioner);
+        NoReplicationTokenAllocator<Unit> t = new 
NoReplicationTokenAllocator<Unit>(map, rs, partitioner);
+        t.createTokenInfos();
+        return t;
+    }
+
+    public void testExistingCluster(int perUnitCount, TokenCount tc, 
NoReplicationStrategy rs, IPartitioner partitioner)
+    {
+        System.out.println("Testing existing cluster, target " + perUnitCount 
+ " vnodes, replication " + rs);
+        final int targetClusterSize = TARGET_CLUSTER_SIZE;
+        NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap();
+        NoReplicationTokenAllocator<Unit> t = 
randomWithTokenAllocator(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, 
partitioner);
+        updateSummaryBeforeGrow(t);
+
+        grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false);
+        grow(t, targetClusterSize, tc, perUnitCount, true);
+        loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, 
partitioner);
+        System.out.println();
+    }
+
+    private void loseAndReplace(NoReplicationTokenAllocator<Unit> t, int 
howMany,
+                                TokenCount tc, int perUnitCount, IPartitioner 
partitioner)
+    {
+        int fullCount = t.sortedUnits.size();
+        System.out.format("Losing %d units. ", howMany);
+        for (int i = 0; i < howMany; ++i)
+        {
+            Unit u = t.unitFor(partitioner.getRandomToken(seededRand));
+            t.removeUnit(u);
+        }
+        // Grow half without verifying.
+        grow(t, (t.sortedUnits.size() + fullCount * 3) / 4, tc, perUnitCount, 
false);
+        // Metrics should be back to normal by now. Check that they remain so.
+        grow(t, fullCount, tc, perUnitCount, true);
+    }
+
+    private void updateSummaryBeforeGrow(NoReplicationTokenAllocator<Unit> t)
+    {
+        Summary su = new Summary();
+        Summary st = new Summary();
+        System.out.println("Before growing cluster: ");
+        updateSummary(t, su, st, true);
+    }
+
+    private void grow(NoReplicationTokenAllocator<Unit> t, int 
targetClusterSize, TokenCount tc, int perUnitCount, boolean verifyMetrics)
+    {
+        int size = t.sortedUnits.size();
+        Summary su = new Summary();
+        Summary st = new Summary();
+        Random rand = new Random(targetClusterSize + perUnitCount);
+        TestReplicationStrategy strategy = (TestReplicationStrategy) 
t.strategy;
+        if (size < targetClusterSize)
+        {
+            System.out.format("Adding %d unit(s) using %s...", 
targetClusterSize - size, t.toString());
+            long time = System.currentTimeMillis();
+
+            while (size < targetClusterSize)
+            {
+                int num_tokens = tc.tokenCount(perUnitCount, rand);
+                Unit unit = new Unit();
+                t.addUnit(unit, num_tokens);
+                ++size;
+                if (verifyMetrics)
+                    updateSummary(t, su, st, false);
+            }
+            System.out.format(" Done in %.3fs\n", (System.currentTimeMillis() 
- time) / 1000.0);
+
+            if (verifyMetrics)
+            {
+                updateSummary(t, su, st, true);
+                double maxExpected = 1.0 + tc.spreadExpectation() * 
strategy.spreadExpectation() / perUnitCount;
+                if (su.max > maxExpected)
+                {
+                    Assert.fail(String.format("Expected max unit size below 
%.4f, was %.4f", maxExpected, su.max));
+                }
+            }
+        }
+    }
+
+    private void updateSummary(NoReplicationTokenAllocator<Unit> t, Summary 
su, Summary st, boolean print)
+    {
+        int size = t.sortedTokens.size();
+
+        SummaryStatistics unitStat = new SummaryStatistics();
+        for (TokenAllocatorBase.Weighted<TokenAllocatorBase.UnitInfo> wu : 
t.sortedUnits)
+        {
+            unitStat.addValue(wu.weight * size / 
t.tokensInUnits.get(wu.value.unit).size());
+        }
+        su.update(unitStat);
+
+        SummaryStatistics tokenStat = new SummaryStatistics();
+        for 
(PriorityQueue<TokenAllocatorBase.Weighted<TokenAllocatorBase.TokenInfo>> 
tokens : t.tokensInUnits.values())
+        {
+            for (TokenAllocatorBase.Weighted<TokenAllocatorBase.TokenInfo> 
token : tokens)
+            {
+                tokenStat.addValue(token.weight);
+            }
+        }
+        st.update(tokenStat);
+
+        if (print)
+        {
+            System.out.format("Size %d(%d)   \tunit %s  token %s   %s\n",
+                              t.sortedUnits.size(), size,
+                              mms(unitStat),
+                              mms(tokenStat),
+                              t.strategy);
+            System.out.format("Worst intermediate unit\t%s  token %s\n", su, 
st);
+        }
+    }
+
+    static class NoReplicationStrategy implements TestReplicationStrategy
+    {
+        public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens)
+        {
+            return 
Collections.singletonList(sortedTokens.ceilingEntry(token).getValue());
+        }
+
+        public Token replicationStart(Token token, Unit unit, 
NavigableMap<Token, Unit> sortedTokens)
+        {
+            return sortedTokens.lowerKey(token);
+        }
+
+        public String toString()
+        {
+            return "No replication";
+        }
+
+        public void addUnit(Unit n)
+        {
+        }
+
+        public void removeUnit(Unit n)
+        {
+        }
+
+        public int replicas()
+        {
+            return 1;
+        }
+
+        public Object getGroup(Unit unit)
+        {
+            return unit;
+        }
+
+        public double spreadExpectation()
+        {
+            return 1;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java 
b/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java
new file mode 100644
index 0000000..8612ac1
--- /dev/null
+++ 
b/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht.tokenallocator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Base class for {@link NoReplicationTokenAllocatorTest} and {@link 
AbstractReplicationAwareTokenAllocatorTest},
+ */
+abstract class TokenAllocatorTestBase
+{
+    protected static final int TARGET_CLUSTER_SIZE = 250;
+    protected static final int MAX_VNODE_COUNT = 64;
+
+    interface TestReplicationStrategy extends ReplicationStrategy<Unit>
+    {
+        void addUnit(Unit n);
+
+        void removeUnit(Unit n);
+
+        /**
+         * Returns a list of all replica units for given token.
+         */
+        List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> 
sortedTokens);
+
+        /**
+         * Returns the start of the token span that is replicated in this 
token.
+         * Note: Though this is not trivial to see, the replicated span is 
always contiguous. A token in the same
+         * group acts as a barrier; if one is not found the token replicates 
everything up to the replica'th distinct
+         * group seen in front of it.
+         */
+        Token replicationStart(Token token, Unit unit, NavigableMap<Token, 
Unit> sortedTokens);
+
+        /**
+         * Multiplier for the acceptable disbalance in the cluster. With some 
strategies it is harder to achieve good
+         * results.
+         */
+        double spreadExpectation();
+    }
+
+    interface TokenCount
+    {
+        int tokenCount(int perUnitCount, Random rand);
+
+        double spreadExpectation();
+    }
+
+    TokenCount fixedTokenCount = new TokenCount()
+    {
+        public int tokenCount(int perUnitCount, Random rand)
+        {
+            return perUnitCount;
+        }
+
+        public double spreadExpectation()
+        {
+            return 4;  // High tolerance to avoid flakiness.
+        }
+    };
+
+    TokenCount varyingTokenCount = new TokenCount()
+    {
+        public int tokenCount(int perUnitCount, Random rand)
+        {
+            if (perUnitCount == 1) return 1;
+            // 25 to 175%
+            return rand.nextInt(perUnitCount * 3 / 2) + (perUnitCount + 3) / 4;
+        }
+
+        public double spreadExpectation()
+        {
+            return 8;  // High tolerance to avoid flakiness.
+        }
+    };
+
+    Random seededRand = new Random(2);
+
+    public void random(Map<Token, Unit> map, TestReplicationStrategy rs,
+                              int unitCount, TokenCount tc, int perUnitCount, 
IPartitioner partitioner)
+    {
+        System.out.format("\nRandom generation of %d units with %d tokens 
each\n", unitCount, perUnitCount);
+        Random rand = seededRand;
+        for (int i = 0; i < unitCount; i++)
+        {
+            Unit unit = new Unit();
+            rs.addUnit(unit);
+            int tokens = tc.tokenCount(perUnitCount, rand);
+            for (int j = 0; j < tokens; j++)
+            {
+                map.put(partitioner.getRandomToken(rand), unit);
+            }
+        }
+    }
+
+    public String mms(SummaryStatistics s)
+    {
+        return String.format("max %.2f min %.2f stddev %.4f", s.getMax(), 
s.getMin(), s.getStandardDeviation());
+    }
+
+    class Summary
+    {
+        double min = 1;
+        double max = 1;
+        double stddev = 0;
+
+        void update(SummaryStatistics stat)
+        {
+            min = Math.min(min, stat.getMin());
+            max = Math.max(max, stat.getMax());
+            stddev = Math.max(stddev, stat.getStandardDeviation());
+        }
+
+        public String toString()
+        {
+            return String.format("max %.2f min %.2f stddev %.4f", max, min, 
stddev);
+        }
+    }
+
+    int nextUnitId = 0;
+
+    final class Unit implements Comparable<Unit>
+    {
+        int unitId = nextUnitId++;
+
+        public String toString()
+        {
+            return Integer.toString(unitId);
+        }
+
+        @Override
+        public int compareTo(Unit o)
+        {
+            return Integer.compare(unitId, o.unitId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java 
b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
index 87ba741..97f2dcc 100644
--- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
+++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
@@ -57,6 +57,11 @@ public class LengthPartitioner implements IPartitioner
         return new BigIntegerToken(midpair.left);
     }
 
+    public Token split(Token left, Token right, double ratioToLeft)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public BigIntegerToken getMinimumToken()
     {
         return MINIMUM;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java 
b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
index 19aba40..ad81f7f 100644
--- a/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
+++ b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.dht;
 
+import org.junit.Test;
+
 public class Murmur3PartitionerTest extends PartitionerTestCase
 {
     public void initPartitioner()
@@ -34,5 +36,26 @@ public class Murmur3PartitionerTest extends 
PartitionerTestCase
         assertMidpoint(mintoken, mintoken, 62);
         assertMidpoint(tok("a"), mintoken, 16);
     }
+
+    @Test
+    public void testSplit()
+    {
+        assertSplit(tok("a"), tok("b"), 16);
+        assertSplit(tok("a"), tok("bbb"), 16);
+    }
+
+    @Test
+    public void testSplitWrapping()
+    {
+        assertSplit(tok("b"), tok("a"), 16);
+        assertSplit(tok("bbb"), tok("a"), 16);
+    }
+
+    @Test
+    public void testSplitExceedMaximumCase()
+    {
+        Murmur3Partitioner.LongToken left = new 
Murmur3Partitioner.LongToken(Long.MAX_VALUE - 100);
+        assertSplit(left, tok("a"), 16);
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java 
b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
index 30b773b..c411695 100644
--- a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
+++ b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
@@ -118,6 +118,38 @@ public abstract class PartitionerTestCase
         assertMidpoint(tok("bbb"), tok("a"), 16);
     }
 
+    /**
+     * Test split token ranges
+     */
+    public void assertSplit(Token left, Token right, int depth)
+    {
+        Random rand = new Random();
+        for (int i = 0; i < 1000; i++)
+        {
+            assertSplit(left, right ,rand, depth);
+        }
+    }
+
+    private void assertSplit(Token left, Token right, Random rand, int depth)
+    {
+        double ratio = rand.nextDouble();
+        Token newToken = partitioner.split(left, right, ratio);
+
+        assert new Range<Token>(left, right).contains(newToken)
+                : "For " + left + "," + right + ": range did not contain new 
token:" + newToken;
+
+        assertEquals("For " + left + "," + right + ", new token: " + newToken,
+                     ratio, left.size(newToken) / left.size(right), 0.1);
+
+        if (depth < 1)
+            return;
+
+        if (rand.nextBoolean())
+            assertSplit(left, newToken, rand, depth-1);
+        else
+            assertSplit(newToken, right, rand, depth-1);
+    }
+
     @Test
     public void testTokenFactoryBytes()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java 
b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java
index 5e68644..d7fe602 100644
--- a/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java
+++ b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java
@@ -18,10 +18,36 @@
 
 package org.apache.cassandra.dht;
 
+import java.math.BigInteger;
+
+import org.junit.Test;
+
 public class RandomPartitionerTest extends PartitionerTestCase
 {
     public void initPartitioner()
     {
         partitioner = RandomPartitioner.instance;
     }
+
+    @Test
+    public void testSplit()
+    {
+        assertSplit(tok("a"), tok("b"), 16);
+        assertSplit(tok("a"), tok("bbb"), 16);
+    }
+
+    @Test
+    public void testSplitWrapping()
+    {
+        assertSplit(tok("b"), tok("a"), 16);
+        assertSplit(tok("bbb"), tok("a"), 16);
+    }
+
+    @Test
+    public void testSplitExceedMaximumCase()
+    {
+        RandomPartitioner.BigIntegerToken left = new 
RandomPartitioner.BigIntegerToken(RandomPartitioner.MAXIMUM.subtract(BigInteger.valueOf(10)));
+
+        assertSplit(left, tok("a"), 16);
+    }
 }

Reply via email to