Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 9c0f5753f -> d15c9187a
Make sure we release sstable references after anticompaction

Patch by marcuse; reviewed by yukim for CASSANDRA-8386

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d15c9187
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d15c9187
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d15c9187

Branch: refs/heads/cassandra-2.1
Commit: d15c9187a4b66645bf0575a7c3bfdbb9b10a263d
Parents: 9c0f575
Author: Marcus Eriksson <marc...@apache.org>
Authored: Thu Nov 27 18:12:24 2014 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Dec 2 10:10:33 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 66 +++++++++++---------
 .../cassandra/io/sstable/SSTableReader.java     |  2 +-
 .../db/compaction/AntiCompactionTest.java       | 55 ++++++++++++----
 4 files changed, 82 insertions(+), 42 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d454ba2..7df396d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Release sstable references after anticompaction (CASSANDRA-8386)
  * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
  * Fix high size calculations for prepared statements (CASSANDRA-8231)
  * Centralize shared executors (CASSANDRA-8055)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 61628ff..d85ffd7 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -391,6 +391,8 @@ public class CompactionManager implements CompactionManagerMBean /** * Make sure the {validatedForRepair} are marked for compaction before calling this. * + * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getAndReferenceSSTables(..)). + * * @param cfs * @param ranges Ranges that the repair was carried out on * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them. @@ -407,40 +409,48 @@ public class CompactionManager implements CompactionManagerMBean Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); Set<SSTableReader> nonAnticompacting = new HashSet<>(); Iterator<SSTableReader> sstableIterator = sstables.iterator(); - while (sstableIterator.hasNext()) + try { - SSTableReader sstable = sstableIterator.next(); - for (Range<Token> r : Range.normalize(ranges)) + while (sstableIterator.hasNext()) { - Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner); - if (r.contains(sstableRange)) + SSTableReader sstable = sstableIterator.next(); + for (Range<Token> r : Range.normalize(ranges)) { - logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r); - sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt); - sstable.reloadSSTableMetadata(); - mutatedRepairStatuses.add(sstable); - sstableIterator.remove(); - break; - } - else if (!sstableRange.intersects(r)) - { - logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r); - nonAnticompacting.add(sstable); - sstableIterator.remove(); - break; - } - else - { - logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r); + Range<Token> 
sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner); + if (r.contains(sstableRange)) + { + logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r); + sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt); + sstable.reloadSSTableMetadata(); + mutatedRepairStatuses.add(sstable); + sstableIterator.remove(); + break; + } + else if (!sstableRange.intersects(r)) + { + logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r); + nonAnticompacting.add(sstable); + sstableIterator.remove(); + break; + } + else + { + logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r); + } } } + cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses); + cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses)); + SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses)); + if (!sstables.isEmpty()) + doAntiCompaction(cfs, ranges, sstables, repairedAt); } - cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses); - cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses)); - if (!sstables.isEmpty()) - doAntiCompaction(cfs, ranges, sstables, repairedAt); - SSTableReader.releaseReferences(sstables); - cfs.getDataTracker().unmarkCompacting(sstables); + finally + { + SSTableReader.releaseReferences(sstables); + cfs.getDataTracker().unmarkCompacting(sstables); + } + logger.info(String.format("Completed anticompaction successfully")); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 0a34b4a..0024f24 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -1645,7 +1645,7 @@ public class SSTableReader extends SSTable } @VisibleForTesting - int referenceCount() + public int referenceCount() { return references.get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 5ed4f4a..090839e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -75,22 +75,30 @@ public class AntiCompactionTest extends SchemaLoader int nonRepairedKeys = 0; for (SSTableReader sstable : store.getSSTables()) { - SSTableScanner scanner = sstable.getScanner(); - while (scanner.hasNext()) + try (SSTableScanner scanner = sstable.getScanner()) { - SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); - if (sstable.isRepaired()) + while (scanner.hasNext()) { - assertTrue(range.contains(row.getKey().getToken())); - repairedKeys++; - } - else - { - assertFalse(range.contains(row.getKey().getToken())); - nonRepairedKeys++; + SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + if (sstable.isRepaired()) + { + assertTrue(range.contains(row.getKey().getToken())); + repairedKeys++; + } + else + { + assertFalse(range.contains(row.getKey().getToken())); + nonRepairedKeys++; + } } } } + for (SSTableReader sstable : store.getSSTables()) + { + assertFalse(sstable.isMarkedCompacted()); + assertEquals(1, sstable.referenceCount()); + } + assertEquals(0, 
store.getDataTracker().getCompacting().size()); assertEquals(repairedKeys, 4); assertEquals(nonRepairedKeys, 6); } @@ -103,7 +111,6 @@ public class AntiCompactionTest extends SchemaLoader SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); long origSize = s.bytesOnDisk(); - System.out.println(cfs.metric.liveDiskSpaceUsed.count()); Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500))); Collection<SSTableReader> sstables = cfs.getSSTables(); SSTableReader.acquireReferences(sstables); @@ -146,16 +153,38 @@ public class AntiCompactionTest extends SchemaLoader List<Range<Token>> ranges = Arrays.asList(range); SSTableReader.acquireReferences(sstables); - CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0); + CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1); assertThat(store.getSSTables().size(), is(1)); assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false)); + assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1)); + assertThat(store.getDataTracker().getCompacting().size(), is(0)); } + @Test + public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + Collection<SSTableReader> sstables = store.getUnrepairedSSTables(); + assertEquals(store.getSSTables().size(), sstables.size()); + Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes())); + List<Range<Token>> ranges = Arrays.asList(range); + + SSTableReader.acquireReferences(sstables); + CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1); + + assertThat(store.getSSTables().size(), is(1)); + assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true)); + assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1)); + 
assertThat(store.getDataTracker().getCompacting().size(), is(0)); + } + + private ColumnFamilyStore prepareColumnFamilyStore() { Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.truncateBlocking(); store.disableAutoCompaction(); long timestamp = System.currentTimeMillis(); for (int i = 0; i < 10; i++)