Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/942b83ca Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/942b83ca Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/942b83ca Branch: refs/heads/trunk Commit: 942b83ca93d6d2380a830cb775e8506cc1cf0324 Parents: 1dc1aa1 a5ce963 Author: Jeff Jirsa <[email protected]> Authored: Mon Feb 27 17:22:31 2017 -0800 Committer: Jeff Jirsa <[email protected]> Committed: Mon Feb 27 17:23:17 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/io/sstable/SSTable.java | 2 + .../io/sstable/metadata/MetadataCollector.java | 2 +- .../cassandra/utils/StreamingHistogram.java | 133 ++++-- .../microbench/StreamingHistogramBench.java | 405 +++++++++++++++++++ .../db/compaction/CompactionsTest.java | 3 + .../DateTieredCompactionStrategyTest.java | 4 + .../LeveledCompactionStrategyTest.java | 4 + .../SizeTieredCompactionStrategyTest.java | 4 + .../cassandra/db/compaction/TTLExpiryTest.java | 4 + .../TimeWindowCompactionStrategyTest.java | 4 + .../cassandra/utils/StreamingHistogramTest.java | 40 +- 12 files changed, 569 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index f5b9d28,1100bfd..1810cae --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,13 -1,5 +1,14 @@@ -3.0.12 +3.11.0 + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174) + * Obfuscate password in stress-graphs (CASSANDRA-12233) + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034) + * nodetool stopdaemon errors out (CASSANDRA-13030) + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954) + * Fix primary index calculation for SASI (CASSANDRA-12910) + * More fixes to the TokenAllocator (CASSANDRA-12990) + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) +Merged from 3.0: + * Faster StreamingHistogram (CASSANDRA-13038) * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237) * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070) * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185) http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTable.java index 601f5a0,1e4488c..fdcd17f --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@@ -59,7 -58,10 +59,9 @@@ public abstract class SSTabl { static final Logger logger = LoggerFactory.getLogger(SSTable.class); - public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100; + public static final int TOMBSTONE_HISTOGRAM_SPOOL_SIZE = 100000; + public static final int TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS = Integer.valueOf(System.getProperty("cassandra.streaminghistogram.roundseconds", "60")); public final Descriptor descriptor; protected final Set<Component> components; http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/src/java/org/apache/cassandra/utils/StreamingHistogram.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/StreamingHistogram.java index a500450,fffa73e..6fde931 --- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java +++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java @@@ -39,11 -39,11 +39,14 @@@ public class StreamingHistogra public static final StreamingHistogramSerializer serializer = new StreamingHistogramSerializer(); // TreeMap to hold bins of histogram. - private final TreeMap<Double, Long> bin; + // The key is a numeric type so we can avoid boxing/unboxing streams of different key types + // The value is a unboxed long array always of length == 1 + // Serialized Histograms always writes with double keys for backwards compatibility + private final TreeMap<Number, long[]> bin; + // Keep a second, larger buffer to spool data in, before finalizing it into `bin` - private final TreeMap<Double, Long> spool; ++ private final TreeMap<Number, long[]> spool; + // maximum bin size for this histogram private final int maxBinSize; @@@ -51,22 -57,22 +60,31 @@@ * Creates a new histogram with max bin size of maxBinSize * @param maxBinSize maximum number of bins this histogram can have */ - public StreamingHistogram(int maxBinSize) + public StreamingHistogram(int maxBinSize, int maxSpoolSize, int roundSeconds) { this.maxBinSize = maxBinSize; + this.maxSpoolSize = maxSpoolSize; + this.roundSeconds = roundSeconds; - bin = new TreeMap<>(); - spool = new TreeMap<>(); + bin = new TreeMap<>((o1, o2) -> { + if (o1.getClass().equals(o2.getClass())) + return ((Comparable)o1).compareTo(o2); + else + return Double.compare(o1.doubleValue(), o2.doubleValue()); + }); ++ spool = new TreeMap<>((o1, o2) -> { ++ if (o1.getClass().equals(o2.getClass())) ++ return ((Comparable)o1).compareTo(o2); ++ else ++ return Double.compare(o1.doubleValue(), o2.doubleValue()); ++ }); ++ } - private StreamingHistogram(int maxBinSize, Map<Double, Long> bin) - private StreamingHistogram(int maxBinSize, int maxSpoolSize, int roundSeconds, Map<Double, Long> bin) ++ private StreamingHistogram(int maxBinSize, int maxSpoolSize, int roundSeconds, Map<Double, Long> bin) { - this(maxBinSize); - this.maxBinSize = maxBinSize; - this.maxSpoolSize = maxSpoolSize; - this.roundSeconds = roundSeconds; - this.bin = new TreeMap<>(bin); - this.spool = new TreeMap<>(); ++ this(maxBinSize, maxSpoolSize, roundSeconds); + for (Map.Entry<Double, Long> entry : bin.entrySet()) + this.bin.put(entry.getKey(), new long[]{entry.getValue()}); } /** @@@ -83,9 -89,13 +101,13 @@@ * @param p * @param m */ - public void update(double p, long m) + public void update(Number p, long m) { - long[] mi = bin.get(p); - double d = p % this.roundSeconds; - if (d > 0) - p = p + (this.roundSeconds - d); ++ Number d = p.longValue() % this.roundSeconds; ++ if (d.longValue() > 0) ++ p =p.longValue() + (this.roundSeconds - d.longValue()); + - Long mi = spool.get(p); ++ long[] mi = spool.get(p); if (mi != null) { // we found the same p so increment that counter @@@ -93,38 -103,65 +115,77 @@@ } else { - spool.put(p, m); + mi = new long[]{m}; - bin.put(p, mi); - // if bin size exceeds maximum bin size then trim down to max size - while (bin.size() > maxBinSize) ++ spool.put(p, mi); + } ++ ++ // If spool has overflowed, compact it + if(spool.size() > maxSpoolSize) + flushHistogram(); + } + + /** + * Drain the temporary spool into the final bins + */ + public void flushHistogram() + { - if(spool.size() > 0) ++ if (spool.size() > 0) + { - Long spoolValue; - Long binValue; ++ long[] spoolValue; ++ long[] binValue; + + // Iterate over the spool, copying the value into the primary bin map + // and compacting that map as necessary - for (Map.Entry<Double, Long> entry : spool.entrySet()) ++ for (Map.Entry<Number, long[]> entry : spool.entrySet()) { - // find points p1, p2 which have smallest difference - Iterator<Number> keys = bin.keySet().iterator(); - double p1 = keys.next().doubleValue(); - double p2 = keys.next().doubleValue(); - double smallestDiff = p2 - p1; - double q1 = p1, q2 = p2; - while (keys.hasNext()) - Double key = entry.getKey(); ++ Number key = entry.getKey(); + spoolValue = entry.getValue(); + binValue = bin.get(key); + - if (binValue != null) ++ // If this value is already in the final histogram bins ++ // Simply increment and update, otherwise, insert a new long[1] value ++ if(binValue != null) + { - binValue += spoolValue; ++ binValue[0] += spoolValue[0]; + bin.put(key, binValue); - } else - { - bin.put(key, spoolValue); ++ } ++ else + { - p1 = p2; - p2 = keys.next().doubleValue(); - double diff = p2 - p1; - if (diff < smallestDiff) ++ bin.put(key, new long[]{spoolValue[0]}); + } + - // if bin size exceeds maximum bin size then trim down to max size + if (bin.size() > maxBinSize) + { + // find points p1, p2 which have smallest difference - Iterator<Double> keys = bin.keySet().iterator(); - double p1 = keys.next(); - double p2 = keys.next(); ++ Iterator<Number> keys = bin.keySet().iterator(); ++ double p1 = keys.next().doubleValue(); ++ double p2 = keys.next().doubleValue(); + double smallestDiff = p2 - p1; + double q1 = p1, q2 = p2; - while (keys.hasNext()) { ++ while (keys.hasNext()) + { - smallestDiff = diff; - q1 = p1; - q2 = p2; + p1 = p2; - p2 = keys.next(); ++ p2 = keys.next().doubleValue(); + double diff = p2 - p1; - if (diff < smallestDiff) { ++ if (diff < smallestDiff) ++ { + smallestDiff = diff; + q1 = p1; + q2 = p2; + } } + // merge those two - long k1 = bin.remove(q1); - long k2 = bin.remove(q2); - bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2); ++ long[] a1 = bin.remove(q1); ++ long[] a2 = bin.remove(q2); ++ long k1 = a1[0]; ++ long k2 = a2[0]; ++ ++ a1[0] += k2; ++ bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1); ++ } - // merge those two - long[] a1 = bin.remove(q1); - long[] a2 = bin.remove(q2); - long k1 = a1[0]; - long k2 = a2[0]; - - a1[0] += k2; - bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1); } + spool.clear(); } } @@@ -138,8 -175,10 +199,10 @@@ if (other == null) return; - flushHistogram(); ++ other.flushHistogram(); + - for (Map.Entry<Double, Long> entry : other.getAsMap().entrySet()) - update(entry.getKey(), entry.getValue()); + for (Map.Entry<Number, long[]> entry : other.getAsMap().entrySet()) + update(entry.getKey(), entry.getValue()[0]); } /** @@@ -150,9 -189,10 +213,10 @@@ */ public double sum(double b) { + flushHistogram(); double sum = 0; // find the points pi, pnext which satisfy pi <= b < pnext - Map.Entry<Double, Long> pnext = bin.higherEntry(b); + Map.Entry<Number, long[]> pnext = bin.higherEntry(b); if (pnext == null) { // if b is greater than any key in this histogram, @@@ -177,8 -217,9 +241,9 @@@ return sum; } - public Map<Double, Long> getAsMap() + public Map<Number, long[]> getAsMap() { + flushHistogram(); return Collections.unmodifiableMap(bin); } @@@ -186,13 -227,13 +251,14 @@@ { public void serialize(StreamingHistogram histogram, DataOutputPlus out) throws IOException { ++ histogram.flushHistogram(); out.writeInt(histogram.maxBinSize); - Map<Double, Long> entries = histogram.getAsMap(); + Map<Number, long[]> entries = histogram.getAsMap(); out.writeInt(entries.size()); - for (Map.Entry<Double, Long> entry : entries.entrySet()) + for (Map.Entry<Number, long[]> entry : entries.entrySet()) { - out.writeDouble(entry.getKey()); - out.writeLong(entry.getValue()); + out.writeDouble(entry.getKey().doubleValue()); + out.writeLong(entry.getValue()[0]); } } @@@ -230,13 -271,13 +296,16 @@@ return false; StreamingHistogram that = (StreamingHistogram) o; -- return maxBinSize == that.maxBinSize && bin.equals(that.bin); ++ return maxBinSize == that.maxBinSize && ++ maxSpoolSize == that.maxSpoolSize && ++ spool.equals(that.spool) && ++ bin.equals(that.bin); } @Override public int hashCode() { - return Objects.hashCode(bin.hashCode(), maxBinSize); - return Objects.hashCode(bin.hashCode(), spool.hashCode(), maxBinSize); ++ return Objects.hashCode(bin.hashCode(), spool.hashCode(), maxBinSize, maxSpoolSize); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java index 94aac9e,21c736e..f64cbd9 --- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java +++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java @@@ -33,7 -32,7 +33,7 @@@ public class StreamingHistogramTes @Test public void testFunction() throws Exception { - StreamingHistogram hist = new StreamingHistogram(5); - StreamingHistogram hist = new StreamingHistogram(5, 5, 1); ++ StreamingHistogram hist = new StreamingHistogram(5, 0, 1); long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45}; // add 7 points to histogram of 5 bins @@@ -59,7 -58,7 +59,7 @@@ } // merge test - StreamingHistogram hist2 = new StreamingHistogram(3); - StreamingHistogram hist2 = new StreamingHistogram(3, 0, 1); ++ StreamingHistogram hist2 = new StreamingHistogram(3, 3, 1); for (int i = 7; i < samples.length; i++) { hist2.update(samples[i]); @@@ -121,32 -120,36 +121,64 @@@ } } + + @Test + public void testNumericTypes() throws Exception + { - StreamingHistogram hist = new StreamingHistogram(5); ++ StreamingHistogram hist = new StreamingHistogram(5, 0, 1); + + hist.update(2); + hist.update(2.0); + hist.update(2L); + + Map<Number, long[]> asMap = hist.getAsMap(); + + assertEquals(1, asMap.size()); + assertEquals(3L, asMap.get(2)[0]); + + //Make sure it's working with Serde + DataOutputBuffer out = new DataOutputBuffer(); + StreamingHistogram.serializer.serialize(hist, out); + byte[] bytes = out.toByteArray(); + + StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes)); + + deserialized.update(2L); + + asMap = deserialized.getAsMap(); + assertEquals(1, asMap.size()); + assertEquals(4L, asMap.get(2)[0]); + } ++ + @Test + public void testOverflow() throws Exception + { + StreamingHistogram hist = new StreamingHistogram(5, 10, 1); + long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 31, - 32, 32, 33, 34, 35, 70, 78, 80, 90, 100, - 32, 32, 33, 34, 35, 70, 78, 80, 90, 100 - }; ++ 32, 32, 33, 34, 35, 70, 78, 80, 90, 100, ++ 32, 32, 33, 34, 35, 70, 78, 80, 90, 100 ++ }; + + // Hit the spool cap, force it to make bins + for (int i = 0; i < samples.length; i++) + { + hist.update(samples[i]); + } ++ + assertEquals(5, hist.getAsMap().keySet().size()); + + } + + @Test + public void testRounding() throws Exception + { + StreamingHistogram hist = new StreamingHistogram(5, 10, 60); + long[] samples = new long[] { 59, 60, 119, 180, 181, 300 }; // 60, 60, 120, 180, 240, 300 + for (int i = 0 ; i < samples.length ; i++) + hist.update(samples[i]); - - assertEquals(hist.getAsMap().keySet().size(), (int) 5); - assertEquals((long) hist.getAsMap().get(Double.valueOf(60)), (long) 2L); - assertEquals((long) hist.getAsMap().get(Double.valueOf(120)), (long) 1L); ++ assertEquals(hist.getAsMap().keySet().size(), 5); ++ assertEquals(hist.getAsMap().get(60)[0], 2); ++ assertEquals(hist.getAsMap().get(120)[0], 1); + + } - }
