L0 contents are overlapping (fix for #4142)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aead8da9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aead8da9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aead8da9

Branch: refs/heads/trunk
Commit: aead8da91a81e4f3b4ad21d3d53157846d7fbb36
Parents: 146f4bd
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed May 9 14:01:18 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed May 9 14:52:04 2012 -0500

----------------------------------------------------------------------
 .../db/compaction/LeveledCompactionStrategy.java | 30 ++++++++++-----
 1 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aead8da9/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 361c333..939fdc6 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -170,8 +171,20 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             byLevel.get(manifest.levelOf(sstable)).add(sstable);
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
-        for (Integer level : ImmutableSortedSet.copyOf(byLevel.keySet()))
-            scanners.add(new LeveledScanner(new ArrayList<SSTableReader>(byLevel.get(level)), range));
+        for (Integer level : byLevel.keySet())
+        {
+            if (level == 0)
+            {
+                // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
+                for (SSTableReader sstable : byLevel.get(level))
+                    scanners.add(sstable.getDirectScanner(range));
+            }
+            else
+            {
+                // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+                scanners.add(new LeveledScanner(byLevel.get(level), range));
+            }
+        }
 
         return scanners;
     }
@@ -188,14 +201,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         private SSTableScanner currentScanner;
         private long positionOffset;
 
-        public LeveledScanner(List<SSTableReader> sstables, Range<Token> range)
+        public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> range)
         {
             this.range = range;
-            this.sstables = sstables;
-
-            // Sorting a list we got in argument is bad but it's all private to this class so let's not bother
-            Collections.sort(sstables, SSTable.sstableComparator);
-            this.sstableIterator = sstables.iterator();
+            this.sstables = new ArrayList<SSTableReader>(sstables);
+            Collections.sort(this.sstables, SSTable.sstableComparator);
+            this.sstableIterator = this.sstables.iterator();
 
             long length = 0;
             for (SSTableReader sstable : sstables)
@@ -225,8 +236,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
                 if (!sstableIterator.hasNext())
                     return endOfData();
 
-                SSTableReader reader = sstableIterator.next();
-                currentScanner = reader.getDirectScanner(range);
+                currentScanner = sstableIterator.next().getDirectScanner(range);
                 return computeNext();
             }
             catch (IOException e)