Fix timeout in ReplicationAwareTokenAllocatorTest patch by Stefania Alborghetti; reviewed by Branimir Lambov for CASSANDRA-12784
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6ec31ba Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6ec31ba Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6ec31ba Branch: refs/heads/trunk Commit: c6ec31bada33b9b803d09a863414ea9cad72752e Parents: 7a5118c Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Thu Oct 13 16:32:58 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Tue Oct 18 09:08:56 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/dht/RandomPartitioner.java | 2 +- .../apache/cassandra/utils/GuidGenerator.java | 5 +- ...tractReplicationAwareTokenAllocatorTest.java | 716 ++++++++++++++++++ ...rmur3ReplicationAwareTokenAllocatorTest.java | 48 ++ ...andomReplicationAwareTokenAllocatorTest.java | 54 ++ .../ReplicationAwareTokenAllocatorTest.java | 755 ------------------- test/unit/org/apache/cassandra/Util.java | 20 +- 8 files changed, 835 insertions(+), 766 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d230462..32a2dfd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784) * Improve sum aggregate functions (CASSANDRA-12417) * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761) * cqlsh fails to format collections when using aliases (CASSANDRA-11534) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/src/java/org/apache/cassandra/dht/RandomPartitioner.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index 7c8f6ac..ee3b862 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -93,7 +93,7 @@ public class RandomPartitioner implements IPartitioner public BigIntegerToken getRandomToken(Random random) { - BigInteger token = FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes(random)); + BigInteger token = FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes(random, 0)); if ( token.signum() == -1 ) token = token.multiply(BigInteger.valueOf(-1L)); return new BigIntegerToken(token); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/src/java/org/apache/cassandra/utils/GuidGenerator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/GuidGenerator.java b/src/java/org/apache/cassandra/utils/GuidGenerator.java index 0843344..c5ed7a7 100644 --- a/src/java/org/apache/cassandra/utils/GuidGenerator.java +++ b/src/java/org/apache/cassandra/utils/GuidGenerator.java @@ -76,10 +76,9 @@ public class GuidGenerator return convertToStandardFormat( sb.toString() ); } - public static ByteBuffer guidAsBytes(Random random) + public static ByteBuffer guidAsBytes(Random random, long time) { StringBuilder sbValueBeforeMD5 = new StringBuilder(); - long time = System.currentTimeMillis(); long rand = 0; rand = random.nextLong(); sbValueBeforeMD5.append(s_id) @@ -94,7 +93,7 @@ public class GuidGenerator public static ByteBuffer guidAsBytes() { - return guidAsBytes(myRand); + return guidAsBytes(myRand, System.currentTimeMillis()); } /* http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java ---------------------------------------------------------------------- diff --git 
a/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java new file mode 100644 index 0000000..80e980a --- /dev/null +++ b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.dht.tokenallocator; + +import java.util.*; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.commons.math3.stat.descriptive.SummaryStatistics; + +import org.junit.Assert; +import org.junit.Ignore; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; + +/** + * Base class for {@link Murmur3ReplicationAwareTokenAllocatorTest} and {@link RandomReplicationAwareTokenAllocatorTest}; + * the tests are split into separate classes to avoid timeouts in case flaky tests need to be repeated, see CASSANDRA-12784.
+ */ +@Ignore +abstract class AbstractReplicationAwareTokenAllocatorTest +{ + private static final int TARGET_CLUSTER_SIZE = 250; + + interface TestReplicationStrategy extends ReplicationStrategy<Unit> + { + void addUnit(Unit n); + + void removeUnit(Unit n); + + /** + * Returns a list of all replica units for given token. + */ + List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens); + + /** + * Returns the start of the token span that is replicated in this token. + * Note: Though this is not trivial to see, the replicated span is always contiguous. A token in the same + * group acts as a barrier; if one is not found the token replicates everything up to the replica'th distinct + * group seen in front of it. + */ + Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens); + + /** + * Multiplier for the acceptable imbalance in the cluster. With some strategies it is harder to achieve good + * results. + */ + public double spreadExpectation(); + } + + static class NoReplicationStrategy implements TestReplicationStrategy // NOTE(review): not used by any scenario in this class + { + public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens) + { + return Collections.singletonList(sortedTokens.ceilingEntry(token).getValue()); + } + + public Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens) + { + return sortedTokens.lowerKey(token); + } + + public String toString() + { + return "No replication"; + } + + public void addUnit(Unit n) + { + } + + public void removeUnit(Unit n) + { + } + + public int replicas() + { + return 1; + } + + public boolean sameGroup(Unit n1, Unit n2) + { + return false; + } + + public Object getGroup(Unit unit) + { + return unit; + } + + public double spreadExpectation() + { + return 1; + } + } + + static class SimpleReplicationStrategy implements TestReplicationStrategy + { + int replicas; + + public SimpleReplicationStrategy(int replicas) + { + super(); + this.replicas = replicas; + } +
public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens) + { + List<Unit> endpoints = new ArrayList<Unit>(replicas); + + token = sortedTokens.ceilingKey(token); + if (token == null) + token = sortedTokens.firstKey(); + Iterator<Unit> iter = Iterables.concat(sortedTokens.tailMap(token, true).values(), sortedTokens.values()).iterator(); + while (endpoints.size() < replicas) + { + if (!iter.hasNext()) + return endpoints; + Unit ep = iter.next(); + if (!endpoints.contains(ep)) + endpoints.add(ep); + } + return endpoints; + } + + public Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens) + { + Set<Unit> seenUnits = Sets.newHashSet(); + int unitsFound = 0; + + for (Map.Entry<Token, Unit> en : Iterables.concat( + sortedTokens.headMap(token, false).descendingMap().entrySet(), + sortedTokens.descendingMap().entrySet())) + { + Unit n = en.getValue(); + // Same group as investigated unit is a break; anything that could replicate in it replicates there. + if (n == unit) + break; + + if (seenUnits.add(n)) + { + if (++unitsFound == replicas) + break; + } + token = en.getKey(); + } + return token; + } + + public void addUnit(Unit n) + { + } + + public void removeUnit(Unit n) + { + } + + public String toString() + { + return String.format("Simple %d replicas", replicas); + } + + public int replicas() + { + return replicas; + } + + public boolean sameGroup(Unit n1, Unit n2) + { + return false; + } + + public Unit getGroup(Unit unit) + { + // The unit is the group.
+ return unit; + } + + public double spreadExpectation() + { + return 1; + } + } + + static abstract class GroupReplicationStrategy implements TestReplicationStrategy + { + final int replicas; + final Map<Unit, Integer> groupMap; + + public GroupReplicationStrategy(int replicas) + { + this.replicas = replicas; + this.groupMap = Maps.newHashMap(); + } + + public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens) + { + List<Unit> endpoints = new ArrayList<Unit>(replicas); + BitSet usedGroups = new BitSet(); + + if (sortedTokens.isEmpty()) + return endpoints; + + token = sortedTokens.ceilingKey(token); + if (token == null) + token = sortedTokens.firstKey(); + Iterator<Unit> iter = Iterables.concat(sortedTokens.tailMap(token, true).values(), sortedTokens.values()).iterator(); + while (endpoints.size() < replicas) + { + // For simplicity, assume the ring holds at least 'replicas' distinct groups; otherwise iter.next() throws NoSuchElementException. + Unit ep = iter.next(); + int group = groupMap.get(ep); + if (!usedGroups.get(group)) + { + endpoints.add(ep); + usedGroups.set(group); + } + } + return endpoints; + } + + public Token lastReplicaToken(Token token, NavigableMap<Token, Unit> sortedTokens) + { + BitSet usedGroups = new BitSet(); + int groupsFound = 0; + + token = sortedTokens.ceilingKey(token); + if (token == null) + token = sortedTokens.firstKey(); + for (Map.Entry<Token, Unit> en : + Iterables.concat(sortedTokens.tailMap(token, true).entrySet(), + sortedTokens.entrySet())) + { + Unit ep = en.getValue(); + int group = groupMap.get(ep); + if (!usedGroups.get(group)) + { + usedGroups.set(group); + if (++groupsFound >= replicas) + return en.getKey(); + } + } + return token; + } + + public Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens) + { + // replicated ownership + int unitGroup = groupMap.get(unit); // unit must be already added + BitSet seenGroups = new BitSet(); + int groupsFound = 0; + + for (Map.Entry<Token, Unit> en : Iterables.concat(
sortedTokens.headMap(token, false).descendingMap().entrySet(), + sortedTokens.descendingMap().entrySet())) + { + Unit n = en.getValue(); + int ngroup = groupMap.get(n); + // Same group as investigated unit is a break; anything that could replicate in it replicates there. + if (ngroup == unitGroup) + break; + + if (!seenGroups.get(ngroup)) + { + if (++groupsFound == replicas) + break; + seenGroups.set(ngroup); + } + token = en.getKey(); + } + return token; + } + + public String toString() + { + Map<Integer, Integer> idToSize = instanceToCount(groupMap); + Map<Integer, Integer> sizeToCount = Maps.newTreeMap(); + sizeToCount.putAll(instanceToCount(idToSize)); + return String.format("%s strategy, %d replicas, group size to count %s", getClass().getSimpleName(), replicas, sizeToCount); + } + + @Override + public int replicas() + { + return replicas; + } + + public boolean sameGroup(Unit n1, Unit n2) + { + return groupMap.get(n1).equals(groupMap.get(n2)); + } + + public void removeUnit(Unit n) + { + groupMap.remove(n); + } + + public Integer getGroup(Unit unit) + { + return groupMap.get(unit); + } + + public double spreadExpectation() + { + return 1.5; // Even balanced racks get unbalanced when they lose nodes. + } + } + + private static <T> Map<T, Integer> instanceToCount(Map<?, T> map) + { + Map<T, Integer> idToCount = Maps.newHashMap(); + for (Map.Entry<?, T> en : map.entrySet()) + { + Integer old = idToCount.get(en.getValue()); + idToCount.put(en.getValue(), old != null ? old + 1 : 1); + } + return idToCount; + } + + /** + * Group strategy spreading units into a fixed number of groups.
+ */ + static class FixedGroupCountReplicationStrategy extends GroupReplicationStrategy + { + int groupId; + int groupCount; + + public FixedGroupCountReplicationStrategy(int replicas, int groupCount) + { + super(replicas); + assert groupCount >= replicas; + groupId = 0; + this.groupCount = groupCount; + } + + public void addUnit(Unit n) + { + groupMap.put(n, groupId++ % groupCount); + } + } + + /** + * Group strategy with a fixed number of units per group. + */ + static class BalancedGroupReplicationStrategy extends GroupReplicationStrategy + { + int groupId; + int groupSize; + + public BalancedGroupReplicationStrategy(int replicas, int groupSize) + { + super(replicas); + groupId = 0; + this.groupSize = groupSize; + } + + public void addUnit(Unit n) + { + groupMap.put(n, groupId++ / groupSize); + } + } + + static class UnbalancedGroupReplicationStrategy extends GroupReplicationStrategy + { + int groupId; + int nextSize; + int num; + int minGroupSize; + int maxGroupSize; + Random rand; + + public UnbalancedGroupReplicationStrategy(int replicas, int minGroupSize, int maxGroupSize, Random rand) + { + super(replicas); + groupId = -1; + nextSize = 0; + num = 0; + this.maxGroupSize = maxGroupSize; + this.minGroupSize = minGroupSize; + this.rand = rand; + } + + public void addUnit(Unit n) + { + if (++num > nextSize) + { + nextSize = minGroupSize + rand.nextInt(maxGroupSize - minGroupSize + 1); + ++groupId; + num = 0; // NOTE(review): the unit that opens a new group is not counted, so each group ends up with nextSize + 1 units + } + groupMap.put(n, groupId); + } + + public double spreadExpectation() + { + return 2; + } + } + + static Map<Unit, Double> evaluateReplicatedOwnership(ReplicationAwareTokenAllocator<Unit> t) + { + Map<Unit, Double> ownership = Maps.newHashMap(); + Iterator<Token> it = t.sortedTokens.keySet().iterator(); + if (!it.hasNext()) + return ownership; + + Token current = it.next(); + while (it.hasNext()) + { + Token next = it.next(); + addOwnership(t, current, next, ownership); + current = next; + } + addOwnership(t, current, t.sortedTokens.firstKey(),
ownership); + + return ownership; + } + + private static void addOwnership(ReplicationAwareTokenAllocator<Unit> t, Token current, Token next, Map<Unit, Double> ownership) + { + TestReplicationStrategy ts = (TestReplicationStrategy) t.strategy; + double size = current.size(next); + Token representative = t.partitioner.midpoint(current, next); + for (Unit n : ts.getReplicas(representative, t.sortedTokens)) + { + Double v = ownership.get(n); + ownership.put(n, v != null ? v + size : size); + } + } + + private static double replicatedTokenOwnership(Token token, NavigableMap<Token, Unit> sortedTokens, ReplicationStrategy<Unit> strategy) + { + TestReplicationStrategy ts = (TestReplicationStrategy) strategy; + Token next = sortedTokens.higherKey(token); + if (next == null) + next = sortedTokens.firstKey(); + return ts.replicationStart(token, sortedTokens.get(token), sortedTokens).size(next); + } + + static interface TokenCount + { + int tokenCount(int perUnitCount, Random rand); + + double spreadExpectation(); + } + + static TokenCount fixedTokenCount = new TokenCount() + { + public int tokenCount(int perUnitCount, Random rand) + { + return perUnitCount; + } + + public double spreadExpectation() + { + return 4; // High tolerance to avoid flakiness. + } + }; + + static TokenCount varyingTokenCount = new TokenCount() + { + public int tokenCount(int perUnitCount, Random rand) + { + if (perUnitCount == 1) return 1; + // 25 to 175% + return rand.nextInt(perUnitCount * 3 / 2) + (perUnitCount + 3) / 4; + } + + public double spreadExpectation() + { + return 8; // High tolerance to avoid flakiness.
+ } + }; + + Random seededRand = new Random(2); // fixed seed, shared by random(), loseAndReplace() and UnbalancedGroupReplicationStrategy + + private void random(Map<Token, Unit> map, TestReplicationStrategy rs, + int unitCount, TokenCount tc, int perUnitCount, IPartitioner partitioner) + { + System.out.format("\nRandom generation of %d units with %d tokens each\n", unitCount, perUnitCount); + Random rand = seededRand; + for (int i = 0; i < unitCount; i++) + { + Unit unit = new Unit(); + rs.addUnit(unit); + int tokens = tc.tokenCount(perUnitCount, rand); + for (int j = 0; j < tokens; j++) + { + map.put(partitioner.getRandomToken(rand), unit); + } + } + } + + protected void testExistingCluster(IPartitioner partitioner, int maxVNodeCount) + { + for (int rf = 1; rf <= 5; ++rf) + { + for (int perUnitCount = 1; perUnitCount <= maxVNodeCount; perUnitCount *= 4) + { + testExistingCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf), partitioner); + testExistingCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf), partitioner); + if (rf == 1) continue; // Replication strategy doesn't matter for RF = 1.
+ for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 4 < TARGET_CLUSTER_SIZE; groupSize *= 4) + { + testExistingCluster(perUnitCount, fixedTokenCount, + new BalancedGroupReplicationStrategy(rf, groupSize), partitioner); + testExistingCluster(perUnitCount, varyingTokenCount, + new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand), + partitioner); + } + testExistingCluster(perUnitCount, fixedTokenCount, + new FixedGroupCountReplicationStrategy(rf, rf * 2), partitioner); + } + } + } + + private void testExistingCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs, IPartitioner partitioner) + { + System.out.println("Testing existing cluster, target " + perUnitCount + " vnodes, replication " + rs); + final int targetClusterSize = TARGET_CLUSTER_SIZE; + NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap(); + + random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, partitioner); + + ReplicationAwareTokenAllocator<Unit> t = new ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner); + grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false); + grow(t, targetClusterSize, tc, perUnitCount, true); + loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, partitioner); + System.out.println(); + } + + protected void testNewCluster(IPartitioner partitioner, int maxVNodeCount) + { + // This test is flaky because the selection of the tokens for the first RF nodes (which is random, with an + // uncontrolled seed) can sometimes cause a pathological situation where the algorithm will find a (close to) + // ideal distribution of tokens for some number of nodes, which in turn will inevitably cause it to go into a + // bad (unacceptable to the test criteria) distribution after adding one more node. + + // This should happen very rarely, unless something is broken in the token allocation code.
+ + for (int rf = 2; rf <= 5; ++rf) + { + for (int perUnitCount = 1; perUnitCount <= maxVNodeCount; perUnitCount *= 4) + { + testNewCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf), partitioner); + testNewCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf), partitioner); + // rf starts at 2 here, so unlike testExistingCluster no RF = 1 short-circuit is needed. + for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 8 < TARGET_CLUSTER_SIZE; groupSize *= 4) + { + testNewCluster(perUnitCount, fixedTokenCount, + new BalancedGroupReplicationStrategy(rf, groupSize), partitioner); + testNewCluster(perUnitCount, varyingTokenCount, + new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand), + partitioner); + } + testNewCluster(perUnitCount, fixedTokenCount, + new FixedGroupCountReplicationStrategy(rf, rf * 2), partitioner); + } + } + } + + private void testNewCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs, IPartitioner partitioner) + { + System.out.println("Testing new cluster, target " + perUnitCount + " vnodes, replication " + rs); + final int targetClusterSize = TARGET_CLUSTER_SIZE; + NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap(); + + ReplicationAwareTokenAllocator<Unit> t = new ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner); + grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false); + grow(t, targetClusterSize, tc, perUnitCount, true); + loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount, partitioner); + System.out.println(); + } + + private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int howMany, + TokenCount tc, int perUnitCount, IPartitioner partitioner) + { + int fullCount = t.unitCount(); + System.out.format("Losing %d units.
", howMany); + for (int i = 0; i < howMany; ++i) + { + Unit u = t.unitFor(partitioner.getRandomToken(seededRand)); + t.removeUnit(u); + ((TestReplicationStrategy) t.strategy).removeUnit(u); + } + // Re-add three quarters of the lost units without verifying metrics. + grow(t, (t.unitCount() + fullCount * 3) / 4, tc, perUnitCount, false); + // Metrics should be back to normal by now. Check that they remain so. + grow(t, fullCount, tc, perUnitCount, true); + } + + static class Summary + { + double min = 1; // start at 1 (presumably the ideal normalized load), so reported min never exceeds 1 + double max = 1; // ...and reported max never drops below 1 + double stddev = 0; + + void update(SummaryStatistics stat) + { + min = Math.min(min, stat.getMin()); + max = Math.max(max, stat.getMax()); + stddev = Math.max(stddev, stat.getStandardDeviation()); + } + + public String toString() + { + return String.format("max %.2f min %.2f stddev %.4f", max, min, stddev); + } + } + + public void grow(ReplicationAwareTokenAllocator<Unit> t, int targetClusterSize, TokenCount tc, int perUnitCount, boolean verifyMetrics) + { + int size = t.unitCount(); + Summary su = new Summary(); + Summary st = new Summary(); + Random rand = new Random(targetClusterSize + perUnitCount); + TestReplicationStrategy strategy = (TestReplicationStrategy) t.strategy; + if (size < targetClusterSize) + { + System.out.format("Adding %d unit(s) using %s...", targetClusterSize - size, t.toString()); + long time = System.currentTimeMillis(); + while (size < targetClusterSize) + { + int tokens = tc.tokenCount(perUnitCount, rand); + Unit unit = new Unit(); + strategy.addUnit(unit); + t.addUnit(unit, tokens); + ++size; + if (verifyMetrics) + updateSummary(t, su, st, false); + } + System.out.format(" Done in %.3fs\n", (System.currentTimeMillis() - time) / 1000.0); + if (verifyMetrics) + { + updateSummary(t, su, st, true); + double maxExpected = 1.0 + tc.spreadExpectation() * strategy.spreadExpectation() / (perUnitCount * t.replicas); + if (su.max > maxExpected) + { + Assert.fail(String.format("Expected max unit size below %.4f, was %.4f", maxExpected, su.max)); + } + // We can't
verify lower side range as small loads can't always be fixed. + } + } + } + + + private void updateSummary(ReplicationAwareTokenAllocator<Unit> t, Summary su, Summary st, boolean print) + { + int size = t.sortedTokens.size(); + double inverseAverage = 1.0 * size / t.strategy.replicas(); + + Map<Unit, Double> ownership = evaluateReplicatedOwnership(t); + SummaryStatistics unitStat = new SummaryStatistics(); + for (Map.Entry<Unit, Double> en : ownership.entrySet()) + unitStat.addValue(en.getValue() * inverseAverage / t.unitToTokens.get(en.getKey()).size()); + su.update(unitStat); + + SummaryStatistics tokenStat = new SummaryStatistics(); + for (Token tok : t.sortedTokens.keySet()) + tokenStat.addValue(replicatedTokenOwnership(tok, t.sortedTokens, t.strategy) * inverseAverage); + st.update(tokenStat); + + if (print) + { + System.out.format("Size %d(%d) \tunit %s token %s %s\n", + t.unitCount(), size, + mms(unitStat), + mms(tokenStat), + t.strategy); + System.out.format("Worst intermediate unit\t%s token %s\n", su, st); + } + } + + + private static String mms(SummaryStatistics s) + { + return String.format("max %.2f min %.2f stddev %.4f", s.getMax(), s.getMin(), s.getStandardDeviation()); + } + + + int nextUnitId = 0; + + final class Unit implements Comparable<Unit> + { + int unitId = nextUnitId++; + + public String toString() + { + return Integer.toString(unitId); + } + + @Override + public int compareTo(Unit o) + { + return Integer.compare(unitId, o.unitId); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/long/org/apache/cassandra/dht/tokenallocator/Murmur3ReplicationAwareTokenAllocatorTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/Murmur3ReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/Murmur3ReplicationAwareTokenAllocatorTest.java new file mode 100644 index
0000000..e28ecfa --- /dev/null +++ b/test/long/org/apache/cassandra/dht/tokenallocator/Murmur3ReplicationAwareTokenAllocatorTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.dht.tokenallocator; + +import org.junit.Test; + +import org.apache.cassandra.Util; +import org.apache.cassandra.dht.Murmur3Partitioner; + +/** + * Runs the shared replication-aware token allocation scenarios against {@link Murmur3Partitioner}. + */ +public class Murmur3ReplicationAwareTokenAllocatorTest extends AbstractReplicationAwareTokenAllocatorTest +{ + /** Upper bound on the per-unit vnode count exercised by the shared scenarios. */ + private static final int VNODE_LIMIT = 64; + + /** Value handed to Util.flakyTest for the new-cluster scenario (presumably the number of attempts). */ + private static final int NEW_CLUSTER_ATTEMPTS = 2; + + private static final String FLAKY_REASON = + "It tends to fail sometimes due to the random selection of the tokens in the first few nodes."; + + @Test + public void testExistingCluster() + { + testExistingCluster(new Murmur3Partitioner(), VNODE_LIMIT); + } + + @Test + public void testNewCluster() + { + Util.flakyTest(this::runNewClusterScenario, NEW_CLUSTER_ATTEMPTS, FLAKY_REASON); + } + + /** A single run of the new-cluster scenario; the unit of work handed to Util.flakyTest. */ + private void runNewClusterScenario() + { + testNewCluster(new Murmur3Partitioner(), VNODE_LIMIT); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java ---------------------------------------------------------------------- diff --git
a/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java new file mode 100644 index 0000000..bd94442 --- /dev/null +++ b/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.dht.tokenallocator; + +import org.junit.Test; + +import org.apache.cassandra.Util; +import org.apache.cassandra.dht.RandomPartitioner; + +public class RandomReplicationAwareTokenAllocatorTest extends AbstractReplicationAwareTokenAllocatorTest +{ + /** + * The maximum number of vnodes to use in the tests. + * For RandomPartitioner we use a smaller number because + * the tests take much longer and would otherwise time out, + * see CASSANDRA-12784.
+ */ + private static final int MAX_VNODE_COUNT = 16; + + @Test + public void testExistingCluster() + { + testExistingCluster(new RandomPartitioner(), MAX_VNODE_COUNT); + } + + @Test + public void testNewCluster() + { + Util.flakyTest(this::flakyTestNewCluster, + 3, + "It tends to fail sometimes due to the random selection of the tokens in the first few nodes."); + } + + /** A single run of the new-cluster scenario, wrapped by the Util.flakyTest call above. */ + private void flakyTestNewCluster() + { + testNewCluster(new RandomPartitioner(), MAX_VNODE_COUNT); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java deleted file mode 100644 index 482e2ac..0000000 --- a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java +++ /dev/null @@ -1,755 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.cassandra.dht.tokenallocator; - -import java.util.*; - -import junit.framework.Assert; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.apache.commons.math3.stat.descriptive.SummaryStatistics; - -import org.junit.Test; - -import org.apache.cassandra.Util; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.dht.RandomPartitioner; -import org.apache.cassandra.dht.Token; - -public class ReplicationAwareTokenAllocatorTest -{ - private static final int MAX_VNODE_COUNT = 64; - - private static final int TARGET_CLUSTER_SIZE = 250; - - interface TestReplicationStrategy extends ReplicationStrategy<Unit> - { - void addUnit(Unit n); - - void removeUnit(Unit n); - - /** - * Returns a list of all replica units for given token. - */ - List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens); - - /** - * Returns the start of the token span that is replicated in this token. - * Note: Though this is not trivial to see, the replicated span is always contiguous. A token in the same - * group acts as a barrier; if one is not found the token replicates everything up to the replica'th distinct - * group seen in front of it. - */ - Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens); - - /** - * Multiplier for the acceptable disbalance in the cluster. With some strategies it is harder to achieve good - * results. 
- */ - public double spreadExpectation(); - } - - static class NoReplicationStrategy implements TestReplicationStrategy - { - public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens) - { - return Collections.singletonList(sortedTokens.ceilingEntry(token).getValue()); - } - - public Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens) - { - return sortedTokens.lowerKey(token); - } - - public String toString() - { - return "No replication"; - } - - public void addUnit(Unit n) - { - } - - public void removeUnit(Unit n) - { - } - - public int replicas() - { - return 1; - } - - public boolean sameGroup(Unit n1, Unit n2) - { - return false; - } - - public Object getGroup(Unit unit) - { - return unit; - } - - public double spreadExpectation() - { - return 1; - } - } - - static class SimpleReplicationStrategy implements TestReplicationStrategy - { - int replicas; - - public SimpleReplicationStrategy(int replicas) - { - super(); - this.replicas = replicas; - } - - public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens) - { - List<Unit> endpoints = new ArrayList<Unit>(replicas); - - token = sortedTokens.ceilingKey(token); - if (token == null) - token = sortedTokens.firstKey(); - Iterator<Unit> iter = Iterables.concat(sortedTokens.tailMap(token, true).values(), sortedTokens.values()).iterator(); - while (endpoints.size() < replicas) - { - if (!iter.hasNext()) - return endpoints; - Unit ep = iter.next(); - if (!endpoints.contains(ep)) - endpoints.add(ep); - } - return endpoints; - } - - public Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens) - { - Set<Unit> seenUnits = Sets.newHashSet(); - int unitsFound = 0; - - for (Map.Entry<Token, Unit> en : Iterables.concat( - sortedTokens.headMap(token, false).descendingMap().entrySet(), - sortedTokens.descendingMap().entrySet())) - { - Unit n = en.getValue(); - // Same group as investigated unit is a break; 
anything that could replicate in it replicates there. - if (n == unit) - break; - - if (seenUnits.add(n)) - { - if (++unitsFound == replicas) - break; - } - token = en.getKey(); - } - return token; - } - - public void addUnit(Unit n) - { - } - - public void removeUnit(Unit n) - { - } - - public String toString() - { - return String.format("Simple %d replicas", replicas); - } - - public int replicas() - { - return replicas; - } - - public boolean sameGroup(Unit n1, Unit n2) - { - return false; - } - - public Unit getGroup(Unit unit) - { - // The unit is the group. - return unit; - } - - public double spreadExpectation() - { - return 1; - } - } - - static abstract class GroupReplicationStrategy implements TestReplicationStrategy - { - final int replicas; - final Map<Unit, Integer> groupMap; - - public GroupReplicationStrategy(int replicas) - { - this.replicas = replicas; - this.groupMap = Maps.newHashMap(); - } - - public List<Unit> getReplicas(Token token, NavigableMap<Token, Unit> sortedTokens) - { - List<Unit> endpoints = new ArrayList<Unit>(replicas); - BitSet usedGroups = new BitSet(); - - if (sortedTokens.isEmpty()) - return endpoints; - - token = sortedTokens.ceilingKey(token); - if (token == null) - token = sortedTokens.firstKey(); - Iterator<Unit> iter = Iterables.concat(sortedTokens.tailMap(token, true).values(), sortedTokens.values()).iterator(); - while (endpoints.size() < replicas) - { - // For simlicity assuming list can't be exhausted before finding all replicas. 
- Unit ep = iter.next(); - int group = groupMap.get(ep); - if (!usedGroups.get(group)) - { - endpoints.add(ep); - usedGroups.set(group); - } - } - return endpoints; - } - - public Token lastReplicaToken(Token token, NavigableMap<Token, Unit> sortedTokens) - { - BitSet usedGroups = new BitSet(); - int groupsFound = 0; - - token = sortedTokens.ceilingKey(token); - if (token == null) - token = sortedTokens.firstKey(); - for (Map.Entry<Token, Unit> en : - Iterables.concat(sortedTokens.tailMap(token, true).entrySet(), - sortedTokens.entrySet())) - { - Unit ep = en.getValue(); - int group = groupMap.get(ep); - if (!usedGroups.get(group)) - { - usedGroups.set(group); - if (++groupsFound >= replicas) - return en.getKey(); - } - } - return token; - } - - public Token replicationStart(Token token, Unit unit, NavigableMap<Token, Unit> sortedTokens) - { - // replicated ownership - int unitGroup = groupMap.get(unit); // unit must be already added - BitSet seenGroups = new BitSet(); - int groupsFound = 0; - - for (Map.Entry<Token, Unit> en : Iterables.concat( - sortedTokens.headMap(token, false).descendingMap().entrySet(), - sortedTokens.descendingMap().entrySet())) - { - Unit n = en.getValue(); - int ngroup = groupMap.get(n); - // Same group as investigated unit is a break; anything that could replicate in it replicates there. 
- if (ngroup == unitGroup) - break; - - if (!seenGroups.get(ngroup)) - { - if (++groupsFound == replicas) - break; - seenGroups.set(ngroup); - } - token = en.getKey(); - } - return token; - } - - public String toString() - { - Map<Integer, Integer> idToSize = instanceToCount(groupMap); - Map<Integer, Integer> sizeToCount = Maps.newTreeMap(); - sizeToCount.putAll(instanceToCount(idToSize)); - return String.format("%s strategy, %d replicas, group size to count %s", getClass().getSimpleName(), replicas, sizeToCount); - } - - @Override - public int replicas() - { - return replicas; - } - - public boolean sameGroup(Unit n1, Unit n2) - { - return groupMap.get(n1).equals(groupMap.get(n2)); - } - - public void removeUnit(Unit n) - { - groupMap.remove(n); - } - - public Integer getGroup(Unit unit) - { - return groupMap.get(unit); - } - - public double spreadExpectation() - { - return 1.5; // Even balanced racks get disbalanced when they lose nodes. - } - } - - private static <T> Map<T, Integer> instanceToCount(Map<?, T> map) - { - Map<T, Integer> idToCount = Maps.newHashMap(); - for (Map.Entry<?, T> en : map.entrySet()) - { - Integer old = idToCount.get(en.getValue()); - idToCount.put(en.getValue(), old != null ? old + 1 : 1); - } - return idToCount; - } - - /** - * Group strategy spreading units into a fixed number of groups. - */ - static class FixedGroupCountReplicationStrategy extends GroupReplicationStrategy - { - int groupId; - int groupCount; - - public FixedGroupCountReplicationStrategy(int replicas, int groupCount) - { - super(replicas); - assert groupCount >= replicas; - groupId = 0; - this.groupCount = groupCount; - } - - public void addUnit(Unit n) - { - groupMap.put(n, groupId++ % groupCount); - } - } - - /** - * Group strategy with a fixed number of units per group. 
- */ - static class BalancedGroupReplicationStrategy extends GroupReplicationStrategy - { - int groupId; - int groupSize; - - public BalancedGroupReplicationStrategy(int replicas, int groupSize) - { - super(replicas); - groupId = 0; - this.groupSize = groupSize; - } - - public void addUnit(Unit n) - { - groupMap.put(n, groupId++ / groupSize); - } - } - - static class UnbalancedGroupReplicationStrategy extends GroupReplicationStrategy - { - int groupId; - int nextSize; - int num; - int minGroupSize; - int maxGroupSize; - Random rand; - - public UnbalancedGroupReplicationStrategy(int replicas, int minGroupSize, int maxGroupSize, Random rand) - { - super(replicas); - groupId = -1; - nextSize = 0; - num = 0; - this.maxGroupSize = maxGroupSize; - this.minGroupSize = minGroupSize; - this.rand = rand; - } - - public void addUnit(Unit n) - { - if (++num > nextSize) - { - nextSize = minGroupSize + rand.nextInt(maxGroupSize - minGroupSize + 1); - ++groupId; - num = 0; - } - groupMap.put(n, groupId); - } - - public double spreadExpectation() - { - return 2; - } - } - - static Map<Unit, Double> evaluateReplicatedOwnership(ReplicationAwareTokenAllocator<Unit> t) - { - Map<Unit, Double> ownership = Maps.newHashMap(); - Iterator<Token> it = t.sortedTokens.keySet().iterator(); - if (!it.hasNext()) - return ownership; - - Token current = it.next(); - while (it.hasNext()) - { - Token next = it.next(); - addOwnership(t, current, next, ownership); - current = next; - } - addOwnership(t, current, t.sortedTokens.firstKey(), ownership); - - return ownership; - } - - private static void addOwnership(ReplicationAwareTokenAllocator<Unit> t, Token current, Token next, Map<Unit, Double> ownership) - { - TestReplicationStrategy ts = (TestReplicationStrategy) t.strategy; - double size = current.size(next); - Token representative = t.partitioner.midpoint(current, next); - for (Unit n : ts.getReplicas(representative, t.sortedTokens)) - { - Double v = ownership.get(n); - ownership.put(n, v != null 
? v + size : size); - } - } - - private static double replicatedTokenOwnership(Token token, NavigableMap<Token, Unit> sortedTokens, ReplicationStrategy<Unit> strategy) - { - TestReplicationStrategy ts = (TestReplicationStrategy) strategy; - Token next = sortedTokens.higherKey(token); - if (next == null) - next = sortedTokens.firstKey(); - return ts.replicationStart(token, sortedTokens.get(token), sortedTokens).size(next); - } - - static interface TokenCount - { - int tokenCount(int perUnitCount, Random rand); - - double spreadExpectation(); - } - - static TokenCount fixedTokenCount = new TokenCount() - { - public int tokenCount(int perUnitCount, Random rand) - { - return perUnitCount; - } - - public double spreadExpectation() - { - return 4; // High tolerance to avoid flakiness. - } - }; - - static TokenCount varyingTokenCount = new TokenCount() - { - public int tokenCount(int perUnitCount, Random rand) - { - if (perUnitCount == 1) return 1; - // 25 to 175% - return rand.nextInt(perUnitCount * 3 / 2) + (perUnitCount + 3) / 4; - } - - public double spreadExpectation() - { - return 8; // High tolerance to avoid flakiness. 
- } - }; - - Random seededRand = new Random(2); - - private void random(Map<Token, Unit> map, TestReplicationStrategy rs, - int unitCount, TokenCount tc, int perUnitCount, IPartitioner partitioner) - { - System.out.format("\nRandom generation of %d units with %d tokens each\n", unitCount, perUnitCount); - Random rand = seededRand; - for (int i = 0; i < unitCount; i++) - { - Unit unit = new Unit(); - rs.addUnit(unit); - int tokens = tc.tokenCount(perUnitCount, rand); - for (int j = 0; j < tokens; j++) - { - map.put(partitioner.getRandomToken(rand), unit); - } - } - } - - @Test - public void testExistingClusterWithRandomPartitioner() - { - testExistingCluster(new RandomPartitioner()); - } - - @Test - public void testExistingClusterWithMurmur3Partitioner() - { - testExistingCluster(new Murmur3Partitioner()); - } - - public void testExistingCluster(IPartitioner partitioner) - { - for (int rf = 1; rf <= 5; ++rf) - { - for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4) - { - testExistingCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf), partitioner); - testExistingCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf), partitioner); - if (rf == 1) continue; // Replication strategy doesn't matter for RF = 1. 
- for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 4 < TARGET_CLUSTER_SIZE; groupSize *= 4) - { - testExistingCluster(perUnitCount, fixedTokenCount, - new BalancedGroupReplicationStrategy(rf, groupSize), partitioner); - testExistingCluster(perUnitCount, varyingTokenCount, - new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand), - partitioner); - } - testExistingCluster(perUnitCount, fixedTokenCount, - new FixedGroupCountReplicationStrategy(rf, rf * 2), partitioner); - } - } - } - - public void testExistingCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs, IPartitioner partitioner) - { - System.out.println("Testing existing cluster, target " + perUnitCount + " vnodes, replication " + rs); - final int targetClusterSize = TARGET_CLUSTER_SIZE; - NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap(); - - random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, partitioner); - - ReplicationAwareTokenAllocator<Unit> t = new ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner); - grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false); - grow(t, targetClusterSize, tc, perUnitCount, true); - loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, partitioner); - System.out.println(); - } - - @Test - public void testNewClusterWithRandomPartitioner() - { - Util.flakyTest(this::flakyTestNewClusterWithRandomPartitioner, - 5, - "It tends to fail sometimes due to the random selection of the tokens in the first few nodes."); - } - - @Test - public void testNewClusterWithMurmur3Partitioner() - { - Util.flakyTest(this::flakyTestNewClusterWithMurmur3Partitioner, - 5, - "It tends to fail sometimes due to the random selection of the tokens in the first few nodes."); - } - - public void flakyTestNewClusterWithRandomPartitioner() - { - flakyTestNewCluster(new RandomPartitioner()); - } - - public void flakyTestNewClusterWithMurmur3Partitioner() - { - flakyTestNewCluster(new Murmur3Partitioner()); - } - - 
public void flakyTestNewCluster(IPartitioner partitioner) - { - // This test is flaky because the selection of the tokens for the first RF nodes (which is random, with an - // uncontrolled seed) can sometimes cause a pathological situation where the algorithm will find a (close to) - // ideal distribution of tokens for some number of nodes, which in turn will inevitably cause it to go into a - // bad (unacceptable to the test criteria) distribution after adding one more node. - - // This should happen very rarely, unless something is broken in the token allocation code. - - for (int rf = 2; rf <= 5; ++rf) - { - for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4) - { - testNewCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf), partitioner); - testNewCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf), partitioner); - if (rf == 1) continue; // Replication strategy doesn't matter for RF = 1. - for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 8 < TARGET_CLUSTER_SIZE; groupSize *= 4) - { - testNewCluster(perUnitCount, fixedTokenCount, - new BalancedGroupReplicationStrategy(rf, groupSize), partitioner); - testNewCluster(perUnitCount, varyingTokenCount, - new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand), - partitioner); - } - testNewCluster(perUnitCount, fixedTokenCount, - new FixedGroupCountReplicationStrategy(rf, rf * 2), partitioner); - } - } - } - - public void testNewCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs, IPartitioner partitioner) - { - System.out.println("Testing new cluster, target " + perUnitCount + " vnodes, replication " + rs); - final int targetClusterSize = TARGET_CLUSTER_SIZE; - NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap(); - - ReplicationAwareTokenAllocator<Unit> t = new ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner); - grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false); - grow(t, 
targetClusterSize, tc, perUnitCount, true); - loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount, partitioner); - System.out.println(); - } - - private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int howMany, - TokenCount tc, int perUnitCount, IPartitioner partitioner) - { - int fullCount = t.unitCount(); - System.out.format("Losing %d units. ", howMany); - for (int i = 0; i < howMany; ++i) - { - Unit u = t.unitFor(partitioner.getRandomToken(seededRand)); - t.removeUnit(u); - ((TestReplicationStrategy) t.strategy).removeUnit(u); - } - // Grow half without verifying. - grow(t, (t.unitCount() + fullCount * 3) / 4, tc, perUnitCount, false); - // Metrics should be back to normal by now. Check that they remain so. - grow(t, fullCount, tc, perUnitCount, true); - } - - static class Summary - { - double min = 1; - double max = 1; - double stddev = 0; - - void update(SummaryStatistics stat) - { - min = Math.min(min, stat.getMin()); - max = Math.max(max, stat.getMax()); - stddev = Math.max(stddev, stat.getStandardDeviation()); - } - - public String toString() - { - return String.format("max %.2f min %.2f stddev %.4f", max, min, stddev); - } - } - - public void grow(ReplicationAwareTokenAllocator<Unit> t, int targetClusterSize, TokenCount tc, int perUnitCount, boolean verifyMetrics) - { - int size = t.unitCount(); - Summary su = new Summary(); - Summary st = new Summary(); - Random rand = new Random(targetClusterSize + perUnitCount); - TestReplicationStrategy strategy = (TestReplicationStrategy) t.strategy; - if (size < targetClusterSize) - { - System.out.format("Adding %d unit(s) using %s...", targetClusterSize - size, t.toString()); - long time = System.currentTimeMillis(); - while (size < targetClusterSize) - { - int tokens = tc.tokenCount(perUnitCount, rand); - Unit unit = new Unit(); - strategy.addUnit(unit); - t.addUnit(unit, tokens); - ++size; - if (verifyMetrics) - updateSummary(t, su, st, false); - } - System.out.format(" Done in %.3fs\n", 
(System.currentTimeMillis() - time) / 1000.0); - if (verifyMetrics) - { - updateSummary(t, su, st, true); - double maxExpected = 1.0 + tc.spreadExpectation() * strategy.spreadExpectation() / (perUnitCount * t.replicas); - if (su.max > maxExpected) - { - Assert.fail(String.format("Expected max unit size below %.4f, was %.4f", maxExpected, su.max)); - } - // We can't verify lower side range as small loads can't always be fixed. - } - } - } - - - private void updateSummary(ReplicationAwareTokenAllocator<Unit> t, Summary su, Summary st, boolean print) - { - int size = t.sortedTokens.size(); - double inverseAverage = 1.0 * size / t.strategy.replicas(); - - Map<Unit, Double> ownership = evaluateReplicatedOwnership(t); - SummaryStatistics unitStat = new SummaryStatistics(); - for (Map.Entry<Unit, Double> en : ownership.entrySet()) - unitStat.addValue(en.getValue() * inverseAverage / t.unitToTokens.get(en.getKey()).size()); - su.update(unitStat); - - SummaryStatistics tokenStat = new SummaryStatistics(); - for (Token tok : t.sortedTokens.keySet()) - tokenStat.addValue(replicatedTokenOwnership(tok, t.sortedTokens, t.strategy) * inverseAverage); - st.update(tokenStat); - - if (print) - { - System.out.format("Size %d(%d) \tunit %s token %s %s\n", - t.unitCount(), size, - mms(unitStat), - mms(tokenStat), - t.strategy); - System.out.format("Worst intermediate unit\t%s token %s\n", su, st); - } - } - - - private static String mms(SummaryStatistics s) - { - return String.format("max %.2f min %.2f stddev %.4f", s.getMax(), s.getMin(), s.getStandardDeviation()); - } - - - int nextUnitId = 0; - - final class Unit implements Comparable<Unit> - { - int unitId = nextUnitId++; - - public String toString() - { - return Integer.toString(unitId); - } - - @Override - public int compareTo(Unit o) - { - return Integer.compare(unitId, o.unitId); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6ec31ba/test/unit/org/apache/cassandra/Util.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 2f083c9..f93ce5c 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -36,6 +36,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; @@ -67,7 +70,9 @@ import static org.junit.Assert.assertTrue; public class Util { - private static List<UUID> hostIdPool = new ArrayList<UUID>(); + private static final Logger logger = LoggerFactory.getLogger(Util.class); + + private static List<UUID> hostIdPool = new ArrayList<>(); public static IPartitioner testPartitioner() { @@ -601,10 +606,9 @@ public class Util AssertionError e = runCatchingAssertionError(test); if (e == null) return; // success - System.err.format("Test failed. %s%n" - + "Re-running %d times to verify it isn't failing more often than it should.%n" - + "Failure was: %s%n", message, rerunsOnFailure, e); - e.printStackTrace(); + + logger.info("Test failed. {}", message, e); + logger.info("Re-running {} times to verify it isn't failing more often than it should.", rerunsOnFailure); int rerunsFailed = 0; for (int i = 0; i < rerunsOnFailure; ++i) @@ -614,15 +618,17 @@ public class Util { ++rerunsFailed; e.addSuppressed(t); + + logger.debug("Test failed again, total num failures: {}", rerunsFailed, t); } } if (rerunsFailed > 0) { - System.err.format("Test failed in %d of the %d reruns.%n", rerunsFailed, rerunsOnFailure); + logger.error("Test failed in {} of the {} reruns.", rerunsFailed, rerunsOnFailure); throw e; } - System.err.println("All reruns succeeded. 
Failure treated as flake."); + logger.info("All reruns succeeded. Failure treated as flake."); } // for use with Optional in tests, can be used as an argument to orElseThrow