Repository: cassandra Updated Branches: refs/heads/cassandra-3.X b98a40605 -> e2a0d75b0 refs/heads/trunk 250573302 -> 6d40809d8
Implement the NoReplicationTokenAllocator Patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12777 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2a0d75b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2a0d75b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2a0d75b Branch: refs/heads/cassandra-3.X Commit: e2a0d75b024463ad481333bdae826928b55ac589 Parents: b98a406 Author: Dikang Gu <[email protected]> Authored: Thu Oct 13 12:31:39 2016 -0700 Committer: Aleksey Yeschenko <[email protected]> Committed: Tue Oct 25 16:56:07 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/dht/ByteOrderedPartitioner.java | 5 + .../org/apache/cassandra/dht/IPartitioner.java | 5 + .../apache/cassandra/dht/LocalPartitioner.java | 5 + .../cassandra/dht/Murmur3Partitioner.java | 36 +++ .../dht/OrderPreservingPartitioner.java | 5 + .../apache/cassandra/dht/RandomPartitioner.java | 37 ++- .../NoReplicationTokenAllocator.java | 266 ++++++++++++++++++ .../ReplicationAwareTokenAllocator.java | 250 +---------------- .../dht/tokenallocator/TokenAllocation.java | 2 +- .../dht/tokenallocator/TokenAllocator.java | 2 +- .../dht/tokenallocator/TokenAllocatorBase.java | 279 +++++++++++++++++++ .../tokenallocator/TokenAllocatorFactory.java | 37 +++ ...tractReplicationAwareTokenAllocatorTest.java | 175 +----------- .../NoReplicationTokenAllocatorTest.java | 249 +++++++++++++++++ .../tokenallocator/TokenAllocatorTestBase.java | 160 +++++++++++ .../apache/cassandra/dht/LengthPartitioner.java | 5 + .../cassandra/dht/Murmur3PartitionerTest.java | 23 ++ .../cassandra/dht/PartitionerTestCase.java | 32 +++ .../cassandra/dht/RandomPartitionerTest.java | 26 ++ 20 files changed, 1177 insertions(+), 423 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dea2003..0f16a0e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777) * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419) * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803) * Use different build directories for Eclipse and Ant (CASSANDRA-12466) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java index d334604..1271a5a 100644 --- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java +++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java @@ -158,6 +158,11 @@ public class ByteOrderedPartitioner implements IPartitioner return new BytesToken(bytesForBig(midpair.left, sigbytes, midpair.right)); } + public Token split(Token left, Token right, double ratioToLeft) + { + throw new UnsupportedOperationException(); + } + /** * Convert a byte array containing the most significant of 'sigbytes' bytes * representing a big-endian magnitude into a BigInteger. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/IPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java index eb4aafb..e342bd0 100644 --- a/src/java/org/apache/cassandra/dht/IPartitioner.java +++ b/src/java/org/apache/cassandra/dht/IPartitioner.java @@ -45,6 +45,11 @@ public interface IPartitioner public Token midpoint(Token left, Token right); /** + * Calculate a Token which take approximate 0 <= ratioToLeft <= 1 ownership of the given range. + */ + public Token split(Token left, Token right, double ratioToLeft); + + /** * @return A Token smaller than all others in the range that is being partitioned. * Not legal to assign to a node or key. (But legal to use in range scans.) */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/LocalPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java index 9922eb0..0a9b7cb 100644 --- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java +++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java @@ -50,6 +50,11 @@ public class LocalPartitioner implements IPartitioner throw new UnsupportedOperationException(); } + public Token split(Token left, Token right, double ratioToLeft) + { + throw new UnsupportedOperationException(); + } + public LocalToken getMinimumToken() { return new LocalToken(ByteBufferUtil.EMPTY_BYTE_BUFFER); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java 
b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java index 9ed0cca..0f922e3 100644 --- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java +++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java @@ -94,6 +94,42 @@ public class Murmur3Partitioner implements IPartitioner return new LongToken(midpoint.longValue()); } + public Token split(Token lToken, Token rToken, double ratioToLeft) + { + BigDecimal l = BigDecimal.valueOf(((LongToken) lToken).token), + r = BigDecimal.valueOf(((LongToken) rToken).token), + ratio = BigDecimal.valueOf(ratioToLeft); + long newToken; + + if (l.compareTo(r) < 0) + { + newToken = r.subtract(l).multiply(ratio).add(l).toBigInteger().longValue(); + } + else + { + // wrapping case + // L + ((R - min) + (max - L)) * pct + BigDecimal max = BigDecimal.valueOf(MAXIMUM); + BigDecimal min = BigDecimal.valueOf(MINIMUM.token); + + BigInteger token = max.subtract(min).add(r).subtract(l).multiply(ratio).add(l).toBigInteger(); + + BigInteger maxToken = BigInteger.valueOf(MAXIMUM); + + if (token.compareTo(maxToken) <= 0) + { + newToken = token.longValue(); + } + else + { + // if the value is above maximum + BigInteger minToken = BigInteger.valueOf(MINIMUM.token); + newToken = minToken.add(token.subtract(maxToken)).longValue(); + } + } + return new LongToken(newToken); + } + public LongToken getMinimumToken() { return MINIMUM; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java index ab552c4..954b0af 100644 --- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java +++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java @@ -63,6 +63,11 @@ public class OrderPreservingPartitioner implements IPartitioner return new 
StringToken(stringForBig(midpair.left, sigchars, midpair.right)); } + public Token split(Token left, Token right, double ratioToLeft) + { + throw new UnsupportedOperationException(); + } + /** * Copies the characters of the given string into a BigInteger. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/RandomPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index f6090e0..82c2493 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -78,6 +78,32 @@ public class RandomPartitioner implements IPartitioner return new BigIntegerToken(midpair.left); } + public Token split(Token ltoken, Token rtoken, double ratioToLeft) + { + BigDecimal left = ltoken.equals(MINIMUM) ? BigDecimal.ZERO : new BigDecimal(((BigIntegerToken)ltoken).token), + right = rtoken.equals(MINIMUM) ? 
BigDecimal.ZERO : new BigDecimal(((BigIntegerToken)rtoken).token), + ratio = BigDecimal.valueOf(ratioToLeft); + + BigInteger newToken; + + if (left.compareTo(right) < 0) + { + newToken = right.subtract(left).multiply(ratio).add(left).toBigInteger(); + } + else + { + // wrapping case + // L + ((R - min) + (max - L)) * ratio + BigDecimal max = new BigDecimal(MAXIMUM); + + newToken = max.add(right).subtract(left).multiply(ratio).add(left).toBigInteger().mod(MAXIMUM); + } + + assert isValidToken(newToken) : "Invalid tokens from split"; + + return new BigIntegerToken(newToken); + } + public BigIntegerToken getMinimumToken() { return MINIMUM; @@ -99,6 +125,10 @@ public class RandomPartitioner implements IPartitioner return new BigIntegerToken(token); } + private boolean isValidToken(BigInteger token) { + return token.compareTo(ZERO) >= 0 && token.compareTo(MAXIMUM) <= 0; + } + private final Token.TokenFactory tokenFactory = new Token.TokenFactory() { public ByteBuffer toByteArray(Token token) @@ -122,11 +152,8 @@ public class RandomPartitioner implements IPartitioner { try { - BigInteger i = new BigInteger(token); - if (i.compareTo(ZERO) < 0) - throw new ConfigurationException("Token must be >= 0"); - if (i.compareTo(MAXIMUM) > 0) - throw new ConfigurationException("Token must be <= 2**127"); + if(!isValidToken(new BigInteger(token))) + throw new ConfigurationException("Token must be >= 0 and <= 2**127"); } catch (NumberFormatException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java new file mode 100644 index 0000000..54d80dc --- /dev/null +++ b/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java 
@@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.dht.tokenallocator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; + +public class NoReplicationTokenAllocator<Unit> extends TokenAllocatorBase<Unit> +{ + PriorityQueue<Weighted<UnitInfo>> sortedUnits = Queues.newPriorityQueue(); + Map<Unit, PriorityQueue<Weighted<TokenInfo>>> tokensInUnits = Maps.newHashMap(); + + private static final double MAX_TAKEOVER_RATIO = 0.90; + private static final double MIN_TAKEOVER_RATIO = 1.0 - MAX_TAKEOVER_RATIO; + + public NoReplicationTokenAllocator(NavigableMap<Token, Unit> sortedTokens, + ReplicationStrategy<Unit> strategy, + IPartitioner partitioner) + { + super(sortedTokens, strategy, partitioner); + } + + /** + * Construct the token ring as a 
CircularList of TokenInfo, + * and populate the ownership of the UnitInfo's provided + */ + private TokenInfo<Unit> createTokenInfos(Map<Unit, UnitInfo<Unit>> units) + { + if (units.isEmpty()) + return null; + + // build the circular list + TokenInfo<Unit> prev = null; + TokenInfo<Unit> first = null; + for (Map.Entry<Token, Unit> en : sortedTokens.entrySet()) + { + Token t = en.getKey(); + UnitInfo<Unit> ni = units.get(en.getValue()); + TokenInfo<Unit> ti = new TokenInfo<>(t, ni); + first = ti.insertAfter(first, prev); + prev = ti; + } + + TokenInfo<Unit> curr = first; + tokensInUnits.clear(); + sortedUnits.clear(); + do + { + populateTokenInfoAndAdjustUnit(curr); + curr = curr.next; + } while (curr != first); + + for (UnitInfo<Unit> unitInfo : units.values()) + { + sortedUnits.add(new Weighted<UnitInfo>(unitInfo.ownership, unitInfo)); + } + + return first; + } + + /** + * Used in tests. + */ + protected void createTokenInfos() + { + createTokenInfos(createUnitInfos(Maps.newHashMap())); + } + + private void populateTokenInfoAndAdjustUnit(TokenInfo<Unit> token) + { + token.replicationStart = token.prevInRing().token; + token.replicationThreshold = token.token; + token.replicatedOwnership = token.replicationStart.size(token.token); + token.owningUnit.ownership += token.replicatedOwnership; + + PriorityQueue<Weighted<TokenInfo>> unitTokens = tokensInUnits.get(token.owningUnit.unit); + if (unitTokens == null) + { + unitTokens = Queues.newPriorityQueue(); + tokensInUnits.put(token.owningUnit.unit, unitTokens); + } + unitTokens.add(new Weighted<TokenInfo>(token.replicatedOwnership, token)); + } + + private Collection<Token> generateRandomTokens(UnitInfo<Unit> newUnit, int numTokens, Map<Unit, UnitInfo<Unit>> unitInfos) + { + Set<Token> tokens = new HashSet<>(numTokens); + while (tokens.size() < numTokens) + { + Token token = partitioner.getRandomToken(); + if (!sortedTokens.containsKey(token)) + { + tokens.add(token); + sortedTokens.put(token, newUnit.unit); + } + } + 
unitInfos.put(newUnit.unit, newUnit); + createTokenInfos(unitInfos); + return tokens; + } + + public Collection<Token> addUnit(Unit newUnit, int numTokens) + { + assert !tokensInUnits.containsKey(newUnit); + + Map<Object, GroupInfo> groups = Maps.newHashMap(); + UnitInfo<Unit> newUnitInfo = new UnitInfo<>(newUnit, 0, groups, strategy); + Map<Unit, UnitInfo<Unit>> unitInfos = createUnitInfos(groups); + + if (unitInfos.isEmpty()) + return generateRandomTokens(newUnitInfo, numTokens, unitInfos); + + if (numTokens > sortedTokens.size()) + return generateRandomTokens(newUnitInfo, numTokens, unitInfos); + + TokenInfo<Unit> head = createTokenInfos(unitInfos); + + // Select the nodes we will work with, extract them from sortedUnits and calculate targetAverage + double targetAverage = 0.0; + double sum = 0.0; + List<Weighted<UnitInfo>> unitsToChange = new ArrayList<>(); + + for (int i = 0; i < numTokens; i++) + { + Weighted<UnitInfo> unit = sortedUnits.peek(); + + if (unit == null) + break; + + sum += unit.weight; + double average = sum / (unitsToChange.size() + 2); // unit and newUnit must be counted + if (unit.weight <= average) + // No point to include later nodes, target can only decrease from here. + break; + + sortedUnits.remove(); + unitsToChange.add(unit); + targetAverage = average; + } + + List<Token> newTokens = Lists.newArrayListWithCapacity(numTokens); + + int nr = 0; + // calculate the tokens + for (Weighted<UnitInfo> unit : unitsToChange) + { + // TODO: Any better ways to assign how many tokens to change in each node? + int tokensToChange = numTokens / unitsToChange.size() + (nr < numTokens % unitsToChange.size() ? 1 : 0); + + Queue<Weighted<TokenInfo>> unitTokens = tokensInUnits.get(unit.value.unit); + List<Weighted<TokenInfo>> tokens = Lists.newArrayListWithCapacity(tokensToChange); + + double workWeight = 0; + // Extract biggest vnodes and calculate how much weight we can work with. 
+ for (int i = 0; i < tokensToChange; i++) + { + Weighted<TokenInfo> wt = unitTokens.remove(); + tokens.add(wt); + workWeight += wt.weight; + unit.value.ownership -= wt.weight; + } + + double toTakeOver = unit.weight - targetAverage; + // Split toTakeOver proportionally between the vnodes. + for (Weighted<TokenInfo> wt : tokens) + { + double slice; + Token token; + + if (toTakeOver < workWeight) + { + // Spread decrease. + slice = toTakeOver / workWeight; + + if (slice < MIN_TAKEOVER_RATIO) + slice = MIN_TAKEOVER_RATIO; + if (slice > MAX_TAKEOVER_RATIO) + slice = MAX_TAKEOVER_RATIO; + } + else + { + slice = MAX_TAKEOVER_RATIO; + } + token = partitioner.split(wt.value.prevInRing().token, wt.value.token, slice); + + //Token selected, now change all data + sortedTokens.put(token, newUnit); + + TokenInfo<Unit> ti = new TokenInfo<>(token, newUnitInfo); + + ti.insertAfter(head, wt.value.prevInRing()); + + populateTokenInfoAndAdjustUnit(ti); + populateTokenInfoAndAdjustUnit(wt.value); + newTokens.add(token); + } + + // adjust the weight for current unit + sortedUnits.add(new Weighted<>(unit.value.ownership, unit.value)); + ++nr; + } + sortedUnits.add(new Weighted<>(newUnitInfo.ownership, newUnitInfo)); + + return newTokens; + } + + /** + * For testing, remove the given unit preserving correct state of the allocator. 
+ */ + void removeUnit(Unit n) + { + Iterator<Weighted<UnitInfo>> it = sortedUnits.iterator(); + while (it.hasNext()) + { + if (it.next().value.unit.equals(n)) + { + it.remove(); + break; + } + } + + PriorityQueue<Weighted<TokenInfo>> tokenInfos = tokensInUnits.remove(n); + Collection<Token> tokens = Lists.newArrayListWithCapacity(tokenInfos.size()); + for (Weighted<TokenInfo> tokenInfo : tokenInfos) + { + tokens.add(tokenInfo.value.token); + } + sortedTokens.keySet().removeAll(tokens); + } + + public int getReplicas() + { + return 1; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java index a60be94..87dba59 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java @@ -36,23 +36,23 @@ import org.apache.cassandra.dht.Token; * ownership needs to be evenly distributed. At the moment only nodes as a whole are treated as units, but that * will change with the introduction of token ranges per disk. 
*/ -class ReplicationAwareTokenAllocator<Unit> implements TokenAllocator<Unit> +class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit> { - final NavigableMap<Token, Unit> sortedTokens; final Multimap<Unit, Token> unitToTokens; - final ReplicationStrategy<Unit> strategy; - final IPartitioner partitioner; final int replicas; ReplicationAwareTokenAllocator(NavigableMap<Token, Unit> sortedTokens, ReplicationStrategy<Unit> strategy, IPartitioner partitioner) { - this.sortedTokens = sortedTokens; + super(sortedTokens, strategy, partitioner); unitToTokens = HashMultimap.create(); for (Map.Entry<Token, Unit> en : sortedTokens.entrySet()) unitToTokens.put(en.getValue(), en.getKey()); - this.strategy = strategy; this.replicas = strategy.replicas(); - this.partitioner = partitioner; + } + + public int getReplicas() + { + return replicas; } public Collection<Token> addUnit(Unit newUnit, int numTokens) @@ -151,19 +151,6 @@ class ReplicationAwareTokenAllocator<Unit> implements TokenAllocator<Unit> return tokens; } - private Map<Unit, UnitInfo<Unit>> createUnitInfos(Map<Object, GroupInfo> groups) - { - Map<Unit, UnitInfo<Unit>> map = Maps.newHashMap(); - for (Unit n : sortedTokens.values()) - { - UnitInfo<Unit> ni = map.get(n); - if (ni == null) - map.put(n, ni = new UnitInfo<>(n, 0, groups, strategy)); - ni.tokenCount++; - } - return map; - } - /** * Construct the token ring as a CircularList of TokenInfo, * and populate the ownership of the UnitInfo's provided @@ -506,19 +493,6 @@ class ReplicationAwareTokenAllocator<Unit> implements TokenAllocator<Unit> } } - private Map.Entry<Token, Unit> mapEntryFor(Token t) - { - Map.Entry<Token, Unit> en = sortedTokens.floorEntry(t); - if (en == null) - en = sortedTokens.lastEntry(); - return en; - } - - Unit unitFor(Token t) - { - return mapEntryFor(t).getValue(); - } - private double optimalTokenOwnership(int tokensToAdd) { return 1.0 * replicas / (sortedTokens.size() + tokensToAdd); @@ -554,7 +528,7 @@ class 
ReplicationAwareTokenAllocator<Unit> implements TokenAllocator<Unit> sortedTokens.keySet().removeAll(tokens); } - int unitCount() + public int unitCount() { return unitToTokens.asMap().size(); } @@ -564,189 +538,6 @@ class ReplicationAwareTokenAllocator<Unit> implements TokenAllocator<Unit> return getClass().getSimpleName(); } - // get or initialise the shared GroupInfo associated with the unit - private static <Unit> GroupInfo getGroup(Unit unit, Map<Object, GroupInfo> groupMap, ReplicationStrategy<Unit> strategy) - { - Object groupClass = strategy.getGroup(unit); - GroupInfo group = groupMap.get(groupClass); - if (group == null) - groupMap.put(groupClass, group = new GroupInfo(groupClass)); - return group; - } - - /** - * Unique group object that one or more UnitInfo objects link to. - */ - private static class GroupInfo - { - /** - * Group identifier given by ReplicationStrategy.getGroup(Unit). - */ - final Object group; - - /** - * Seen marker. When non-null, the group is already seen in replication walks. - * Also points to previous seen group to enable walking the seen groups and clearing the seen markers. - */ - GroupInfo prevSeen = null; - /** - * Same marker/chain used by populateTokenInfo. - */ - GroupInfo prevPopulate = null; - - /** - * Value used as terminator for seen chains. - */ - static GroupInfo TERMINATOR = new GroupInfo(null); - - public GroupInfo(Object group) - { - this.group = group; - } - - public String toString() - { - return group.toString() + (prevSeen != null ? "*" : ""); - } - } - - /** - * Unit information created and used by ReplicationAwareTokenDistributor. Contained vnodes all point to the same - * instance. - */ - static class UnitInfo<Unit> - { - final Unit unit; - final GroupInfo group; - double ownership; - int tokenCount; - - /** - * During evaluateImprovement this is used to form a chain of units affected by the candidate insertion. 
- */ - UnitInfo<Unit> prevUsed; - /** - * During evaluateImprovement this holds the ownership after the candidate insertion. - */ - double adjustedOwnership; - - private UnitInfo(Unit unit, GroupInfo group) - { - this.unit = unit; - this.group = group; - this.tokenCount = 0; - } - - public UnitInfo(Unit unit, double ownership, Map<Object, GroupInfo> groupMap, ReplicationStrategy<Unit> strategy) - { - this(unit, getGroup(unit, groupMap, strategy)); - this.ownership = ownership; - } - - public String toString() - { - return String.format("%s%s(%.2e)%s", - unit, unit == group.group ? (group.prevSeen != null ? "*" : "") : ":" + group.toString(), - ownership, prevUsed != null ? (prevUsed == this ? "#" : "->" + prevUsed.toString()) : ""); - } - } - - private static class CircularList<T extends CircularList<T>> - { - T prev; - T next; - - /** - * Inserts this after unit in the circular list which starts at head. Returns the new head of the list, which - * only changes if head was null. - */ - @SuppressWarnings("unchecked") - T insertAfter(T head, T unit) - { - if (head == null) - { - return prev = next = (T) this; - } - assert unit != null; - assert unit.next != null; - prev = unit; - next = unit.next; - prev.next = (T) this; - next.prev = (T) this; - return head; - } - - /** - * Removes this from the list that starts at head. Returns the new head of the list, which only changes if the - * head was removed. - */ - T removeFrom(T head) - { - next.prev = prev; - prev.next = next; - return this == head ? (this == next ? null : next) : head; - } - } - - private static class BaseTokenInfo<Unit, T extends BaseTokenInfo<Unit, T>> extends CircularList<T> - { - final Token token; - final UnitInfo<Unit> owningUnit; - - /** - * Start of the replication span for the vnode, i.e. the first token of the RF'th group seen before the token. - * The replicated ownership of the unit is the range between {@code replicationStart} and {@code token}. 
- */ - Token replicationStart; - /** - * The closest position that the new candidate can take to become the new replication start. If candidate is - * closer, the start moves to this position. Used to determine replicationStart after insertion of new token. - * - * Usually the RF minus one boundary, i.e. the first token of the RF-1'th group seen before the token. - */ - Token replicationThreshold; - /** - * Current replicated ownership. This number is reflected in the owning unit's ownership. - */ - double replicatedOwnership = 0; - - public BaseTokenInfo(Token token, UnitInfo<Unit> owningUnit) - { - this.token = token; - this.owningUnit = owningUnit; - } - - public String toString() - { - return String.format("%s(%s)", token, owningUnit); - } - - /** - * Previous unit in the token ring. For existing tokens this is prev, - * for candidates it's "split". - */ - TokenInfo<Unit> prevInRing() - { - return null; - } - } - - /** - * TokenInfo about existing tokens/vnodes. - */ - private static class TokenInfo<Unit> extends BaseTokenInfo<Unit, TokenInfo<Unit>> - { - public TokenInfo(Token token, UnitInfo<Unit> owningUnit) - { - super(token, owningUnit); - } - - TokenInfo<Unit> prevInRing() - { - return prev; - } - } - /** * TokenInfo about candidate new tokens/vnodes. 
*/ @@ -776,30 +567,5 @@ class ReplicationAwareTokenAllocator<Unit> implements TokenAllocator<Unit> token = token.next; } while (token != null && token != tokens); } - - static class Weighted<T> implements Comparable<Weighted<T>> - { - final double weight; - final T value; - - public Weighted(double weight, T value) - { - this.weight = weight; - this.value = value; - } - - @Override - public int compareTo(Weighted<T> o) - { - int cmp = Double.compare(o.weight, this.weight); - return cmp; - } - - @Override - public String toString() - { - return String.format("%s<%s>", value, weight); - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java index ebbd690..36824a1 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java @@ -147,7 +147,7 @@ public class TokenAllocation if (strategy.inAllocationRing(en.getValue())) sortedTokens.put(en.getKey(), en.getValue()); } - return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, tokenMetadata.partitioner); + return TokenAllocatorFactory.createTokenAllocator(sortedTokens, strategy, tokenMetadata.partitioner); } interface StrategyAdapter extends ReplicationStrategy<InetAddress> http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java index 580f2ec..2eb9a4c 100644 --- 
a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocator.java @@ -23,5 +23,5 @@ import org.apache.cassandra.dht.Token; public interface TokenAllocator<Unit> { - public Collection<Token> addUnit(Unit newUnit, int numTokens); + Collection<Token> addUnit(Unit newUnit, int numTokens); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorBase.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorBase.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorBase.java new file mode 100644 index 0000000..f59bfd4 --- /dev/null +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorBase.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.dht.tokenallocator; + +import java.util.Map; +import java.util.NavigableMap; + +import com.google.common.collect.Maps; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; + +public abstract class TokenAllocatorBase<Unit> implements TokenAllocator<Unit> +{ + final NavigableMap<Token, Unit> sortedTokens; + final ReplicationStrategy<Unit> strategy; + final IPartitioner partitioner; + + protected TokenAllocatorBase(NavigableMap<Token, Unit> sortedTokens, + ReplicationStrategy<Unit> strategy, + IPartitioner partitioner) + { + this.sortedTokens = sortedTokens; + this.strategy = strategy; + this.partitioner = partitioner; + } + + public abstract int getReplicas(); + + protected Map<Unit, UnitInfo<Unit>> createUnitInfos(Map<Object, GroupInfo> groups) + { + Map<Unit, UnitInfo<Unit>> map = Maps.newHashMap(); + for (Unit n : sortedTokens.values()) + { + UnitInfo<Unit> ni = map.get(n); + if (ni == null) + map.put(n, ni = new UnitInfo<>(n, 0, groups, strategy)); + ni.tokenCount++; + } + return map; + } + + private Map.Entry<Token, Unit> mapEntryFor(Token t) + { + Map.Entry<Token, Unit> en = sortedTokens.floorEntry(t); + if (en == null) + en = sortedTokens.lastEntry(); + return en; + } + + Unit unitFor(Token t) + { + return mapEntryFor(t).getValue(); + } + + // get or initialise the shared GroupInfo associated with the unit + private static <Unit> GroupInfo getGroup(Unit unit, Map<Object, GroupInfo> groupMap, ReplicationStrategy<Unit> strategy) + { + Object groupClass = strategy.getGroup(unit); + GroupInfo group = groupMap.get(groupClass); + if (group == null) + groupMap.put(groupClass, group = new GroupInfo(groupClass)); + return group; + } + + /** + * Unique group object that one or more UnitInfo objects link to. + */ + static class GroupInfo + { + /** + * Group identifier given by ReplicationStrategy.getGroup(Unit). + */ + final Object group; + + /** + * Seen marker. 
When non-null, the group is already seen in replication walks. + * Also points to the previously seen group to enable walking the seen groups and clearing the seen markers. + */ + GroupInfo prevSeen = null; + /** + * Same marker/chain used by populateTokenInfo. + */ + GroupInfo prevPopulate = null; + + /** + * Value used as terminator for seen chains. + */ + static GroupInfo TERMINATOR = new GroupInfo(null); + + public GroupInfo(Object group) + { + this.group = group; + } + + public String toString() + { + return group.toString() + (prevSeen != null ? "*" : ""); + } + } + + /** + * Unit information created and used by the token allocators. Contained vnodes all point to the same + * instance. + */ + static class UnitInfo<Unit> + { + final Unit unit; + final GroupInfo group; + double ownership; + int tokenCount; + + /** + * During evaluateImprovement this is used to form a chain of units affected by the candidate insertion. + */ + UnitInfo<Unit> prevUsed; + /** + * During evaluateImprovement this holds the ownership after the candidate insertion. + */ + double adjustedOwnership; + + private UnitInfo(Unit unit, GroupInfo group) + { + this.unit = unit; + this.group = group; + this.tokenCount = 0; + } + + public UnitInfo(Unit unit, double ownership, Map<Object, GroupInfo> groupMap, ReplicationStrategy<Unit> strategy) + { + this(unit, getGroup(unit, groupMap, strategy)); + this.ownership = ownership; + } + + public String toString() + { + return String.format("%s%s(%.2e)%s", + unit, unit == group.group ? (group.prevSeen != null ? "*" : "") : ":" + group.toString(), + ownership, prevUsed != null ? (prevUsed == this ? "#" : "->" + prevUsed.toString()) : ""); + } + } + + private static class CircularList<T extends CircularList<T>> + { + T prev; + T next; + + /** + * Inserts this after unit in the circular list which starts at head. Returns the new head of the list, which + * only changes if head was null. 
+ */ + @SuppressWarnings("unchecked") + T insertAfter(T head, T unit) + { + if (head == null) + { + return prev = next = (T) this; + } + assert unit != null; + assert unit.next != null; + prev = unit; + next = unit.next; + prev.next = (T) this; + next.prev = (T) this; + return head; + } + + /** + * Removes this from the list that starts at head. Returns the new head of the list, which only changes if the + * head was removed. + */ + T removeFrom(T head) + { + next.prev = prev; + prev.next = next; + return this == head ? (this == next ? null : next) : head; + } + } + + static class BaseTokenInfo<Unit, T extends BaseTokenInfo<Unit, T>> extends CircularList<T> + { + final Token token; + final UnitInfo<Unit> owningUnit; + + /** + * Start of the replication span for the vnode, i.e. the first token of the RF'th group seen before the token. + * The replicated ownership of the unit is the range between {@code replicationStart} and {@code token}. + */ + Token replicationStart; + /** + * The closest position that the new candidate can take to become the new replication start. If candidate is + * closer, the start moves to this position. Used to determine replicationStart after insertion of new token. + * + * Usually the RF minus one boundary, i.e. the first token of the RF-1'th group seen before the token. + */ + Token replicationThreshold; + /** + * Current replicated ownership. This number is reflected in the owning unit's ownership. + */ + double replicatedOwnership = 0; + + public BaseTokenInfo(Token token, UnitInfo<Unit> owningUnit) + { + this.token = token; + this.owningUnit = owningUnit; + } + + public String toString() + { + return String.format("%s(%s)", token, owningUnit); + } + + /** + * Previous unit in the token ring. For existing tokens this is prev, + * for candidates it's "split". + */ + TokenInfo<Unit> prevInRing() + { + return null; + } + } + + /** + * TokenInfo about existing tokens/vnodes. 
+ */ + static class TokenInfo<Unit> extends BaseTokenInfo<Unit, TokenInfo<Unit>> + { + public TokenInfo(Token token, UnitInfo<Unit> owningUnit) + { + super(token, owningUnit); + } + + TokenInfo<Unit> prevInRing() + { + return prev; + } + } + + static class Weighted<T> implements Comparable<Weighted<T>> + { + final double weight; + final T value; + + public Weighted(double weight, T value) + { + this.weight = weight; + this.value = value; + } + + @Override + public int compareTo(Weighted<T> o) + { + int cmp = Double.compare(o.weight, this.weight); + return cmp; + } + + @Override + public String toString() + { + return String.format("%s<%s>", value, weight); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java new file mode 100644 index 0000000..f8c972d --- /dev/null +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.dht.tokenallocator; + +import java.net.InetAddress; +import java.util.NavigableMap; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; + +public class TokenAllocatorFactory +{ + public static TokenAllocator<InetAddress> createTokenAllocator(NavigableMap<Token, InetAddress> sortedTokens, + ReplicationStrategy<InetAddress> strategy, + IPartitioner partitioner) + { + if(strategy.replicas() == 1) + return new NoReplicationTokenAllocator<>(sortedTokens, strategy, partitioner); + return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java index 80e980a..eb79f12 100644 --- a/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java +++ b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java @@ -36,82 +36,8 @@ import org.apache.cassandra.dht.Token; * we need to separate classes to avoid timeous in case flaky tests need to be repeated, see CASSANDRA-12784. 
*/ @Ignore -abstract class AbstractReplicationAwareTokenAllocatorTest +abstract class AbstractReplicationAwareTokenAllocatorTest extends TokenAllocatorTestBase { - private static final int TARGET_CLUSTER_SIZE = 250; - - interface TestReplicationStrategy extends ReplicationStrategy<Unit> - { - void addUnit(Unit n); - - void removeUnit(Unit n); - - /** - * Returns a list of all replica units for given token. - */ - List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens); - - /** - * Returns the start of the token span that is replicated in this token. - * Note: Though this is not trivial to see, the replicated span is always contiguous. A token in the same - * group acts as a barrier; if one is not found the token replicates everything up to the replica'th distinct - * group seen in front of it. - */ - Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens); - - /** - * Multiplier for the acceptable disbalance in the cluster. With some strategies it is harder to achieve good - * results. 
- */ - public double spreadExpectation(); - } - - static class NoReplicationStrategy implements TestReplicationStrategy - { - public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens) - { - return Collections.singletonList(sortedTokens.ceilingEntry(token).getValue()); - } - - public Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens) - { - return sortedTokens.lowerKey(token); - } - - public String toString() - { - return "No replication"; - } - - public void addUnit(Unit n) - { - } - - public void removeUnit(Unit n) - { - } - - public int replicas() - { - return 1; - } - - public boolean sameGroup(Unit n1, Unit n2) - { - return false; - } - - public Object getGroup(Unit unit) - { - return unit; - } - - public double spreadExpectation() - { - return 1; - } - } - static class SimpleReplicationStrategy implements TestReplicationStrategy { int replicas; @@ -455,60 +381,6 @@ abstract class AbstractReplicationAwareTokenAllocatorTest return ts.replicationStart(token, sortedTokens.get(token), sortedTokens).size(next); } - static interface TokenCount - { - int tokenCount(int perUnitCount, Random rand); - - double spreadExpectation(); - } - - static TokenCount fixedTokenCount = new TokenCount() - { - public int tokenCount(int perUnitCount, Random rand) - { - return perUnitCount; - } - - public double spreadExpectation() - { - return 4; // High tolerance to avoid flakiness. - } - }; - - static TokenCount varyingTokenCount = new TokenCount() - { - public int tokenCount(int perUnitCount, Random rand) - { - if (perUnitCount == 1) return 1; - // 25 to 175% - return rand.nextInt(perUnitCount * 3 / 2) + (perUnitCount + 3) / 4; - } - - public double spreadExpectation() - { - return 8; // High tolerance to avoid flakiness. 
- } - }; - - Random seededRand = new Random(2); - - private void random(Map<Token, Unit> map, TestReplicationStrategy rs, - int unitCount, TokenCount tc, int perUnitCount, IPartitioner partitioner) - { - System.out.format("\nRandom generation of %d units with %d tokens each\n", unitCount, perUnitCount); - Random rand = seededRand; - for (int i = 0; i < unitCount; i++) - { - Unit unit = new Unit(); - rs.addUnit(unit); - int tokens = tc.tokenCount(perUnitCount, rand); - for (int j = 0; j < tokens; j++) - { - map.put(partitioner.getRandomToken(rand), unit); - } - } - } - protected void testExistingCluster(IPartitioner partitioner, int maxVNodeCount) { for (int rf = 1; rf <= 5; ++rf) @@ -607,25 +479,6 @@ abstract class AbstractReplicationAwareTokenAllocatorTest grow(t, fullCount, tc, perUnitCount, true); } - static class Summary - { - double min = 1; - double max = 1; - double stddev = 0; - - void update(SummaryStatistics stat) - { - min = Math.min(min, stat.getMin()); - max = Math.max(max, stat.getMax()); - stddev = Math.max(stddev, stat.getStandardDeviation()); - } - - public String toString() - { - return String.format("max %.2f min %.2f stddev %.4f", max, min, stddev); - } - } - public void grow(ReplicationAwareTokenAllocator<Unit> t, int targetClusterSize, TokenCount tc, int perUnitCount, boolean verifyMetrics) { int size = t.unitCount(); @@ -661,7 +514,6 @@ abstract class AbstractReplicationAwareTokenAllocatorTest } } - private void updateSummary(ReplicationAwareTokenAllocator<Unit> t, Summary su, Summary st, boolean print) { int size = t.sortedTokens.size(); @@ -688,29 +540,4 @@ abstract class AbstractReplicationAwareTokenAllocatorTest System.out.format("Worst intermediate unit\t%s token %s\n", su, st); } } - - - private static String mms(SummaryStatistics s) - { - return String.format("max %.2f min %.2f stddev %.4f", s.getMax(), s.getMin(), s.getStandardDeviation()); - } - - - int nextUnitId = 0; - - final class Unit implements Comparable<Unit> - { - int unitId 
= nextUnitId++; - - public String toString() - { - return Integer.toString(unitId); - } - - @Override - public int compareTo(Unit o) - { - return Integer.compare(unitId, o.unitId); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java new file mode 100644 index 0000000..fdcc6b8 --- /dev/null +++ b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.dht.tokenallocator; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.PriorityQueue; +import java.util.Random; + +import com.google.common.collect.Maps; +import org.apache.commons.math3.stat.descriptive.SummaryStatistics; +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.dht.Token; + +public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase +{ + + @Test + public void testNewClusterWithMurmur3Partitioner() + { + testNewCluster(new Murmur3Partitioner()); + } + + @Test + public void testNewClusterWithRandomPartitioner() + { + testNewCluster(new RandomPartitioner()); + } + + private void testNewCluster(IPartitioner partitioner) + { + for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4) + { + testNewCluster(perUnitCount, fixedTokenCount, new NoReplicationStrategy(), partitioner); + } + } + + public void testNewCluster(int perUnitCount, TokenCount tc, NoReplicationStrategy rs, IPartitioner partitioner) + { + System.out.println("Testing new cluster, target " + perUnitCount + " vnodes, replication " + rs); + final int targetClusterSize = TARGET_CLUSTER_SIZE; + NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap(); + + NoReplicationTokenAllocator<Unit> t = new NoReplicationTokenAllocator<Unit>(tokenMap, rs, partitioner); + grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false); + grow(t, targetClusterSize, tc, perUnitCount, true); + System.out.println(); + } + + @Test + public void testExistingClusterWithMurmur3Partitioner() + { + testExistingCluster(new Murmur3Partitioner()); + } + + @Test + public void testExistingClusterWithRandomPartitioner() + { + testExistingCluster(new RandomPartitioner()); + } + + private void 
testExistingCluster(IPartitioner partitioner) + { + for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4) + { + testExistingCluster(perUnitCount, fixedTokenCount, new NoReplicationStrategy(), partitioner); + } + } + + public NoReplicationTokenAllocator<Unit> randomWithTokenAllocator(NavigableMap<Token, Unit> map, NoReplicationStrategy rs, + int unitCount, TokenCount tc, int perUnitCount, + IPartitioner partitioner) + { + super.random(map, rs, unitCount, tc, perUnitCount, partitioner); + NoReplicationTokenAllocator<Unit> t = new NoReplicationTokenAllocator<Unit>(map, rs, partitioner); + t.createTokenInfos(); + return t; + } + + public void testExistingCluster(int perUnitCount, TokenCount tc, NoReplicationStrategy rs, IPartitioner partitioner) + { + System.out.println("Testing existing cluster, target " + perUnitCount + " vnodes, replication " + rs); + final int targetClusterSize = TARGET_CLUSTER_SIZE; + NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap(); + NoReplicationTokenAllocator<Unit> t = randomWithTokenAllocator(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, partitioner); + updateSummaryBeforeGrow(t); + + grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false); + grow(t, targetClusterSize, tc, perUnitCount, true); + loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, partitioner); + System.out.println(); + } + + private void loseAndReplace(NoReplicationTokenAllocator<Unit> t, int howMany, + TokenCount tc, int perUnitCount, IPartitioner partitioner) + { + int fullCount = t.sortedUnits.size(); + System.out.format("Losing %d units. ", howMany); + for (int i = 0; i < howMany; ++i) + { + Unit u = t.unitFor(partitioner.getRandomToken(seededRand)); + t.removeUnit(u); + } + // Grow half without verifying. + grow(t, (t.sortedUnits.size() + fullCount * 3) / 4, tc, perUnitCount, false); + // Metrics should be back to normal by now. Check that they remain so. 
+ grow(t, fullCount, tc, perUnitCount, true); + } + + private void updateSummaryBeforeGrow(NoReplicationTokenAllocator<Unit> t) + { + Summary su = new Summary(); + Summary st = new Summary(); + System.out.println("Before growing cluster: "); + updateSummary(t, su, st, true); + } + + private void grow(NoReplicationTokenAllocator<Unit> t, int targetClusterSize, TokenCount tc, int perUnitCount, boolean verifyMetrics) + { + int size = t.sortedUnits.size(); + Summary su = new Summary(); + Summary st = new Summary(); + Random rand = new Random(targetClusterSize + perUnitCount); + TestReplicationStrategy strategy = (TestReplicationStrategy) t.strategy; + if (size < targetClusterSize) + { + System.out.format("Adding %d unit(s) using %s...", targetClusterSize - size, t.toString()); + long time = System.currentTimeMillis(); + + while (size < targetClusterSize) + { + int num_tokens = tc.tokenCount(perUnitCount, rand); + Unit unit = new Unit(); + t.addUnit(unit, num_tokens); + ++size; + if (verifyMetrics) + updateSummary(t, su, st, false); + } + System.out.format(" Done in %.3fs\n", (System.currentTimeMillis() - time) / 1000.0); + + if (verifyMetrics) + { + updateSummary(t, su, st, true); + double maxExpected = 1.0 + tc.spreadExpectation() * strategy.spreadExpectation() / perUnitCount; + if (su.max > maxExpected) + { + Assert.fail(String.format("Expected max unit size below %.4f, was %.4f", maxExpected, su.max)); + } + } + } + } + + private void updateSummary(NoReplicationTokenAllocator<Unit> t, Summary su, Summary st, boolean print) + { + int size = t.sortedTokens.size(); + + SummaryStatistics unitStat = new SummaryStatistics(); + for (TokenAllocatorBase.Weighted<TokenAllocatorBase.UnitInfo> wu : t.sortedUnits) + { + unitStat.addValue(wu.weight * size / t.tokensInUnits.get(wu.value.unit).size()); + } + su.update(unitStat); + + SummaryStatistics tokenStat = new SummaryStatistics(); + for (PriorityQueue<TokenAllocatorBase.Weighted<TokenAllocatorBase.TokenInfo>> tokens : 
t.tokensInUnits.values()) + { + for (TokenAllocatorBase.Weighted<TokenAllocatorBase.TokenInfo> token : tokens) + { + tokenStat.addValue(token.weight); + } + } + st.update(tokenStat); + + if (print) + { + System.out.format("Size %d(%d) \tunit %s token %s %s\n", + t.sortedUnits.size(), size, + mms(unitStat), + mms(tokenStat), + t.strategy); + System.out.format("Worst intermediate unit\t%s token %s\n", su, st); + } + } + + static class NoReplicationStrategy implements TestReplicationStrategy + { + public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens) + { + return Collections.singletonList(sortedTokens.ceilingEntry(token).getValue()); + } + + public Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens) + { + return sortedTokens.lowerKey(token); + } + + public String toString() + { + return "No replication"; + } + + public void addUnit(Unit n) + { + } + + public void removeUnit(Unit n) + { + } + + public int replicas() + { + return 1; + } + + public Object getGroup(Unit unit) + { + return unit; + } + + public double spreadExpectation() + { + return 1; + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java b/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java new file mode 100644 index 0000000..8612ac1 --- /dev/null +++ b/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.dht.tokenallocator; + +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Random; + +import org.apache.commons.math3.stat.descriptive.SummaryStatistics; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; + +/** + * Base class for {@link NoReplicationTokenAllocatorTest} and {@link AbstractReplicationAwareTokenAllocatorTest}. + */ +abstract class TokenAllocatorTestBase +{ + protected static final int TARGET_CLUSTER_SIZE = 250; + protected static final int MAX_VNODE_COUNT = 64; + + interface TestReplicationStrategy extends ReplicationStrategy<Unit> + { + void addUnit(Unit n); + + void removeUnit(Unit n); + + /** + * Returns a list of all replica units for given token. + */ + List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens); + + /** + * Returns the start of the token span that is replicated in this token. + * Note: Though this is not trivial to see, the replicated span is always contiguous. A token in the same + * group acts as a barrier; if one is not found the token replicates everything up to the replica'th distinct + * group seen in front of it. + */ + Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens); + + /** + * Multiplier for the acceptable imbalance in the cluster. 
With some strategies it is harder to achieve good + * results. + */ + double spreadExpectation(); + } + + interface TokenCount + { + int tokenCount(int perUnitCount, Random rand); + + double spreadExpectation(); + } + + TokenCount fixedTokenCount = new TokenCount() + { + public int tokenCount(int perUnitCount, Random rand) + { + return perUnitCount; + } + + public double spreadExpectation() + { + return 4; // High tolerance to avoid flakiness. + } + }; + + TokenCount varyingTokenCount = new TokenCount() + { + public int tokenCount(int perUnitCount, Random rand) + { + if (perUnitCount == 1) return 1; + // 25 to 175% + return rand.nextInt(perUnitCount * 3 / 2) + (perUnitCount + 3) / 4; + } + + public double spreadExpectation() + { + return 8; // High tolerance to avoid flakiness. + } + }; + + Random seededRand = new Random(2); + + public void random(Map<Token, Unit> map, TestReplicationStrategy rs, + int unitCount, TokenCount tc, int perUnitCount, IPartitioner partitioner) + { + System.out.format("\nRandom generation of %d units with %d tokens each\n", unitCount, perUnitCount); + Random rand = seededRand; + for (int i = 0; i < unitCount; i++) + { + Unit unit = new Unit(); + rs.addUnit(unit); + int tokens = tc.tokenCount(perUnitCount, rand); + for (int j = 0; j < tokens; j++) + { + map.put(partitioner.getRandomToken(rand), unit); + } + } + } + + public String mms(SummaryStatistics s) + { + return String.format("max %.2f min %.2f stddev %.4f", s.getMax(), s.getMin(), s.getStandardDeviation()); + } + + class Summary + { + double min = 1; + double max = 1; + double stddev = 0; + + void update(SummaryStatistics stat) + { + min = Math.min(min, stat.getMin()); + max = Math.max(max, stat.getMax()); + stddev = Math.max(stddev, stat.getStandardDeviation()); + } + + public String toString() + { + return String.format("max %.2f min %.2f stddev %.4f", max, min, stddev); + } + } + + int nextUnitId = 0; + + final class Unit implements Comparable<Unit> + { + int unitId = 
nextUnitId++; + + public String toString() + { + return Integer.toString(unitId); + } + + @Override + public int compareTo(Unit o) + { + return Integer.compare(unitId, o.unitId); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java index 87ba741..97f2dcc 100644 --- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java +++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java @@ -57,6 +57,11 @@ public class LengthPartitioner implements IPartitioner return new BigIntegerToken(midpair.left); } + public Token split(Token left, Token right, double ratioToLeft) + { + throw new UnsupportedOperationException(); + } + public BigIntegerToken getMinimumToken() { return MINIMUM; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java index 19aba40..ad81f7f 100644 --- a/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java +++ b/test/unit/org/apache/cassandra/dht/Murmur3PartitionerTest.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.dht; +import org.junit.Test; + public class Murmur3PartitionerTest extends PartitionerTestCase { public void initPartitioner() @@ -34,5 +36,26 @@ public class Murmur3PartitionerTest extends PartitionerTestCase assertMidpoint(mintoken, mintoken, 62); assertMidpoint(tok("a"), mintoken, 16); } + + @Test + public void testSplit() + { + assertSplit(tok("a"), tok("b"), 16); + assertSplit(tok("a"), tok("bbb"), 16); + } + + @Test + public void testSplitWrapping() + { + 
assertSplit(tok("b"), tok("a"), 16); + assertSplit(tok("bbb"), tok("a"), 16); + } + + @Test + public void testSplitExceedMaximumCase() + { + Murmur3Partitioner.LongToken left = new Murmur3Partitioner.LongToken(Long.MAX_VALUE - 100); + assertSplit(left, tok("a"), 16); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java index 30b773b..c411695 100644 --- a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java +++ b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java @@ -118,6 +118,38 @@ public abstract class PartitionerTestCase assertMidpoint(tok("bbb"), tok("a"), 16); } + /** + * Test split token ranges + */ + public void assertSplit(Token left, Token right, int depth) + { + Random rand = new Random(); + for (int i = 0; i < 1000; i++) + { + assertSplit(left, right ,rand, depth); + } + } + + private void assertSplit(Token left, Token right, Random rand, int depth) + { + double ratio = rand.nextDouble(); + Token newToken = partitioner.split(left, right, ratio); + + assert new Range<Token>(left, right).contains(newToken) + : "For " + left + "," + right + ": range did not contain new token:" + newToken; + + assertEquals("For " + left + "," + right + ", new token: " + newToken, + ratio, left.size(newToken) / left.size(right), 0.1); + + if (depth < 1) + return; + + if (rand.nextBoolean()) + assertSplit(left, newToken, rand, depth-1); + else + assertSplit(newToken, right, rand, depth-1); + } + @Test public void testTokenFactoryBytes() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2a0d75b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java index 5e68644..d7fe602 100644 --- a/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java +++ b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java @@ -18,10 +18,36 @@ package org.apache.cassandra.dht; +import java.math.BigInteger; + +import org.junit.Test; + public class RandomPartitionerTest extends PartitionerTestCase { public void initPartitioner() { partitioner = RandomPartitioner.instance; } + + @Test + public void testSplit() + { + assertSplit(tok("a"), tok("b"), 16); + assertSplit(tok("a"), tok("bbb"), 16); + } + + @Test + public void testSplitWrapping() + { + assertSplit(tok("b"), tok("a"), 16); + assertSplit(tok("bbb"), tok("a"), 16); + } + + @Test + public void testSplitExceedMaximumCase() + { + RandomPartitioner.BigIntegerToken left = new RandomPartitioner.BigIntegerToken(RandomPartitioner.MAXIMUM.subtract(BigInteger.valueOf(10))); + + assertSplit(left, tok("a"), 16); + } }
