Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 5615a79cf -> 3484181e2
Fix ordering on SSTableScanner / SSTableReader close Patch by jmckenzie; reviewed by marcuse for CASSANDRA-8019 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3484181e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3484181e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3484181e Branch: refs/heads/cassandra-2.1 Commit: 3484181e2b68ce04acfe4e9028d5d1d76335094a Parents: 5615a79 Author: Joshua McKenzie <jmcken...@apache.org> Authored: Thu Nov 13 11:56:34 2014 -0600 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Thu Nov 13 11:56:34 2014 -0600 ---------------------------------------------------------------------- .../cassandra/io/sstable/SSTableScanner.java | 3 + .../io/sstable/SSTableRewriterTest.java | 190 ++++++++++--------- 2 files changed, 106 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3484181e/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 62ac175..3f1f1f0 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -63,6 +63,7 @@ public class SSTableScanner implements ICompactionScanner SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) { assert sstable != null; + sstable.acquireReference(); this.dfile = limiter == null ? 
sstable.openDataReader() : sstable.openDataReader(limiter); this.ifile = sstable.openIndexReader(); @@ -92,6 +93,7 @@ public class SSTableScanner implements ICompactionScanner SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) { assert sstable != null; + sstable.acquireReference(); this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); this.ifile = sstable.openIndexReader(); @@ -159,6 +161,7 @@ public class SSTableScanner implements ICompactionScanner public void close() throws IOException { FileUtils.close(dfile, ifile); + sstable.releaseReference(); } public long getLengthInBytes() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3484181e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 4d248bd..8a494a6 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -69,14 +69,16 @@ public class SSTableRewriterTest extends SchemaLoader Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); assertEquals(1, sstables.size()); SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); - AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); - ICompactionScanner scanner = scanners.scanners.get(0); - CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); - writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); - while(scanner.hasNext()) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) { - AbstractCompactedRow row = new 
LazilyCompactedRow(controller, Arrays.asList(scanner.next())); - writer.append(row); + ICompactionScanner scanner = scanners.scanners.get(0); + CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + while(scanner.hasNext()) + { + AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); + writer.append(row); + } } cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION); @@ -144,7 +146,7 @@ public class SSTableRewriterTest extends SchemaLoader @Test - public void testNumberOfFilesAndSizes() throws InterruptedException + public void testNumberOfFilesAndSizes() throws Exception { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -158,20 +160,22 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - ICompactionScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0); int files = 1; - while(scanner.hasNext()) + try (ICompactionScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) { - rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); - if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + while(scanner.hasNext()) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - files++; - assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
- assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count()); - assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count()); - + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + files++; + assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. + assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count()); + assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count()); + + } } } List<SSTableReader> sstables = rewriter.finish(); @@ -191,7 +195,7 @@ public class SSTableRewriterTest extends SchemaLoader } @Test - public void testNumberOfFiles_dont_clean_readers() throws InterruptedException + public void testNumberOfFiles_dont_clean_readers() throws Exception { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -205,17 +209,19 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - ICompactionScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0); int files = 1; - while(scanner.hasNext()) + try (ICompactionScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) { - rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); - if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + while(scanner.hasNext()) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - files++; - assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + files++; + assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. + } } } List<SSTableReader> sstables = rewriter.finish(); @@ -230,7 +236,7 @@ public class SSTableRewriterTest extends SchemaLoader @Test - public void testNumberOfFiles_abort() throws InterruptedException + public void testNumberOfFiles_abort() throws Exception { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -246,17 +252,19 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - ICompactionScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0); int files = 1; - while(scanner.hasNext()) + try (ICompactionScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) { - rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); - if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + while(scanner.hasNext()) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - files++; - assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + files++; + assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
+ } } } rewriter.abort(); @@ -271,7 +279,7 @@ public class SSTableRewriterTest extends SchemaLoader } @Test - public void testNumberOfFiles_abort2() throws InterruptedException + public void testNumberOfFiles_abort2() throws Exception { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -287,23 +295,25 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - ICompactionScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0); int files = 1; - while(scanner.hasNext()) + try (ICompactionScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) { - rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); - if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + while(scanner.hasNext()) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - files++; - assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. - } - if (files == 3) - { - //testing to abort when we have nothing written in the new file - rewriter.abort(); - break; + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + files++; + assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
+ } + if (files == 3) + { + //testing to abort when we have nothing written in the new file + rewriter.abort(); + break; + } } } Thread.sleep(1000); @@ -316,7 +326,7 @@ public class SSTableRewriterTest extends SchemaLoader } @Test - public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException + public void testNumberOfFiles_finish_empty_new_writer() throws Exception { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -330,24 +340,26 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - ICompactionScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0); int files = 1; - while(scanner.hasNext()) + try (ICompactionScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) { - rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); - if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) - { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - files++; - assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. - } - if (files == 3) + while(scanner.hasNext()) { - //testing to finish when we have nothing written in the new file - List<SSTableReader> sstables = rewriter.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); - break; + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + files++; + assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
+ } + if (files == 3) + { + //testing to finish when we have nothing written in the new file + List<SSTableReader> sstables = rewriter.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); + break; + } } } Thread.sleep(1000); @@ -357,7 +369,7 @@ public class SSTableRewriterTest extends SchemaLoader } @Test - public void testNumberOfFiles_truncate() throws InterruptedException + public void testNumberOfFiles_truncate() throws Exception { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -371,17 +383,19 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - ICompactionScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0); int files = 1; - while(scanner.hasNext()) + try (ICompactionScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) { - rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); - if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + while(scanner.hasNext()) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - files++; - assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) + { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + files++; + assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. 
+ } } } List<SSTableReader> sstables = rewriter.finish(); @@ -393,7 +407,7 @@ public class SSTableRewriterTest extends SchemaLoader } @Test - public void testSmallFiles() throws InterruptedException + public void testSmallFiles() throws Exception { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -408,18 +422,20 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - ICompactionScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0); int files = 1; - while(scanner.hasNext()) + try (ICompactionScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0)) { - rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); - if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000) + while(scanner.hasNext()) { - assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ... - assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - files++; + rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); + if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000) + { + assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ... + assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + files++; + } } } List<SSTableReader> sstables = rewriter.finish();