Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 5615a79cf -> 3484181e2


Fix ordering on SSTableScanner / SSTableReader close

Patch by jmckenzie; reviewed by marcuse for CASSANDRA-8019


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3484181e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3484181e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3484181e

Branch: refs/heads/cassandra-2.1
Commit: 3484181e2b68ce04acfe4e9028d5d1d76335094a
Parents: 5615a79
Author: Joshua McKenzie <jmcken...@apache.org>
Authored: Thu Nov 13 11:56:34 2014 -0600
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Thu Nov 13 11:56:34 2014 -0600

----------------------------------------------------------------------
 .../cassandra/io/sstable/SSTableScanner.java    |   3 +
 .../io/sstable/SSTableRewriterTest.java         | 190 ++++++++++---------
 2 files changed, 106 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3484181e/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 62ac175..3f1f1f0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -63,6 +63,7 @@ public class SSTableScanner implements ICompactionScanner
     SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter 
limiter)
     {
         assert sstable != null;
+        sstable.acquireReference();
 
         this.dfile = limiter == null ? sstable.openDataReader() : 
sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
@@ -92,6 +93,7 @@ public class SSTableScanner implements ICompactionScanner
     SSTableScanner(SSTableReader sstable, Collection<Range<Token>> 
tokenRanges, RateLimiter limiter)
     {
         assert sstable != null;
+        sstable.acquireReference();
 
         this.dfile = limiter == null ? sstable.openDataReader() : 
sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
@@ -159,6 +161,7 @@ public class SSTableScanner implements ICompactionScanner
     public void close() throws IOException
     {
         FileUtils.close(dfile, ifile);
+        sstable.releaseReference();
     }
 
     public long getLengthInBytes()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3484181e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 4d248bd..8a494a6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -69,14 +69,16 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
         SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, 
false);
-        AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables);
-        ICompactionScanner scanner = scanners.scanners.get(0);
-        CompactionController controller = new CompactionController(cfs, 
sstables, cfs.gcBefore(System.currentTimeMillis()));
-        writer.switchWriter(getWriter(cfs, 
sstables.iterator().next().descriptor.directory));
-        while(scanner.hasNext())
+        try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables);)
         {
-            AbstractCompactedRow row = new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next()));
-            writer.append(row);
+            ICompactionScanner scanner = scanners.scanners.get(0);
+            CompactionController controller = new CompactionController(cfs, 
sstables, cfs.gcBefore(System.currentTimeMillis()));
+            writer.switchWriter(getWriter(cfs, 
sstables.iterator().next().descriptor.directory));
+            while(scanner.hasNext())
+            {
+                AbstractCompactedRow row = new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next()));
+                writer.append(row);
+            }
         }
         cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, 
writer.finish(), OperationType.COMPACTION);
 
@@ -144,7 +146,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
 
     @Test
-    public void testNumberOfFilesAndSizes() throws InterruptedException
+    public void testNumberOfFilesAndSizes() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -158,20 +160,22 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, 
compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one 
original file plus the ones we have switched out.
-                assertEquals(s.bytesOnDisk(), 
cfs.metric.liveDiskSpaceUsed.count());
-                assertEquals(s.bytesOnDisk(), 
cfs.metric.totalDiskSpaceUsed.count());
-
+                rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, 
s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have 
one original file plus the ones we have switched out.
+                    assertEquals(s.bytesOnDisk(), 
cfs.metric.liveDiskSpaceUsed.count());
+                    assertEquals(s.bytesOnDisk(), 
cfs.metric.totalDiskSpaceUsed.count());
+
+                }
             }
         }
         List<SSTableReader> sstables = rewriter.finish();
@@ -191,7 +195,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testNumberOfFiles_dont_clean_readers() throws 
InterruptedException
+    public void testNumberOfFiles_dont_clean_readers() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -205,17 +209,19 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, 
compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one 
original file plus the ones we have switched out.
+                rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, 
s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have 
one original file plus the ones we have switched out.
+                }
             }
         }
         List<SSTableReader> sstables = rewriter.finish();
@@ -230,7 +236,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
 
     @Test
-    public void testNumberOfFiles_abort() throws InterruptedException
+    public void testNumberOfFiles_abort() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -246,17 +252,19 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, 
compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one 
original file plus the ones we have switched out.
+                rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, 
s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have 
one original file plus the ones we have switched out.
+                }
             }
         }
         rewriter.abort();
@@ -271,7 +279,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testNumberOfFiles_abort2() throws InterruptedException
+    public void testNumberOfFiles_abort2() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -287,23 +295,25 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, 
compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one 
original file plus the ones we have switched out.
-            }
-            if (files == 3)
-            {
-                //testing to abort when we have nothing written in the new file
-                rewriter.abort();
-                break;
+                rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, 
s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have 
one original file plus the ones we have switched out.
+                }
+                if (files == 3)
+                {
+                    //testing to abort when we have nothing written in the new 
file
+                    rewriter.abort();
+                    break;
+                }
             }
         }
         Thread.sleep(1000);
@@ -316,7 +326,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testNumberOfFiles_finish_empty_new_writer() throws 
InterruptedException
+    public void testNumberOfFiles_finish_empty_new_writer() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -330,24 +340,26 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, 
compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
-            {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one 
original file plus the ones we have switched out.
-            }
-            if (files == 3)
+            while(scanner.hasNext())
             {
-                //testing to finish when we have nothing written in the new 
file
-                List<SSTableReader> sstables = rewriter.finish();
-                cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, 
sstables, OperationType.COMPACTION);
-                break;
+                rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, 
s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have 
one original file plus the ones we have switched out.
+                }
+                if (files == 3)
+                {
+                    //testing to finish when we have nothing written in the 
new file
+                    List<SSTableReader> sstables = rewriter.finish();
+                    
cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, 
OperationType.COMPACTION);
+                    break;
+                }
             }
         }
         Thread.sleep(1000);
@@ -357,7 +369,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testNumberOfFiles_truncate() throws InterruptedException
+    public void testNumberOfFiles_truncate() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -371,17 +383,19 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, 
compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one 
original file plus the ones we have switched out.
+                rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, 
s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have 
one original file plus the ones we have switched out.
+                }
             }
         }
         List<SSTableReader> sstables = rewriter.finish();
@@ -393,7 +407,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testSmallFiles() throws InterruptedException
+    public void testSmallFiles() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -408,18 +422,20 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, 
false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, 
compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, 
compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+            while(scanner.hasNext())
             {
-                assertEquals(1, cfs.getSSTables().size()); // we dont open 
small files early ...
-                assertEquals(origFirst, 
cfs.getSSTables().iterator().next().first); // ... and the first key should 
stay the same
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
+                rewriter.append(new LazilyCompactedRow(controller, 
Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+                {
+                    assertEquals(1, cfs.getSSTables().size()); // we dont open 
small files early ...
+                    assertEquals(origFirst, 
cfs.getSSTables().iterator().next().first); // ... and the first key should 
stay the same
+                    rewriter.switchWriter(getWriter(cfs, 
s.descriptor.directory));
+                    files++;
+                }
             }
         }
         List<SSTableReader> sstables = rewriter.finish();

Reply via email to