cassandra-stress simultaneous inserts over same seed (take two)

patch by benedict; reviewed by rstupp for CASSANDRA-7964
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/597a1d5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/597a1d5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/597a1d5d

Branch: refs/heads/trunk
Commit: 597a1d5db27ef9e37f7066868e76cc9450fc3c9c
Parents: 51f7cad
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Fri Dec 12 15:07:15 2014 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Fri Dec 12 15:07:15 2014 +0000

----------------------------------------------------------------------
 .../apache/cassandra/stress/WorkManager.java    |  65 ++
 .../stress/generate/PartitionIterator.java      | 632 +++++++++++++++++++
 .../SampledOpDistributionFactory.java           |   1 +
 3 files changed, 698 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
new file mode 100644
index 0000000..c6a3eee
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -0,0 +1,65 @@
+package org.apache.cassandra.stress;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+interface WorkManager
+{
+    // -1 indicates consumer should terminate
+    int takePermits(int count);
+
+    // signal all consumers to terminate
+    void stop();
+
+    static final class FixedWorkManager implements WorkManager
+    {
+
+        final AtomicLong permits;
+
+        public FixedWorkManager(long permits)
+        {
+            this.permits = new AtomicLong(permits);
+        }
+
+        @Override
+        public int takePermits(int count)
+        {
+            while (true)
+            {
+                long cur = permits.get();
+                if (cur == 0)
+                    return -1;
+                count = (int) Math.min(count, cur);
+                long next = cur - count;
+                if (permits.compareAndSet(cur, next))
+                    return count;
+            }
+        }
+
+        @Override
+        public void stop()
+        {
+            permits.getAndSet(0);
+        }
+    }
+
+    static final class ContinuousWorkManager implements WorkManager
+    {
+
+        volatile boolean stop = false;
+
+        @Override
+        public int takePermits(int count)
+        {
+            if (stop)
+                return -1;
+            return count;
+        }
+
+        @Override
+        public void stop()
+        {
+            stop = true;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
new file mode 100644
index 0000000..baab867
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -0,0 +1,632 @@
+package org.apache.cassandra.stress.generate;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.values.Generator;
+
+// a partition is re-used to reduce garbage generation, as is its internal RowIterator
+// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to
+// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches
+// of a single component, and then generate the values within those batches as necessary. this will be difficult with
+// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes
+// that are extended/suffixed to generate each batch, so that we can sort the prefixes)
+public abstract class PartitionIterator implements Iterator<Row>
+{
+
+    // we reuse the row object to save garbage
+    abstract boolean reset(double useChance, int targetCount, Operation op);
+
+    long idseed;
+    Seed seed;
+    final Object[] partitionKey;
+    final PartitionGenerator generator;
+    final SeedManager seedManager;
+    final Row row;
+
+    public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager)
+    {
+        if (generator.clusteringComponents.size() > 0)
+            return new MultiRowIterator(generator, seedManager);
+        else
+            return new SingleRowIterator(generator, seedManager);
+    }
+
+    private PartitionIterator(PartitionGenerator generator, SeedManager seedManager)
+    {
+        this.generator = generator;
+        this.seedManager = seedManager;
+        this.partitionKey = new Object[generator.partitionKey.size()];
+        this.row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]);
+    }
+
+    private void setSeed(Seed seed)
+    {
+        long idseed = 0;
+        for (int i = 0 ; i < partitionKey.length ; i++)
+        {
+            Generator generator = this.generator.partitionKey.get(i);
+            // set the partition key seed based on the current work item we're processing
+            generator.setSeed(seed.seed);
+            Object key = generator.generate();
+            partitionKey[i] = key;
+            // then contribute this value to the data seed
+            idseed = seed(key, generator.type, idseed);
+        }
+        this.seed = seed;
+        this.idseed = idseed;
+    }
+
+    public boolean reset(Seed seed, double useChance, Operation op)
+    {
+        setSeed(seed);
+        return reset(useChance, 0, op);
+    }
+
+    public boolean reset(Seed seed, int targetCount, Operation op)
+    {
+        setSeed(seed);
+        return reset(Double.NaN, targetCount, op);
+    }
+
+    static class SingleRowIterator extends PartitionIterator
+    {
+        boolean done;
+        boolean isWrite;
+
+        private SingleRowIterator(PartitionGenerator generator, SeedManager seedManager)
+        {
+            super(generator, seedManager);
+        }
+
+        boolean reset(double useChance, int targetCount, Operation op)
+        {
+            done = false;
+            isWrite = op.isWrite();
+            return true;
+        }
+
+        public boolean hasNext()
+        {
+            return !done;
+        }
+
+        public Row next()
+        {
+            if (done)
+                throw new NoSuchElementException();
+            for (int i = 0 ; i < row.row.length ; i++)
+            {
+                Generator gen = generator.valueComponents.get(i);
+                gen.setSeed(idseed);
+                row.row[i] = gen.generate();
+            }
+            done = true;
+            if (isWrite)
+            {
+                seedManager.markFirstWrite(seed, true);
+                seedManager.markLastWrite(seed, true);
+            }
+            return row;
+        }
+    }
+
+    // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows.
+    // we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level,
+    // and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children;
+    // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional
+    // to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition.
+    // TODO : support first/last row, and constraining reads to rows we know are populated
+    static class MultiRowIterator extends PartitionIterator
+    {
+
+        // probability any single row will be generated in this iteration
+        double useChance;
+
+        // the seed used to generate the current values for the clustering components at each depth;
+        // used to save recalculating it for each row, so we only need to recalc from prior row.
+        final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
+        // the components remaining to be visited for each level of the current stack
+        final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
+
+        // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
+        // so that we know with what chance we reached there, and we adjust our roll at that level by that amount
+        final double[] chancemodifier = new double[generator.clusteringComponents.size()];
+        final double[] rollmodifier = new double[generator.clusteringComponents.size()];
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        // track where in the partition we are, and where we are limited to
+        final int[] position = new int[generator.clusteringComponents.size()];
+        final int[] limit = new int[position.length];
+        boolean hasNext, isFirstWrite, isWrite;
+
+        // reusable collections for generating unique and sorted clustering components
+        final Set<Object> unique = new HashSet<>();
+        final List<Object> tosort = new ArrayList<>();
+
+        MultiRowIterator(PartitionGenerator generator, SeedManager seedManager)
+        {
+            super(generator, seedManager);
+            for (int i = 0 ; i < clusteringComponents.length ; i++)
+                clusteringComponents[i] = new ArrayDeque<>();
+            rollmodifier[0] = 1f;
+            chancemodifier[0] = generator.clusteringDescendantAverages[0];
+        }
+
+        // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
+        // count to decide how much we should return in one iteration
+        boolean reset(double useChance, int targetCount, Operation op)
+        {
+            if (this.useChance < 1d)
+            {
+                // we clear our prior roll-modifiers if the use chance was previously less-than zero
+                Arrays.fill(rollmodifier, 1d);
+                Arrays.fill(chancemodifier, 1d);
+            }
+
+            // set the seed for the first clustering component
+            generator.clusteringComponents.get(0).setSeed(idseed);
+
+            // calculate how many first clustering components we'll generate, and how many total rows this predicts
+            int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next();
+            int expectedRowCount;
+
+            int position = seed.position();
+            isWrite = op.isWrite();
+
+            if (isWrite)
+                expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0];
+            else if (position != 0)
+                expectedRowCount = setLimit(position);
+            else
+                expectedRowCount = setNoLimit(firstComponentCount);
+
+            if (Double.isNaN(useChance))
+                useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
+            this.useChance = useChance;
+
+            while (true)
+            {
+                // TODO: we could avoid repopulating these each loop, by tracking our prior position
+                for (Queue<?> q : clusteringComponents)
+                    q.clear();
+                clusteringSeeds[0] = idseed;
+                fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
+
+                // we loop in case we have picked an entirely non-existent range, in which case
+                // we will reset the seed's position, then try again (until we exhaust it or find
+                // some real range) - this only happens for writes, so we only keep this logic in the loop
+
+                if (isWrite)
+                {
+                    position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits));
+                    isFirstWrite = position == 0;
+                }
+
+                // seek to our start position
+                switch (seek(isWrite ? position : null))
+                {
+                    case END_OF_PARTITION:
+                        return false;
+                    case SUCCESS:
+                        return true;
+                }
+
+                if (!isWrite)
+                    throw new IllegalStateException();
+
+                // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this
+            }
+        }
+
+        private void decompose(int scalar, int[] decomposed)
+        {
+            for (int i = 0 ; i < decomposed.length ; i++)
+            {
+                int avg = generator.clusteringDescendantAverages[i];
+                decomposed[i] = scalar / avg;
+                scalar %= avg;
+            }
+            for (int i = limit.length - 1 ; i > 0 ; i--)
+            {
+                int avg = generator.clusteringComponentAverages[i];
+                if (decomposed[i] >= avg)
+                {
+                    decomposed[i - 1] += decomposed[i] / avg;
+                    decomposed[i] %= avg;
+                }
+            }
+        }
+
+        private int setNoLimit(int firstComponentCount)
+        {
+            Arrays.fill(limit, Integer.MAX_VALUE);
+            return firstComponentCount * generator.clusteringDescendantAverages[0];
+        }
+
+        private int setLimit(int position)
+        {
+            decompose(position, limit);
+            int expectedRowCount = 0;
+            for (int i = 0 ; i < limit.length ; i++)
+            {
+                int l = limit[i];
+                expectedRowCount += l * generator.clusteringDescendantAverages[i];
+            }
+            return expectedRowCount;
+        }
+
+        static enum State
+        {
+            END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
+        }
+
+        // seek to the provided position (or the first entry if null)
+        private State seek(int scalar)
+        {
+            if (scalar == 0)
+            {
+                this.position[0] = -1;
+                clusteringComponents[0].addFirst(this);
+                return setHasNext(advance(0, true));
+            }
+
+            int[] position = this.position;
+            decompose(scalar, position);
+            boolean incremented = false;
+            for (int i = 0 ; i < position.length ; i++)
+            {
+                if (i != 0)
+                    fill(i);
+                for (int c = position[i] ; c > 0 ; c--)
+                    clusteringComponents[i].poll();
+
+                // we can have started from a position that does not exist, in which
+                // case we need to ascend back up our clustering components, advancing as we go
+                if (clusteringComponents[i].isEmpty())
+                {
+                    int j = i;
+                    while (--j >= 0)
+                    {
+                        clusteringComponents[j].poll();
+                        if (!clusteringComponents[j].isEmpty())
+                            break;
+                    }
+
+                    // if we've exhausted the whole partition, we're done
+                    if (j < 0)
+                        return setHasNext(false);
+
+                    // we don't check here to see if we've exceeded our limit,
+                    // because if we came to a non-existent position and generated a limit
+                    // we want to at least find the next real position, and set it on the seed
+                    // in this case we do then yield false and select a different seed to continue with
+                    position[j]++;
+                    Arrays.fill(position, j + 1, position.length, 0);
+                    while (j < i)
+                        fill(++j);
+                    incremented = true;
+                }
+                if (clusteringComponents[i].isEmpty())
+                    throw new IllegalStateException();
+                row.row[i] = clusteringComponents[i].peek();
+            }
+
+            if (incremented && compareToLastRow() > 0)
+                return setHasNext(false);
+
+            position[position.length - 1]--;
+            // call advance so we honour any select chance
+            clusteringComponents[position.length - 1].addFirst(this);
+
+            return setHasNext(advance(position.length - 1, true));
+        }
+
+        // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int)
+        // to move the iterator to the next item
+        void advance()
+        {
+            // we are always at the leaf level when this method is invoked
+            // so we calculate the seed for generating the row by combining the seed that generated the clustering components
+            int depth = clusteringComponents.length - 1;
+            long parentSeed = clusteringSeeds[depth];
+            long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed);
+
+            // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver
+            for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
+            {
+                Generator gen = generator.valueComponents.get(i - clusteringSeeds.length);
+                gen.setSeed(rowSeed);
+                row.row[i] = gen.generate();
+            }
+
+            // then we advance the leaf level
+            setHasNext(advance(depth, false));
+        }
+
+        private boolean advance(int depth, boolean first)
+        {
+            // advance the leaf component
+            clusteringComponents[depth].poll();
+            position[depth]++;
+            while (true)
+            {
+                if (clusteringComponents[depth].isEmpty())
+                {
+                    // if we've run out of clustering components at this level, ascend
+                    if (depth == 0)
+                        return false;
+                    depth--;
+                    clusteringComponents[depth].poll();
+                    if (++position[depth] > limit[depth])
+                        return false;
+                    continue;
+                }
+
+                int compareToLastRow = compareToLastRow();
+                if (compareToLastRow > 0 && !first)
+                    return false;
+                boolean forceReturnOne = first && compareToLastRow >= 0;
+
+                // the chance of descending is the uniform usechance, multiplied by the number of children
+                // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
+                // then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the
+                // chance with which we reached this depth, i.e. if we already beat 50/50 odds, we double our
+                // chance of beating this next roll
+                double thischance = useChance * chancemodifier[depth];
+                if (forceReturnOne || thischance > 0.99999f || thischance >= random.nextDouble())
+                {
+                    // if we're descending, we fill in our clustering component and increase our depth
+                    row.row[depth] = clusteringComponents[depth].peek();
+                    depth++;
+                    if (depth == clusteringComponents.length)
+                        return true;
+                    // if we haven't reached the leaf, we update our probability statistics, fill in all of
+                    // this level's clustering components, and repeat
+                    if (useChance < 1d)
+                    {
+                        rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
+                        chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth];
+                    }
+                    position[depth] = 0;
+                    fill(depth);
+                    continue;
+                }
+
+                if (compareToLastRow >= 0)
+                    return false;
+
+                // if we don't descend, we remove the clustering suffix we've skipped and continue
+                clusteringComponents[depth].poll();
+                position[depth]++;
+            }
+        }
+
+        private static int compare(int[] a, int[] b)
+        {
+            for (int i = 0 ; i != a.length ; i++)
+                if (a[i] != b[i])
+                    return Integer.compare(a[i], b[i]);
+            return 0;
+        }
+
+        private int compareToLastRow()
+        {
+            int c = position.length - 1;
+            for (int i = 0 ; i <= c ; i++)
+            {
+                int p = position[i], l = limit[i], r = clusteringComponents[i].size();
+                if (i == c && p == l - 1)
+                    return 0;
+                if ((p < l) & (r > 1))
+                    return -1;
+                if (p > l)
+                    return 1;
+            }
+            return 1;
+        }
+
+        // generate the clustering components for the provided depth; requires preceding components
+        // to have been generated and their seeds populated into clusteringSeeds
+        void fill(int depth)
+        {
+            long seed = clusteringSeeds[depth - 1];
+            Generator gen = generator.clusteringComponents.get(depth);
+            gen.setSeed(seed);
+            clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
+            fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen);
+        }
+
+        // generate the clustering components into the queue
+        void fill(Queue<Object> queue, int count, Generator generator)
+        {
+            if (count == 1)
+            {
+                queue.add(generator.generate());
+                return;
+            }
+
+            switch (this.generator.order)
+            {
+                case SORTED:
+                    if (Comparable.class.isAssignableFrom(generator.clazz))
+                    {
+                        tosort.clear();
+                        for (int i = 0 ; i < count ; i++)
+                            tosort.add(generator.generate());
+                        Collections.sort((List<Comparable>) (List<?>) tosort);
+                        for (int i = 0 ; i < count ; i++)
+                            if (i == 0 || ((Comparable) tosort.get(i - 1)).compareTo(i) < 0)
+                                queue.add(tosort.get(i));
+                        break;
+                    }
+                case ARBITRARY:
+                    unique.clear();
+                    for (int i = 0 ; i < count ; i++)
+                    {
+                        Object next = generator.generate();
+                        if (unique.add(next))
+                            queue.add(next);
+                    }
+                    break;
+                case SHUFFLED:
+                    unique.clear();
+                    tosort.clear();
+                    ThreadLocalRandom rand = ThreadLocalRandom.current();
+                    for (int i = 0 ; i < count ; i++)
+                    {
+                        Object next = generator.generate();
+                        if (unique.add(next))
+                            tosort.add(next);
+                    }
+                    for (int i = 0 ; i < tosort.size() ; i++)
+                    {
+                        int index = rand.nextInt(i, tosort.size());
+                        Object obj = tosort.get(index);
+                        tosort.set(index, tosort.get(i));
+                        queue.add(obj);
+                    }
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+        }
+
+        public boolean hasNext()
+        {
+            return hasNext;
+        }
+
+        public Row next()
+        {
+            if (!hasNext())
+                throw new NoSuchElementException();
+            advance();
+            return row;
+        }
+
+        public boolean finishedPartition()
+        {
+            return clusteringComponents[0].isEmpty();
+        }
+
+        private State setHasNext(boolean hasNext)
+        {
+            if (!hasNext)
+            {
+                this.hasNext = false;
+                boolean isLast = finishedPartition();
+                if (isWrite)
+                {
+                    boolean isFirst = isFirstWrite;
+                    if (isFirst)
+                        seedManager.markFirstWrite(seed, isLast);
+                    if (isLast)
+                        seedManager.markLastWrite(seed, isFirst);
+                }
+                return isLast ? State.END_OF_PARTITION : State.AFTER_LIMIT;
+            }
+            this.hasNext = hasNext;
+            return State.SUCCESS;
+        }
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    // calculate a new seed based on the combination of a parent seed and the generated child, to generate
+    // any children of this child
+    static long seed(Object object, AbstractType type, long seed)
+    {
+        if (object instanceof ByteBuffer)
+        {
+            ByteBuffer buf = (ByteBuffer) object;
+            for (int i = buf.position() ; i < buf.limit() ; i++)
+                seed = (31 * seed) + buf.get(i);
+            return seed;
+        }
+        else if (object instanceof String)
+        {
+            String str = (String) object;
+            for (int i = 0 ; i < str.length() ; i++)
+                seed = (31 * seed) + str.charAt(i);
+            return seed;
+        }
+        else if (object instanceof Number)
+        {
+            return (seed * 31) + ((Number) object).longValue();
+        }
+        else if (object instanceof UUID)
+        {
+            return seed * 31 + (((UUID) object).getLeastSignificantBits() ^ ((UUID) object).getMostSignificantBits());
+        }
+        else
+        {
+            return seed(type.decompose(object), BytesType.instance, seed);
+        }
+    }
+
+    public Object getPartitionKey(int i)
+    {
+        return partitionKey[i];
+    }
+
+    public String getKeyAsString()
+    {
+        StringBuilder sb = new StringBuilder();
+        int i = 0;
+        for (Object key : partitionKey)
+        {
+            if (i > 0)
+                sb.append("|");
+            AbstractType type = generator.partitionKey.get(i++).type;
+            sb.append(type.getString(type.decompose(key)));
+        }
+        return sb.toString();
+    }
+
+    // used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now
+    public ByteBuffer getToken()
+    {
+        return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 9e1a5e8..9713e93 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -32,6 +32,7 @@ import org.apache.commons.math3.util.Pair;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.util.Timer;
 
 public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory
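
For readers unfamiliar with the stress driver loop, the sketch below illustrates how a consumer thread might drive the WorkManager contract introduced above: takePermits() returning -1 as the termination signal, and stop() cutting off further permits. It is only an illustrative sketch, not code from this patch; the ExampleConsumer class, the batch size of 100 and the doOneOperation() helper are invented for the example and assume the same package as WorkManager.

    // Illustrative sketch only -- not part of this commit.
    // Demonstrates the takePermits()/stop() protocol of WorkManager.
    class ExampleConsumer implements Runnable
    {
        private final WorkManager work; // e.g. new WorkManager.FixedWorkManager(1000000)

        ExampleConsumer(WorkManager work)
        {
            this.work = work;
        }

        public void run()
        {
            while (true)
            {
                // request up to 100 operations; the manager may grant fewer
                int granted = work.takePermits(100);
                if (granted < 0)
                    return; // -1 signals this consumer to terminate
                for (int i = 0 ; i < granted ; i++)
                    doOneOperation(); // hypothetical: execute one stress operation
            }
        }

        private void doOneOperation()
        {
            // placeholder for the real per-operation work
        }
    }

With a FixedWorkManager every consumer drains a shared pool of permits and exits once it sees -1; with a ContinuousWorkManager the loop runs until some other thread calls stop().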