Fix size-tiered compaction in LCS L0 patch by jbellis; reviewed by Marcus Eriksson and tested by Nikolai Grigoriev for CASSANDRA-6496
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ecec863d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ecec863d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ecec863d Branch: refs/heads/trunk Commit: ecec863d1fe3b1b249b7d2948b482104f5ff1ef3 Parents: 09c7dee Author: Jonathan Ellis <[email protected]> Authored: Tue Dec 17 14:31:04 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Tue Dec 17 14:31:14 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compaction/AbstractCompactionStrategy.java | 2 +- .../cassandra/db/compaction/CompactionTask.java | 2 +- .../compaction/LeveledCompactionStrategy.java | 20 +++++++--------- .../db/compaction/LeveledCompactionTask.java | 8 +++---- .../db/compaction/LeveledManifest.java | 25 ++++++++++++++++---- .../SizeTieredCompactionStrategy.java | 2 +- .../cassandra/db/compaction/Upgrader.java | 9 +------ .../cassandra/tools/StandaloneScrubber.java | 2 +- 9 files changed, 39 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5450b8a..c2cd052 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.4 + * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496) * Fix assertion failure in filterColdSSTables (CASSANDRA-6483) * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008) * Fix cleanup ClassCastException (CASSANDRA-6462) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index b63caab..f101998 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -167,7 +167,7 @@ public abstract class AbstractCompactionStrategy /** * @return size in bytes of the largest sstables for this strategy */ - public abstract long getMaxSSTableSize(); + public abstract long getMaxSSTableBytes(); public boolean isEnabled() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 6c6f852..2a23966 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -118,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask long totalkeysWritten = 0; long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata)); - long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableSize()); + long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes()); long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); if (logger.isDebugEnabled()) logger.debug("Expected bloom filter size : " + keysPerSSTable); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index e992003..8e60223 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -38,7 +38,6 @@ import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; import org.apache.cassandra.notifications.SSTableAddedNotification; import org.apache.cassandra.notifications.SSTableListChangedNotification; -import org.apache.cassandra.utils.Pair; public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer { @@ -111,11 +110,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem { while (true) { - Pair<? extends Collection<SSTableReader>, Integer> pair = manifest.getCompactionCandidates(); - Collection<SSTableReader> sstables; OperationType op; - int level; - if (pair == null) + LeveledManifest.CompactionCandidate candidate = manifest.getCompactionCandidates(); + if (candidate == null) { // if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio SSTableReader sstable = findDroppableSSTable(gcBefore); @@ -124,20 +121,19 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem logger.debug("No compaction necessary for {}", this); return null; } - sstables = Collections.singleton(sstable); + candidate = new LeveledManifest.CompactionCandidate(Collections.singleton(sstable), + sstable.getSSTableLevel(), + getMaxSSTableBytes()); op = OperationType.TOMBSTONE_COMPACTION; - level = sstable.getSSTableLevel(); } else { op = OperationType.COMPACTION; - sstables = pair.left; - level = pair.right; } - if (cfs.getDataTracker().markCompacting(sstables)) + if (cfs.getDataTracker().markCompacting(candidate.sstables)) { - 
LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, level, gcBefore, maxSSTableSizeInMB); + LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, candidate.sstables, candidate.level, gcBefore, candidate.maxSSTableBytes); newTask.setCompactionType(op); return newTask; } @@ -168,7 +164,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem } } - public long getMaxSSTableSize() + public long getMaxSSTableBytes() { return maxSSTableSizeInMB * 1024L * 1024L; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index bb60f21..f64f633 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -26,19 +26,19 @@ import org.apache.cassandra.io.sstable.SSTableWriter; public class LeveledCompactionTask extends CompactionTask { private final int level; - private final int sstableSizeInMB; + private final long maxSSTableBytes; - public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int level, final int gcBefore, int sstableSizeInMB) + public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int level, final int gcBefore, long maxSSTableBytes) { super(cfs, sstables, gcBefore); this.level = level; - this.sstableSizeInMB = sstableSizeInMB; + this.maxSSTableBytes = maxSSTableBytes; } @Override protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) { - return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L; + return writer.getOnDiskFilePointer() > maxSSTableBytes; } @Override 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 5690bd8..92cd887 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -230,7 +230,7 @@ public class LeveledManifest * @return highest-priority sstables to compact, and level to compact them to * If no compactions are necessary, will return null */ - public synchronized Pair<? extends Collection<SSTableReader>, Integer> getCompactionCandidates() + public synchronized CompactionCandidate getCompactionCandidates() { // LevelDB gives each level a score of how much data it contains vs its ideal amount, and // compacts the level with the highest score. 
But this falls apart spectacularly once you @@ -283,7 +283,10 @@ public class LeveledManifest options.minSSTableSize); List<SSTableReader> mostInteresting = SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32); if (!mostInteresting.isEmpty()) - return Pair.create(mostInteresting, 0); + { + logger.debug("L0 is too far behind, performing size-tiering there first"); + return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE); + } } // L0 is fine, proceed with this level @@ -291,7 +294,7 @@ public class LeveledManifest if (logger.isDebugEnabled()) logger.debug("Compaction candidates for L{} are {}", i, toString(candidates)); if (!candidates.isEmpty()) - return Pair.create(candidates, getNextLevel(candidates)); + return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes()); } } @@ -301,7 +304,7 @@ public class LeveledManifest Collection<SSTableReader> candidates = getCandidatesFor(0); if (candidates.isEmpty()) return null; - return Pair.create(candidates, getNextLevel(candidates)); + return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes()); } public synchronized int getLevelSize(int i) @@ -602,4 +605,18 @@ public class LeveledManifest FileUtils.delete(filename); FileUtils.renameWithConfirm(filename + "-tmp", filename); } + + public static class CompactionCandidate + { + public final Collection<SSTableReader> sstables; + public final int level; + public final long maxSSTableBytes; + + public CompactionCandidate(Collection<SSTableReader> sstables, int level, long maxSSTableBytes) + { + this.sstables = sstables; + this.level = level; + this.maxSSTableBytes = maxSSTableBytes; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 7ccc99d..51464ab 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -346,7 +346,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy estimatedRemainingTasks = n; } - public long getMaxSSTableSize() + public long getMaxSSTableBytes() { return Long.MAX_VALUE; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index fa21765..2805a52 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -25,13 +25,6 @@ import com.google.common.base.Throwables; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; -import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; -import org.apache.cassandra.db.compaction.AbstractCompactionIterable; -import org.apache.cassandra.db.compaction.CompactionIterable; -import org.apache.cassandra.db.compaction.CompactionController; -import org.apache.cassandra.db.compaction.CompactionTask; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.OutputHandler; @@ -63,7 +56,7 @@ public class Upgrader this.strategy = cfs.getCompactionStrategy(); long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), 
SSTableReader.getApproximateKeyCount(toUpgrade, cfs.metadata)); - long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableSize()); + long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableBytes()); this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ecec863d/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index 780ff5b..00e0a5a 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -109,7 +109,7 @@ public class StandaloneScrubber // If leveled, load the manifest if (cfs.getCompactionStrategy() instanceof LeveledCompactionStrategy) { - int maxSizeInMB = (int)((cfs.getCompactionStrategy().getMaxSSTableSize()) / (1024L * 1024L)); + int maxSizeInMB = (int)((cfs.getCompactionStrategy().getMaxSSTableBytes()) / (1024L * 1024L)); manifest = LeveledManifest.create(cfs, maxSizeInMB, sstables); }
