Author: jbellis
Date: Mon Sep 13 14:06:08 2010
New Revision: 996541

URL: http://svn.apache.org/viewvc?rev=996541&view=rev
Log:
remove tombstones during non-major compactions when bloom filter verifies that 
row does not exist in other sstables
patch by Sylvain Lebresne; reviewed by jbellis for CASSANDRA-1074

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep 13 14:06:08 2010
@@ -65,6 +65,8 @@ dev
  * rename check_schema_agreement to describe_schema_versions
    (CASSANDRA-1478)
  * fix QUORUM calculation for RF > 3 (CASSANDRA-1487)
+ * remove tombstones during non-major compactions when bloom filter
+   verifies that row does not exist in other sstables (CASSANDRA-1074)
 
 
 0.7-beta1

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon 
Sep 13 14:06:08 2010
@@ -614,6 +614,24 @@ public class ColumnFamilyStore implement
         }
     }
 
+    /**
+     * Uses bloom filters to check if key may be present in any sstable in this
+     * ColumnFamilyStore, minus a set of provided ones.
+     *
+     * Because BFs are checked, negative returns ensure that the key is not
+     * present in the checked SSTables, but positive ones don't ensure key
+     * presence.
+     */
+    public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<SSTable> 
sstablesToIgnore)
+    {
+        for (SSTableReader sstable : ssTables)
+        {
+            if (!sstablesToIgnore.contains(sstable) && 
sstable.getBloomFilter().isPresent(key.key))
+                return true;
+        }
+        return false;
+    }
+
     /*
      * Called after the Memtable flushes its in-memory data, or we add a file
      * via bootstrap. This information is

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon 
Sep 13 14:06:08 2010
@@ -280,7 +280,7 @@ public class CompactionManager implement
           logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
 
         SSTableWriter writer;
-        CompactionIterator ci = new CompactionIterator(sstables, gcBefore, 
major); // retain a handle so we can call close()
+        CompactionIterator ci = new CompactionIterator(cfs, sstables, 
gcBefore, major); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, 
PredicateUtils.notNullPredicate());
         executor.beginCompaction(cfs, ci);
 
@@ -368,7 +368,7 @@ public class CompactionManager implement
           logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
 
         SSTableWriter writer = null;
-        CompactionIterator ci = new AntiCompactionIterator(sstables, ranges, 
(int) (System.currentTimeMillis() / 1000) - cfs.metadata.gcGraceSeconds, 
cfs.isCompleteSSTables(sstables));
+        CompactionIterator ci = new AntiCompactionIterator(cfs, sstables, 
ranges, (int) (System.currentTimeMillis() / 1000) - 
cfs.metadata.gcGraceSeconds, cfs.isCompleteSSTables(sstables));
         Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, 
PredicateUtils.notNullPredicate());
         executor.beginCompaction(cfs, ci);
 
@@ -435,7 +435,7 @@ public class CompactionManager implement
     private void doValidationCompaction(ColumnFamilyStore cfs, 
AntiEntropyService.Validator validator) throws IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionIterator ci = new CompactionIterator(sstables, (int) 
(System.currentTimeMillis() / 1000) - cfs.metadata.gcGraceSeconds, true);
+        CompactionIterator ci = new CompactionIterator(cfs, sstables, (int) 
(System.currentTimeMillis() / 1000) - cfs.metadata.gcGraceSeconds, true);
         executor.beginCompaction(cfs, ci);
         try
         {
@@ -524,10 +524,10 @@ public class CompactionManager implement
     {
         private Set<SSTableScanner> scanners;
 
-        public AntiCompactionIterator(Collection<SSTableReader> sstables, 
Collection<Range> ranges, int gcBefore, boolean isMajor)
+        public AntiCompactionIterator(ColumnFamilyStore cfStore, 
Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore, 
boolean isMajor)
                 throws IOException
         {
-            super(getCollatedRangeIterator(sstables, ranges), gcBefore, 
isMajor);
+            super(cfStore, getCollatedRangeIterator(sstables, ranges), 
gcBefore, isMajor);
         }
 
         private static Iterator 
getCollatedRangeIterator(Collection<SSTableReader> sstables, final 
Collection<Range> ranges)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java 
Mon Sep 13 14:06:08 2010
@@ -23,7 +23,6 @@ package org.apache.cassandra.io;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.IOError;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -36,12 +35,9 @@ import org.apache.commons.collections.it
 
 import org.apache.cassandra.utils.ReducingIterator;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 
 public class CompactionIterator extends 
ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow> implements 
Closeable
 {
@@ -50,6 +46,7 @@ public class CompactionIterator extends 
     protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
     protected final List<SSTableIdentityIterator> rows = new 
ArrayList<SSTableIdentityIterator>();
+    private final ColumnFamilyStore cfs;
     private final int gcBefore;
     private final boolean major;
 
@@ -57,13 +54,13 @@ public class CompactionIterator extends 
     private long bytesRead;
     private long row;
 
-    public CompactionIterator(Iterable<SSTableReader> sstables, int gcBefore, 
boolean major) throws IOException
+    public CompactionIterator(ColumnFamilyStore cfs, Iterable<SSTableReader> 
sstables, int gcBefore, boolean major) throws IOException
     {
-        this(getCollatingIterator(sstables), gcBefore, major);
+        this(cfs, getCollatingIterator(sstables), gcBefore, major);
     }
 
     @SuppressWarnings("unchecked")
-    protected CompactionIterator(Iterator iter, int gcBefore, boolean major)
+    protected CompactionIterator(ColumnFamilyStore cfs, Iterator iter, int 
gcBefore, boolean major)
     {
         super(iter);
         row = 0;
@@ -72,6 +69,7 @@ public class CompactionIterator extends 
         {
             totalBytes += scanner.getFileLength();
         }
+        this.cfs = cfs;
         this.gcBefore = gcBefore;
         this.major = major;
     }
@@ -133,9 +131,9 @@ public class CompactionIterator extends 
         {
             logger.info(String.format("Compacting large row %s (%d bytes) 
incrementally",
                                       
FBUtilities.bytesToHex(rows.get(0).getKey().key), rowSize));
-            return new LazilyCompactedRow(rows, major, gcBefore);
+            return new LazilyCompactedRow(cfs, rows, major, gcBefore);
         }
-        return new PrecompactedRow(rows, major, gcBefore);
+        return new PrecompactedRow(cfs, rows, major, gcBefore);
     }
 
     public void close() throws IOException

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java 
Mon Sep 13 14:06:08 2010
@@ -33,10 +33,10 @@ import org.apache.commons.collections.it
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.IIterableColumns;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ReducingIterator;
 
 /**
@@ -53,7 +53,7 @@ import org.apache.cassandra.utils.Reduci
 public class LazilyCompactedRow extends AbstractCompactedRow implements 
IIterableColumns
 {
     private final List<SSTableIdentityIterator> rows;
-    private final boolean major;
+    private final boolean shouldPurge;
     private final int gcBefore;
     private final DataOutputBuffer headerBuffer;
     private ColumnFamily emptyColumnFamily;
@@ -61,15 +61,16 @@ public class LazilyCompactedRow extends 
     private int columnCount;
     private long columnSerializedSize;
 
-    public LazilyCompactedRow(List<SSTableIdentityIterator> rows, boolean 
major, int gcBefore)
+    public LazilyCompactedRow(ColumnFamilyStore cfStore, 
List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
     {
         super(rows.get(0).getKey());
-        this.major = major;
         this.gcBefore = gcBefore;
         this.rows = new ArrayList<SSTableIdentityIterator>(rows);
 
+        Set<SSTable> sstables = new HashSet<SSTable>();
         for (SSTableIdentityIterator row : rows)
         {
+            sstables.add(row.getSSTable());
             ColumnFamily cf = row.getColumnFamily();
 
             if (emptyColumnFamily == null)
@@ -77,6 +78,7 @@ public class LazilyCompactedRow extends 
             else
                 emptyColumnFamily.delete(cf);
         }
+        this.shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key, 
sstables);
 
         // initialize row header so isEmpty can be called
         headerBuffer = new DataOutputBuffer();
@@ -89,7 +91,7 @@ public class LazilyCompactedRow extends 
 
     public void write(DataOutput out) throws IOException
     {
-        if (rows.size() == 1 && !major)
+        if (rows.size() == 1 && !shouldPurge)
         {
             SSTableIdentityIterator row = rows.get(0);
             out.writeLong(row.getDataSize());
@@ -203,7 +205,7 @@ public class LazilyCompactedRow extends 
         {
             assert container != null;
             IColumn reduced = container.iterator().next();
-            ColumnFamily purged = major ? 
ColumnFamilyStore.removeDeleted(container, gcBefore) : container;
+            ColumnFamily purged = shouldPurge ? 
ColumnFamilyStore.removeDeleted(container, gcBefore) : container;
             if (purged == null || !purged.iterator().hasNext())
             {
                 container.clear();

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Mon 
Sep 13 14:06:08 2010
@@ -26,10 +26,13 @@ import java.io.IOError;
 import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.slf4j.Logger;
@@ -51,12 +54,19 @@ public class PrecompactedRow extends Abs
         this.buffer = buffer;
     }
 
-    public PrecompactedRow(List<SSTableIdentityIterator> rows, boolean major, 
int gcBefore)
+    public PrecompactedRow(ColumnFamilyStore cfStore, 
List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
     {
         super(rows.get(0).getKey());
         buffer = new DataOutputBuffer();
 
-        if (rows.size() > 1 || major)
+        Set<SSTable> sstables = new HashSet<SSTable>();
+        for (SSTableIdentityIterator row : rows)
+        {
+            sstables.add(row.getSSTable());
+        }
+        boolean shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key, 
sstables);
+
+        if (rows.size() > 1 || shouldPurge)
         {
             ColumnFamily cf = null;
             for (SSTableIdentityIterator row : rows)
@@ -80,7 +90,7 @@ public class PrecompactedRow extends Abs
                     cf.addAll(thisCF);
                 }
             }
-            ColumnFamily cfPurged = major ? 
ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
+            ColumnFamily cfPurged = shouldPurge ? 
ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
             if (cfPurged == null)
                 return;
             columnCount = 
ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
 Mon Sep 13 14:06:08 2010
@@ -87,6 +87,11 @@ public class SSTableIdentityIterator imp
         return columnFamily;
     }
 
+    public SSTableReader getSSTable()
+    {
+        return sstable;
+    }
+
     public boolean hasNext()
     {
         return file.getFilePointer() < finishedAt;
@@ -162,4 +167,4 @@ public class SSTableIdentityIterator imp
             throw new IOError(e);
         }
     }
-}
\ No newline at end of file
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
Mon Sep 13 14:06:08 2010
@@ -313,6 +313,11 @@ public class SSTableReader extends SSTab
         bf = BloomFilter.alwaysMatchingBloomFilter();
     }
 
+    public BloomFilter getBloomFilter()
+    {
+      return bf;
+    }
+
     /**
      * @return The key cache: for monitoring purposes.
      */

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java 
Mon Sep 13 14:06:08 2010
@@ -37,9 +37,10 @@ import static org.apache.cassandra.db.Ta
 public class CompactionsPurgeTest extends CleanupHelper
 {
     public static final String TABLE1 = "Keyspace1";
+    public static final String TABLE2 = "Keyspace2";
 
     @Test
-    public void testCompactionPurge() throws IOException, ExecutionException, 
InterruptedException
+    public void testMajorCompactionPurge() throws IOException, 
ExecutionException, InterruptedException
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -74,25 +75,71 @@ public class CompactionsPurgeTest extend
         rm.apply();
         cfs.forceBlockingFlush();
 
-        // verify that non-major compaction does no GC to ensure correctness 
(see CASSANDRA-604)
-        Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
-        rm = new RowMutation(TABLE1, Util.dk("blah").key);
-        rm.add(new QueryPath(cfName, null, "0".getBytes()), new byte[0], new 
TimestampClock(0));
-        rm.apply();
-        cfs.forceBlockingFlush();
-        CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, (int) 
(System.currentTimeMillis() / 1000) - cfs.metadata.gcGraceSeconds);
-        ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
-        assert cf.getColumnCount() == 10;
-
         // major compact and test that all columns but the resurrected one is 
