Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 ffa806733 -> d5e5f9800
Make sure we don't give out positions from an sstable beyond its first/last keys. Patch by marcuse; reviewed by belliottsmith for CASSANDRA-8458 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5e5f980 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5e5f980 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5e5f980 Branch: refs/heads/cassandra-2.1 Commit: d5e5f980093c20b42b89fec7bef8e31808fd37f6 Parents: ffa8067 Author: Marcus Eriksson <[email protected]> Authored: Fri Dec 12 15:50:12 2014 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Wed Dec 17 08:44:57 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/SSTableReader.java | 29 ++++++---- .../io/sstable/SSTableRewriterTest.java | 61 ++++++++++++++++++++ 3 files changed, 81 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d95e02e..410d49a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.3 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458) * Disable mmap on Windows (CASSANDRA-6993) * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253) * Add auth support to cassandra-stress (CASSANDRA-7985) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index a8188ba..bd20226 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -1354,20 +1354,29 @@ public class SSTableReader extends SSTable List<Pair<Long,Long>> positions = new ArrayList<>(); for (Range<Token> range : Range.normalize(ranges)) { - AbstractBounds<RowPosition> keyRange = range.toRowBounds(); - RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT); - long left = idxLeft == null ? -1 : idxLeft.position; - if (left == -1) - // left is past the end of the file + assert !range.isWrapAround() || range.right.isMinimum(); + // truncate the range so it at most covers the sstable + AbstractBounds<RowPosition> bounds = range.toRowBounds(); + RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound(); + RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right; + + if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0) continue; - RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT); - long right = idxRight == null ? -1 : idxRight.position; - if (right == -1 || Range.isWrapAround(range.left, range.right)) - // right is past the end of the file, or it wraps - right = uncompressedLength(); + + long left = getPosition(leftBound, Operator.GT).position; + long right = (rightBound.compareTo(last) > 0) + ? (openReason == OpenReason.EARLY + // if opened early, we overlap with the old sstables by one key, so we know that the last + // (and further) key(s) will be streamed from these if necessary + ? getPosition(last.getToken().maxKeyBound(), Operator.GT).position + : uncompressedLength()) + : getPosition(rightBound, Operator.GT).position; + if (left == right) // empty range continue; + + assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right); positions.add(Pair.create(left, right)); } return positions; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5e5f980/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index ecf97c3..6f9acea 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; + import com.google.common.collect.Sets; import org.junit.Test; @@ -40,10 +41,13 @@ import org.apache.cassandra.db.compaction.CompactionController; import org.apache.cassandra.db.compaction.LazilyCompactedRow; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.compaction.SSTableSplitter; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -121,6 +125,63 @@ public class SSTableRewriterTest extends SchemaLoader } @Test + public void getPositionsTest() throws InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.truncateBlocking(); + + SSTableReader s = writeFile(cfs, 1000); + cfs.addSSTable(s); + Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); + assertEquals(1, sstables.size()); + SSTableRewriter.overrideOpenInterval(10000000); + SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); + boolean checked = false; + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) + { + ISSTableScanner scanner = scanners.scanners.get(0); + CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + while (scanner.hasNext()) + { + AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); + writer.append(row); + if (!checked && writer.currentWriter().getFilePointer() > 15000000) + { + checked = true; + for (SSTableReader sstable : cfs.getSSTables()) + { + if (sstable.openReason == SSTableReader.OpenReason.EARLY) + { + SSTableReader c = sstables.iterator().next(); + long lastKeySize = sstable.getPosition(sstable.last, SSTableReader.Operator.GT).position - sstable.getPosition(sstable.last, SSTableReader.Operator.EQ).position; + Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken())); + List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r); + List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r); + assertEquals(1, tmplinkPositions.size()); + assertEquals(1, compactingPositions.size()); + assertEquals(0, tmplinkPositions.get(0).left.longValue()); + // make sure we have one key overlap between the early opened file and the compacting one: + assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left + lastKeySize); + assertEquals(c.uncompressedLength(), compactingPositions.get(0).right.longValue()); + } + } + } + } + } + assertTrue(checked); + Collection<SSTableReader> newsstables = writer.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); + Thread.sleep(100); + validateCFS(cfs); + int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); + assertEquals(1, filecounts); + cfs.truncateBlocking(); + validateCFS(cfs); + } + + @Test public void testFileRemoval() throws InterruptedException { Keyspace keyspace = Keyspace.open(KEYSPACE);
