Repository: cassandra
Updated Branches:
  refs/heads/trunk 2b028b2b2 -> b4b1bdd32
Make sure we compact highly overlapping cold sstables with STCS Patch by marcuse; reviewed by carlyeks for CASSANDRA-8635 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9efa0173 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9efa0173 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9efa0173 Branch: refs/heads/trunk Commit: 9efa0173d0e621045f650e9a57a607d3c4c0bb50 Parents: 325169e Author: Marcus Eriksson <[email protected]> Authored: Wed Jan 28 13:49:42 2015 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Wed Jan 28 13:53:27 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../SizeTieredCompactionStrategy.java | 77 +++++++++++++++++++- .../cassandra/io/sstable/ColumnNameHelper.java | 22 ++++++ .../cassandra/io/sstable/SSTableReader.java | 50 +++++++++++++ .../SizeTieredCompactionStrategyTest.java | 12 +-- .../cassandra/db/filter/ColumnSliceTest.java | 14 ++++ 6 files changed, 168 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b247127..ff6a26f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.1.3 + * Make sure we compact highly overlapping cold sstables with + STCS (CASSANDRA-8635) * rpc_interface and listen_interface generate NPE on startup when specified interface doesn't exist (CASSANDRA-8677) * Fix ArrayIndexOutOfBoundsException in nodetool cfhistograms (CASSANDRA-8514) * Switch from yammer metrics for nodetool cf/proxy histograms (CASSANDRA-8662) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 4b44426..fbd715c 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -24,12 +24,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.primitives.Longs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.statements.CFPropDefs; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.ColumnNameHelper; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.Pair; @@ -80,7 +82,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy int maxThreshold = cfs.getMaximumCompactionThreshold(); Iterable<SSTableReader> candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables)); - candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit); + candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit, cfs.getMinimumCompactionThreshold()); List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize); logger.debug("Compaction buckets are {}", buckets); @@ -109,10 +111,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy * across all sstables * @param sstables all sstables to consider * @param coldReadsToOmit the proportion of total reads/sec that will be 
omitted (0=omit nothing, 1=omit everything) + * @param minThreshold min compaction threshold * @return a list of sstables with the coldest sstables excluded until the reads they represent reaches coldReadsToOmit */ @VisibleForTesting - static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit) + static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit, int minThreshold) { if (coldReadsToOmit == 0.0) return sstables; @@ -167,10 +170,78 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy totalColdReads += reads; cutoffIndex++; } + List<SSTableReader> hotSSTables = new ArrayList<>(sstables.subList(cutoffIndex, sstables.size())); + List<SSTableReader> coldSSTables = sstables.subList(0, cutoffIndex); + logger.debug("hotSSTables={}, coldSSTables={}", hotSSTables.size(), coldSSTables.size()); + if (hotSSTables.size() >= minThreshold) + return hotSSTables; + if (coldSSTables.size() < minThreshold) + return Collections.emptyList(); + + Map<SSTableReader, Set<SSTableReader>> overlapMap = new HashMap<>(); + for (int i = 0; i < coldSSTables.size(); i++) + { + SSTableReader sstable = coldSSTables.get(i); + Set<SSTableReader> overlaps = new HashSet<>(); + for (int j = 0; j < coldSSTables.size(); j++) + { + SSTableReader innerSSTable = coldSSTables.get(j); + if (ColumnNameHelper.overlaps(sstable.getSSTableMetadata().minColumnNames, + sstable.getSSTableMetadata().maxColumnNames, + innerSSTable.getSSTableMetadata().minColumnNames, + innerSSTable.getSSTableMetadata().maxColumnNames, + sstable.metadata.comparator)) + { + overlaps.add(innerSSTable); + } + } + overlapMap.put(sstable, overlaps); + } + List<Set<SSTableReader>> overlapChains = new ArrayList<>(); + for (SSTableReader sstable : overlapMap.keySet()) + overlapChains.add(createOverlapChain(sstable, overlapMap)); + + Collections.sort(overlapChains, new Comparator<Set<SSTableReader>>() + { + @Override + public int 
compare(Set<SSTableReader> o1, Set<SSTableReader> o2) + { + return Longs.compare(SSTableReader.getTotalBytes(o2), SSTableReader.getTotalBytes(o1)); + } + }); + for (Set<SSTableReader> overlapping : overlapChains) + { + // if we are expecting to only keep 70% of the keys after a compaction, run a compaction on these cold sstables: + if (SSTableReader.estimateCompactionGain(overlapping) < 0.7) + return new ArrayList<>(overlapping); + } + return Collections.emptyList(); + } - return sstables.subList(cutoffIndex, sstables.size()); + /** + * returns a set with all overlapping sstables starting with s. + * if we have 3 sstables, a, b, c where a overlaps with b, but not c and b overlaps with c, all sstables would be returned. + * + * m contains an sstable -> all overlapping mapping + */ + private static Set<SSTableReader> createOverlapChain(SSTableReader s, Map<SSTableReader, Set<SSTableReader>> m) + { + Deque<SSTableReader> sstables = new ArrayDeque<>(); + Set<SSTableReader> overlapChain = new HashSet<>(); + sstables.push(s); + while (!sstables.isEmpty()) + { + SSTableReader sstable = sstables.pop(); + if (overlapChain.add(sstable)) + { + if (m.containsKey(sstable)) + sstables.addAll(m.get(sstable)); + } + } + return overlapChain; } + /** * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads * @param minThreshold minimum number of sstables in a bucket to qualify as interesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java index f74b86f..436975b 100644 --- a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java +++ b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java @@ -217,4 +217,26 @@ 
public class ColumnNameHelper return retList; } + + /** + * Checks if the given min/max column names could overlap (i.e they could share some column names based on the max/min column names in the sstables) + */ + public static boolean overlaps(List<ByteBuffer> minColumnNames1, List<ByteBuffer> maxColumnNames1, List<ByteBuffer> minColumnNames2, List<ByteBuffer> maxColumnNames2, CellNameType comparator) + { + if (minColumnNames1.isEmpty() || maxColumnNames1.isEmpty() || minColumnNames2.isEmpty() || maxColumnNames2.isEmpty()) + return true; + + return !(compare(maxColumnNames1, minColumnNames2, comparator) < 0 || compare(minColumnNames1, maxColumnNames2, comparator) > 0); + } + + private static int compare(List<ByteBuffer> columnNames1, List<ByteBuffer> columnNames2, CellNameType comparator) + { + for (int i = 0; i < Math.min(columnNames1.size(), columnNames2.size()); i++) + { + int cmp = comparator.subtype(i).compare(columnNames1.get(i), columnNames2.get(i)); + if (cmp != 0) + return cmp; + } + return 0; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index ee8b7c3..50bf3e3 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -54,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import 
com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.cache.InstrumentingCache; @@ -272,6 +274,54 @@ public class SSTableReader extends SSTable return count; } + /** + * Estimates how much of the keys we would keep if the sstables were compacted together + */ + public static double estimateCompactionGain(Set<SSTableReader> overlapping) + { + Set<ICardinality> cardinalities = new HashSet<>(overlapping.size()); + for (SSTableReader sstable : overlapping) + { + try + { + ICardinality cardinality = ((CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION)).cardinalityEstimator; + if (cardinality != null) + cardinalities.add(cardinality); + else + logger.debug("Got a null cardinality estimator in: "+sstable.getFilename()); + } + catch (IOException e) + { + logger.warn("Could not read up compaction metadata for " + sstable, e); + } + } + long totalKeyCountBefore = 0; + for (ICardinality cardinality : cardinalities) + { + totalKeyCountBefore += cardinality.cardinality(); + } + if (totalKeyCountBefore == 0) + return 1; + + long totalKeyCountAfter = mergeCardinalities(cardinalities).cardinality(); + logger.debug("Estimated compaction gain: {}/{}={}", totalKeyCountAfter, totalKeyCountBefore, ((double)totalKeyCountAfter)/totalKeyCountBefore); + return ((double)totalKeyCountAfter)/totalKeyCountBefore; + } + + private static ICardinality mergeCardinalities(Collection<ICardinality> cardinalities) + { + ICardinality base = new HyperLogLogPlus(13, 25); // see MetadataCollector.cardinality + try + { + base = base.merge(cardinalities.toArray(new ICardinality[cardinalities.size()])); + } + catch (CardinalityMergeException e) + { + logger.warn("Could not merge cardinalities", e); + } + return base; + } + public static SSTableReader open(Descriptor descriptor) throws IOException { CFMetaData metadata; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java index 6132dad..d9bf017 100644 --- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java @@ -216,17 +216,17 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader for (SSTableReader sstr : sstrs) sstr.readMeter = null; - filtered = filterColdSSTables(sstrs, 0.05); + filtered = filterColdSSTables(sstrs, 0.05, 0); assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size()); for (SSTableReader sstr : sstrs) sstr.readMeter = new RestorableMeter(0.0, 0.0); - filtered = filterColdSSTables(sstrs, 0.05); + filtered = filterColdSSTables(sstrs, 0.05, 0); assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size()); // leave all read rates at 0 besides one sstrs.get(0).readMeter = new RestorableMeter(1000.0, 1000.0); - filtered = filterColdSSTables(sstrs, 0.05); + filtered = filterColdSSTables(sstrs, 0.05, 0); assertEquals("there should only be one hot sstable", 1, filtered.size()); assertEquals(1000.0, filtered.get(0).readMeter.twoHourRate(), 0.5); @@ -239,20 +239,20 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader sstrs.get(2).readMeter = new RestorableMeter(1.0, 1.0); sstrs.get(3).readMeter = new RestorableMeter(1.0, 1.0); - filtered = filterColdSSTables(sstrs, 0.025); + filtered = filterColdSSTables(sstrs, 0.025, 0); assertEquals(2, filtered.size()); assertEquals(98.0, filtered.get(0).readMeter.twoHourRate() + 
filtered.get(1).readMeter.twoHourRate(), 0.5); // make sure a threshold of 0.0 doesn't result in any sstables being filtered for (SSTableReader sstr : sstrs) sstr.readMeter = new RestorableMeter(1.0, 1.0); - filtered = filterColdSSTables(sstrs, 0.0); + filtered = filterColdSSTables(sstrs, 0.0, 0); assertEquals(sstrs.size(), filtered.size()); // just for fun, set a threshold where all sstables are considered cold for (SSTableReader sstr : sstrs) sstr.readMeter = new RestorableMeter(1.0, 1.0); - filtered = filterColdSSTables(sstrs, 1.0); + filtered = filterColdSSTables(sstrs, 1.0, 0); assertTrue(filtered.isEmpty()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java index f1be21c..8ba2665 100644 --- a/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java +++ b/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.io.sstable.ColumnNameHelper; import org.apache.cassandra.utils.ByteBufferUtil; import static org.junit.Assert.*; @@ -344,6 +345,19 @@ public class ColumnSliceTest slice = new ColumnSlice(composite(0), composite(0, 1, 2)); assertFalse(slice.intersects(columnNames(1), columnNames(1, 2), nameType, false)); } + @Test + public void testColumnNameHelper() + { + List<AbstractType<?>> types = new ArrayList<>(); + types.add(Int32Type.instance); + types.add(Int32Type.instance); + types.add(Int32Type.instance); + CompoundDenseCellNameType nameType = new CompoundDenseCellNameType(types); + assertTrue(ColumnNameHelper.overlaps(columnNames(0, 0, 0), 
columnNames(3, 3, 3), columnNames(1, 1, 1), columnNames(2, 2, 2), nameType)); + assertFalse(ColumnNameHelper.overlaps(columnNames(0, 0, 0), columnNames(3, 3, 3), columnNames(4, 4, 4), columnNames(5, 5, 5), nameType)); + assertFalse(ColumnNameHelper.overlaps(columnNames(0, 0, 0), columnNames(3, 3, 3), columnNames(3, 3, 4), columnNames(5, 5, 5), nameType)); + assertTrue(ColumnNameHelper.overlaps(columnNames(0), columnNames(3, 3, 3), columnNames(1, 1), columnNames(5), nameType)); + } @Test public void testDeoverlapSlices()