completely gone
         CompactionManager.instance.submitMajor(cfs, 0, 
Integer.MAX_VALUE).get();
         cfs.invalidateCachedRow(key);
-        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new 
QueryPath(cfName)));
+        ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
         assertColumns(cf, "5");
         assert cf.getColumn(String.valueOf(5).getBytes()) != null;
     }
 
     @Test
+    public void testMinorCompactionPurge() throws IOException, 
ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Table table = Table.open(TABLE2);
+        String cfName = "Standard1";
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        RowMutation rm;
+        for (int k = 1; k <= 2; ++k) {
+            DecoratedKey key = Util.dk("key" + k);
+
+            // inserts
+            rm = new RowMutation(TABLE2, key.key);
+            for (int i = 0; i < 10; i++)
+            {
+                rm.add(new QueryPath(cfName, null, 
String.valueOf(i).getBytes()), new byte[0], new TimestampClock(0));
+            }
+            rm.apply();
+            cfs.forceBlockingFlush();
+
+            // deletes
+            for (int i = 0; i < 10; i++)
+            {
+                rm = new RowMutation(TABLE2, key.key);
+                rm.delete(new QueryPath(cfName, null, 
String.valueOf(i).getBytes()), new TimestampClock(1));
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        DecoratedKey key1 = Util.dk("key1");
+        DecoratedKey key2 = Util.dk("key2");
+
+        // flush, remember the current sstable and then resurrect one column
+        // for first key. Then submit minor compaction on remembered sstables.
+        cfs.forceBlockingFlush();
+        Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
+        rm = new RowMutation(TABLE2, key1.key);
+        rm.add(new QueryPath(cfName, null, String.valueOf(5).getBytes()), new 
byte[0], new TimestampClock(2));
+        rm.apply();
+        cfs.forceBlockingFlush();
+        CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, 
Integer.MAX_VALUE);
+
+        // verify that minor compaction does not GC when key is present
+        // in a non-compacted sstable
+        ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, new QueryPath(cfName)));
+        assert cf.getColumnCount() == 10;
+
+        // verify that minor compaction does GC when key is provably not
+        // present in a non-compacted sstable
+        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, new 
QueryPath(cfName)));
+        assert cf == null;
+    }
+
+    @Test
     public void testCompactionPurgeOneFile() throws IOException, 
