Updated Branches: refs/heads/trunk d72e9381f -> effdb08b3
Improve LeveledScanner work estimation
patch by Marcus Eriksson; reviewed by jbellis for CASSANDRA-5250

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/effdb08b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/effdb08b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/effdb08b

Branch: refs/heads/trunk
Commit: effdb08b341e7c3799a50983c6086d7f4bbaf120
Parents: d72e938
Author: Jonathan Ellis <[email protected]>
Authored: Thu Mar 21 10:06:41 2013 -0400
Committer: Jonathan Ellis <[email protected]>
Committed: Thu Mar 21 10:06:41 2013 -0400

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../db/compaction/LeveledCompactionStrategy.java | 40 ++++++++++--
 .../io/sstable/SSTableBoundedScanner.java | 5 +-
 .../apache/cassandra/io/sstable/SSTableReader.java | 49 +--------------
 4 files changed, 38 insertions(+), 57 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/effdb08b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f9c4f1..b7b0e42 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0
+ * Improve LeveledScanner work estimation (CASSANDRA-5250)
  * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
  * Change Message IDs to ints (CASSANDRA-5307)
  * Move sstable level information into the Stats component, removing the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/effdb08b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index ffe45ad..d081542 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -172,7 +171,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             else
             {
                 // Create a LeveledScanner that only opens one sstable at a time, in sorted order
-                scanners.add(new LeveledScanner(byLevel.get(level), range));
+                List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+                if (!intersecting.isEmpty())
+                    scanners.add(new LeveledScanner(intersecting, range));
             }
         }
 
@@ -194,19 +195,46 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> range)
         {
             this.range = range;
-            this.sstables = new ArrayList<SSTableReader>(sstables);
+
+            // add only sstables that intersect our range, and estimate how much data that involves
+            this.sstables = new ArrayList<SSTableReader>(sstables.size());
+            long length = 0;
+            for (SSTableReader sstable : sstables)
+            {
+                this.sstables.add(sstable);
+                long estimatedKeys = sstable.estimatedKeys();
+                double estKeysInRangeRatio = 1.0;
+
+                if (estimatedKeys > 0 && range != null)
+                    estKeysInRangeRatio = ((double) sstable.estimatedKeysForRanges(Collections.singleton(range))) / estimatedKeys;
+
+                length += sstable.uncompressedLength() * estKeysInRangeRatio;
+            }
+
+            totalLength = length;
             Collections.sort(this.sstables, SSTable.sstableComparator);
             sstableIterator = this.sstables.iterator();
+            assert sstableIterator.hasNext(); // caller should check intersecting first
             currentScanner = sstableIterator.next().getDirectScanner(range);
+        }
 
-            long length = 0;
+        public static List<SSTableReader> intersecting(Collection<SSTableReader> sstables, Range<Token> range)
+        {
+            ArrayList<SSTableReader> filtered = new ArrayList<SSTableReader>();
             for (SSTableReader sstable : sstables)
-                length += sstable.uncompressedLength();
-            totalLength = length;
+            {
+                Range<Token> sstableRange = new Range<Token>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
+                if (range == null || sstableRange.intersects(range))
+                    filtered.add(sstable);
+            }
+            return filtered;
         }
 
         protected OnDiskAtomIterator computeNext()
         {
+            if (currentScanner == null)
+                return endOfData();
+
             try
             {
                 while (true)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/effdb08b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index a571901..0e31896 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -35,11 +35,10 @@ public class SSTableBoundedScanner extends SSTableScanner
     private final Iterator<Pair<Long, Long>> rangeIterator;
     private Pair<Long, Long> currentRange;
 
-    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Iterator<Pair<Long, Long>> rangeIterator)
+    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Range<Token> range)
     {
         super(sstable, skipCache);
-        this.rangeIterator = rangeIterator;
-        assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
+        this.rangeIterator = sstable.getPositionsForRanges(Collections.singletonList(range)).iterator();
         currentRange = rangeIterator.next();
         dfile.seek(currentRange.left);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/effdb08b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index e2ef70c..bbef4ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -973,10 +973,7 @@ public class SSTableReader extends SSTable
         if (range == null)
             return getDirectScanner();
 
-        Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
-        return rangeIterator.hasNext()
-               ? new SSTableBoundedScanner(this, true, rangeIterator)
-               : new EmptyCompactionScanner(getFilename());
+        return new SSTableBoundedScanner(this, true, range);
     }
 
     public FileDataInput getFileDataInput(long position)
@@ -1210,48 +1207,4 @@ public class SSTableReader extends SSTable
             sstable.releaseReference();
         }
     }
-
-    private static class EmptyCompactionScanner implements ICompactionScanner
-    {
-        private final String filename;
-
-        private EmptyCompactionScanner(String filename)
-        {
-            this.filename = filename;
-        }
-
-        public long getLengthInBytes()
-        {
-            return 0;
-        }
-
-        public long getCurrentPosition()
-        {
-            return 0;
-        }
-
-        public String getBackingFiles()
-        {
-            return filename;
-        }
-
-        public void close()
-        {
-        }
-
-        public boolean hasNext()
-        {
-            return false;
-        }
-
-        public OnDiskAtomIterator next()
-        {
-            throw new IndexOutOfBoundsException();
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
 }
