Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 576a75f28 -> 1435b9a87
Fix cassandra-stress user-mode truncation of partition generation patch by benedict; reviewed by tjake for CASSANDRA-8608 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1435b9a8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1435b9a8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1435b9a8 Branch: refs/heads/cassandra-2.1 Commit: 1435b9a87a4e1878cb35bd6d75e631bf2093e460 Parents: 576a75f Author: Benedict Elliott Smith <bened...@apache.org> Authored: Wed Jan 21 16:50:55 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Jan 21 16:50:55 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/stress/Operation.java | 4 +- .../stress/generate/PartitionIterator.java | 219 ++++++++++--------- 3 files changed, 122 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8f71269..0c2bab8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.3 + * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608) * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267) * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316) * Invalidate prepared BATCH statements when related tables http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/tools/stress/src/org/apache/cassandra/stress/Operation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java index edf3a54..05045f8 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Operation.java +++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java @@ -105,9 +105,9 @@ public abstract class Operation break; if (spec.useRatio == null) - success = partitionCache.get(i).reset(seed, spec.targetCount, this); + success = partitionCache.get(i).reset(seed, spec.targetCount, isWrite()); else - success = partitionCache.get(i).reset(seed, spec.useRatio.next(), this); + success = partitionCache.get(i).reset(seed, spec.useRatio.next(), isWrite()); } } partitionCount = i; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java index 0d0cba1..0466edb 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java @@ -50,14 +50,16 @@ import org.apache.cassandra.stress.generate.values.Generator; public abstract class PartitionIterator implements Iterator<Row> { - // we reuse the row object to save garbage - abstract boolean reset(double useChance, int targetCount, Operation op); + abstract boolean reset(double useChance, int targetCount, boolean isWrite); long idseed; Seed seed; - final Object[] partitionKey; + final PartitionGenerator generator; final SeedManager seedManager; + + // we reuse these objects to save garbage + final Object[] partitionKey; final Row row; public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager) @@ -93,16 +95,16 @@ public abstract class PartitionIterator implements Iterator<Row> this.idseed = idseed; } - public boolean reset(Seed seed, double useChance, Operation op) + public boolean reset(Seed seed, double useChance, boolean isWrite) { setSeed(seed); - return reset(useChance, 0, op); + return reset(useChance, 0, isWrite); } - public boolean reset(Seed seed, int targetCount, Operation op) + public boolean reset(Seed seed, int targetCount, boolean isWrite) { setSeed(seed); - return reset(Double.NaN, targetCount, op); + return reset(Double.NaN, targetCount, isWrite); } static class SingleRowIterator extends PartitionIterator @@ -115,10 +117,10 @@ public abstract class PartitionIterator implements Iterator<Row> super(generator, seedManager); } - boolean reset(double useChance, int targetCount, Operation op) + boolean reset(double useChance, int targetCount, boolean isWrite) { done = false; - isWrite = op.isWrite(); + this.isWrite = isWrite; return true; } @@ -155,24 +157,22 @@ public abstract class PartitionIterator implements Iterator<Row> // TODO : support first/last row, and constraining reads to rows we know are populated static class MultiRowIterator extends PartitionIterator { - - // probability any single row will be generated in this iteration - double useChance; - // the seed used to generate the current values for the clustering components at each depth; // used to save recalculating it for each row, so we only need to recalc from prior row. final long[] clusteringSeeds = new long[generator.clusteringComponents.size()]; // the components remaining to be visited for each level of the current stack final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()]; + // probability any single row will be generated in this iteration + double useChance; // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level // so that we know with what chance we reached there, and we adjust our roll at that level by that amount final double[] chancemodifier = new double[generator.clusteringComponents.size()]; final double[] rollmodifier = new double[generator.clusteringComponents.size()]; // track where in the partition we are, and where we are limited to - final int[] position = new int[generator.clusteringComponents.size()]; - final int[] limit = new int[position.length]; + final int[] currentRow = new int[generator.clusteringComponents.size()]; + final int[] lastRow = new int[currentRow.length]; boolean hasNext, isFirstWrite, isWrite; // reusable collections for generating unique and sorted clustering components @@ -188,10 +188,22 @@ public abstract class PartitionIterator implements Iterator<Row> chancemodifier[0] = generator.clusteringDescendantAverages[0]; } - // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit - // count to decide how much we should return in one iteration - boolean reset(double useChance, int targetCount, Operation op) + /** + * initialise the iterator state + * + * if we're a write, the expected behaviour is that the requested + * batch count is compounded with the seed's visit count to decide + * how much we should return in one iteration + * + * @param useChance uniform chance of visiting any single row (NaN if targetCount provided) + * @param targetCount number of rows we would like to visit (0 if useChance provided) + * @param isWrite true if the action requires write semantics + * + * @return true if there is data to return, false otherwise + */ + boolean reset(double useChance, int targetCount, boolean isWrite) { + this.isWrite = isWrite; if (this.useChance < 1d) { // we clear our prior roll-modifiers if the use chance was previously less-than zero @@ -207,14 +219,13 @@ public abstract class PartitionIterator implements Iterator<Row> int expectedRowCount; int position = seed.position(); - isWrite = op.isWrite(); if (isWrite) expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0]; else if (position != 0) - expectedRowCount = setLimit(position); + expectedRowCount = setLastRow(position - 1); else - expectedRowCount = setNoLimit(firstComponentCount); + expectedRowCount = setNoLastRow(firstComponentCount); if (Double.isNaN(useChance)) useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount)); @@ -222,38 +233,84 @@ public abstract class PartitionIterator implements Iterator<Row> while (true) { - // TODO: we could avoid repopulating these each loop, by tracking our prior position + // we loop in case we have picked an entirely non-existent range, in which case + // we will reset the seed's position, then try again (until we exhaust it or find + // some real range) + for (Queue<?> q : clusteringComponents) q.clear(); clusteringSeeds[0] = idseed; fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0)); - // we loop in case we have picked an entirely non-existent range, in which case - // we will reset the seed's position, then try again (until we exhaust it or find - // some real range) - this only happens for writes, so we only keep this logic in the loop - - if (isWrite) + if (!isWrite) { - position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits)); - isFirstWrite = position == 0; + if (seek(0) != State.SUCCESS) + throw new IllegalStateException(); + return true; } + + int count = Math.max(1, expectedRowCount / seed.visits); + position = seed.moveForwards(count); + isFirstWrite = position == 0; + setLastRow(position + count - 1); + // seek to our start position - switch (seek(isWrite ? position : 0)) + switch (seek(position)) { case END_OF_PARTITION: return false; case SUCCESS: return true; } + } + } - if (!isWrite) - throw new IllegalStateException(); + // returns expected row count + private int setNoLastRow(int firstComponentCount) + { + Arrays.fill(lastRow, Integer.MAX_VALUE); + return firstComponentCount * generator.clusteringDescendantAverages[0]; + } - // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this + // sets the last row we will visit + // returns expected distance from zero + private int setLastRow(int position) + { + if (position < 0) + throw new IllegalStateException(); + + decompose(position, lastRow); + int expectedRowCount = 0; + for (int i = 0 ; i < lastRow.length ; i++) + { + int l = lastRow[i]; + expectedRowCount += l * generator.clusteringDescendantAverages[i]; } + return expectedRowCount + 1; } + // returns 0 if we are currently on the last row we are allocated to visit; 1 if it is after, -1 if it is before + // this is defined by _limit_, which is wired up from expected (mean) row counts + // the last row is where position == lastRow, except the last index is 1 less; + // OR if that row does not exist, it is the last row prior to it + private int compareToLastRow(int depth) + { + for (int i = 0 ; i <= depth ; i++) + { + int p = currentRow[i], l = lastRow[i], r = clusteringComponents[i].size(); + if ((p == l) | (r == 1)) + continue; + return p - l; + } + return 0; + } + + /** + * Translate the scalar position into a tiered position based on mean expected counts + * @param scalar scalar position + * @param decomposed target container + */ private void decompose(int scalar, int[] decomposed) { for (int i = 0 ; i < decomposed.length ; i++) @@ -262,7 +319,7 @@ public abstract class PartitionIterator implements Iterator<Row> decomposed[i] = scalar / avg; scalar %= avg; } - for (int i = limit.length - 1 ; i > 0 ; i--) + for (int i = lastRow.length - 1 ; i > 0 ; i--) { int avg = generator.clusteringComponentAverages[i]; if (decomposed[i] >= avg) @@ -273,42 +330,28 @@ public abstract class PartitionIterator implements Iterator<Row> } } - private int setNoLimit(int firstComponentCount) - { - Arrays.fill(limit, Integer.MAX_VALUE); - return firstComponentCount * generator.clusteringDescendantAverages[0]; - } - - private int setLimit(int position) - { - decompose(position, limit); - int expectedRowCount = 0; - for (int i = 0 ; i < limit.length ; i++) - { - int l = limit[i]; - expectedRowCount += l * generator.clusteringDescendantAverages[i]; - } - return expectedRowCount; - } - static enum State { END_OF_PARTITION, AFTER_LIMIT, SUCCESS; } - // seek to the provided position (or the first entry if null) + /** + * seek to the provided position to initialise the iterator + * + * @param scalar scalar position + * @return resultant iterator state + */ private State seek(int scalar) { if (scalar == 0) { - this.position[0] = -1; + this.currentRow[0] = -1; clusteringComponents[0].addFirst(this); return setHasNext(advance(0, true)); } - int[] position = this.position; + int[] position = this.currentRow; decompose(scalar, position); - boolean incremented = false; for (int i = 0 ; i < position.length ; i++) { if (i != 0) @@ -321,39 +364,36 @@ public abstract class PartitionIterator implements Iterator<Row> if (clusteringComponents[i].isEmpty()) { int j = i; - while (--j >= 0) + while (true) { + // if we've exhausted the whole partition, we're done + if (--j < 0) + return setHasNext(false); + clusteringComponents[j].poll(); if (!clusteringComponents[j].isEmpty()) break; } - // if we've exhausted the whole partition, we're done - if (j < 0) - return setHasNext(false); - - // we don't check here to see if we've exceeded our limit, - // because if we came to a non-existent position and generated a limit + // we don't check here to see if we've exceeded our lastRow, + // because if we came to a non-existent position and generated a lastRow // we want to at least find the next real position, and set it on the seed // in this case we do then yield false and select a different seed to continue with position[j]++; Arrays.fill(position, j + 1, position.length, 0); while (j < i) fill(++j); - incremented = true; } - if (clusteringComponents[i].isEmpty()) - throw new IllegalStateException(); + row.row[i] = clusteringComponents[i].peek(); } - if (incremented && compareToLastRow() > 0) + if (compareToLastRow(currentRow.length - 1) > 0) return setHasNext(false); - position[position.length - 1]--; // call advance so we honour any select chance + position[position.length - 1]--; clusteringComponents[position.length - 1].addFirst(this); - return setHasNext(advance(position.length - 1, true)); } @@ -384,7 +424,7 @@ public abstract class PartitionIterator implements Iterator<Row> ThreadLocalRandom random = ThreadLocalRandom.current(); // advance the leaf component clusteringComponents[depth].poll(); - position[depth]++; + currentRow[depth]++; while (true) { if (clusteringComponents[depth].isEmpty()) @@ -394,15 +434,18 @@ public abstract class PartitionIterator implements Iterator<Row> return false; depth--; clusteringComponents[depth].poll(); - if (++position[depth] > limit[depth]) + if (++currentRow[depth] > lastRow[depth]) return false; continue; } - int compareToLastRow = compareToLastRow(); - if (compareToLastRow > 0 && !first) + int compareToLastRow = compareToLastRow(depth); + if (compareToLastRow > 0) + { + assert !first; return false; - boolean forceReturnOne = first && compareToLastRow >= 0; + } + boolean forceReturnOne = first && compareToLastRow == 0; // the chance of descending is the uniform usechance, multiplied by the number of children // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children @@ -424,7 +467,7 @@ public abstract class PartitionIterator implements Iterator<Row> rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance); chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth]; } - position[depth] = 0; + currentRow[depth] = 0; fill(depth); continue; } @@ -434,32 +477,8 @@ public abstract class PartitionIterator implements Iterator<Row> // if we don't descend, we remove the clustering suffix we've skipped and continue clusteringComponents[depth].poll(); - position[depth]++; - } - } - - private static int compare(int[] a, int[] b) - { - for (int i = 0 ; i != a.length ; i++) - if (a[i] != b[i]) - return Integer.compare(a[i], b[i]); - return 0; - } - - private int compareToLastRow() - { - int c = position.length - 1; - for (int i = 0 ; i <= c ; i++) - { - int p = position[i], l = limit[i], r = clusteringComponents[i].size(); - if (i == c && p == l - 1) - return 0; - if ((p < l) & (r > 1)) - return -1; - if (p > l) - return 1; + currentRow[depth]++; } - return 1; } // generate the clustering components for the provided depth; requires preceding components