ExecutionException, InterruptedException
     {
         CompactionManager.instance.disableAutoCompaction();

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java 
(original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java 
Mon Sep 13 14:06:08 2010
@@ -51,10 +51,11 @@ import static junit.framework.Assert.ass
 
 public class LazilyCompactedRowTest extends CleanupHelper
 {
-    private void assertBytes(Collection<SSTableReader> sstables, int gcBefore, 
boolean major) throws IOException
+    private void assertBytes(ColumnFamilyStore cfs, int gcBefore, boolean 
major) throws IOException
     {
-        CompactionIterator ci1 = new CompactionIterator(sstables, gcBefore, 
major);
-        LazyCompactionIterator ci2 = new LazyCompactionIterator(sstables, 
gcBefore, major);
+        Collection<SSTableReader> sstables = cfs.getSSTables();
+        CompactionIterator ci1 = new CompactionIterator(cfs, sstables, 
gcBefore, major);
+        LazyCompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables, 
gcBefore, major);
 
         while (true)
         {
@@ -128,7 +129,7 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE, true);
     }
 
     @Test
@@ -146,7 +147,7 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE, true);
     }
 
     @Test
@@ -166,7 +167,7 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE, true);
     }
 
     @Test
@@ -187,7 +188,7 @@ public class LazilyCompactedRowTest exte
         rm.apply();
         cfs.forceBlockingFlush();
 
-        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE, true);
     }
 
     @Test
@@ -209,20 +210,23 @@ public class LazilyCompactedRowTest exte
             cfs.forceBlockingFlush();
         }
 
-        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+        assertBytes(cfs, Integer.MAX_VALUE, true);
     }
 
     private static class LazyCompactionIterator extends CompactionIterator
     {
-        public LazyCompactionIterator(Iterable<SSTableReader> sstables, int 
gcBefore, boolean major) throws IOException
+        private final ColumnFamilyStore cfStore;
+
+        public LazyCompactionIterator(ColumnFamilyStore cfStore, 
Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws 
IOException
         {
-            super(sstables, gcBefore, major);
+            super(cfStore, sstables, gcBefore, major);
+            this.cfStore = cfStore;
         }
 
         @Override
         protected AbstractCompactedRow getCompactedRow()
         {
-            return new LazilyCompactedRow(rows, true, Integer.MAX_VALUE);
+            return new LazilyCompactedRow(cfStore, rows, true, 
Integer.MAX_VALUE);
         }
     }
 }


Reply via email to