Author: jbellis
Date: Mon Sep 13 14:06:08 2010
New Revision: 996541
URL: http://svn.apache.org/viewvc?rev=996541&view=rev
Log:
remove tombstones during non-major compactions when bloom filter verifies that
row does not exist in other sstables
patch by Sylvain Lebresne; reviewed by jbellis for CASSANDRA-1074
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep 13 14:06:08 2010
@@ -65,6 +65,8 @@ dev
* rename check_schema_agreement to describe_schema_versions
(CASSANDRA-1478)
* fix QUORUM calculation for RF > 3 (CASSANDRA-1487)
+ * remove tombstones during non-major compactions when bloom filter
+ verifies that row does not exist in other sstables (CASSANDRA-1074)
0.7-beta1
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon
Sep 13 14:06:08 2010
@@ -614,6 +614,24 @@ public class ColumnFamilyStore implement
}
}
+ /**
+ * Uses bloom filters to check if key may be present in any sstable in this
+ * ColumnFamilyStore, minus a set of provided ones.
+ *
+ * Because bloom filters are checked, a negative return guarantees that the
+ * key is not present in the checked SSTables, but a positive return does not
+ * guarantee key presence (bloom filters can report false positives).
+ */
+ public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<SSTable>
sstablesToIgnore)
+ {
+ for (SSTableReader sstable : ssTables)
+ {
+ if (!sstablesToIgnore.contains(sstable) &&
sstable.getBloomFilter().isPresent(key.key))
+ return true;
+ }
+ return false;
+ }
+
/*
* Called after the Memtable flushes its in-memory data, or we add a file
* via bootstrap. This information is
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon
Sep 13 14:06:08 2010
@@ -280,7 +280,7 @@ public class CompactionManager implement
logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
SSTableWriter writer;
- CompactionIterator ci = new CompactionIterator(sstables, gcBefore,
major); // retain a handle so we can call close()
+ CompactionIterator ci = new CompactionIterator(cfs, sstables,
gcBefore, major); // retain a handle so we can call close()
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
executor.beginCompaction(cfs, ci);
@@ -368,7 +368,7 @@ public class CompactionManager implement
logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
SSTableWriter writer = null;
- CompactionIterator ci = new AntiCompactionIterator(sstables, ranges,
(int) (System.currentTimeMillis() / 1000) - cfs.metadata.gcGraceSeconds,
cfs.isCompleteSSTables(sstables));
+ CompactionIterator ci = new AntiCompactionIterator(cfs, sstables,
ranges, (int) (System.currentTimeMillis() / 1000) -
cfs.metadata.gcGraceSeconds, cfs.isCompleteSSTables(sstables));
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
executor.beginCompaction(cfs, ci);
@@ -435,7 +435,7 @@ public class CompactionManager implement
private void doValidationCompaction(ColumnFamilyStore cfs,
AntiEntropyService.Validator validator) throws IOException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
- CompactionIterator ci = new CompactionIterator(sstables, (int)
(System.currentTimeMillis() / 1000) - cfs.metadata.gcGraceSeconds, true);
+ CompactionIterator ci = new CompactionIterator(cfs, sstables, (int)
(System.currentTimeMillis() / 1000) - cfs.metadata.gcGraceSeconds, true);
executor.beginCompaction(cfs, ci);
try
{
@@ -524,10 +524,10 @@ public class CompactionManager implement
{
private Set<SSTableScanner> scanners;
- public AntiCompactionIterator(Collection<SSTableReader> sstables,
Collection<Range> ranges, int gcBefore, boolean isMajor)
+ public AntiCompactionIterator(ColumnFamilyStore cfStore,
Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore,
boolean isMajor)
throws IOException
{
- super(getCollatedRangeIterator(sstables, ranges), gcBefore,
isMajor);
+ super(cfStore, getCollatedRangeIterator(sstables, ranges),
gcBefore, isMajor);
}
private static Iterator
getCollatedRangeIterator(Collection<SSTableReader> sstables, final
Collection<Range> ranges)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
Mon Sep 13 14:06:08 2010
@@ -23,7 +23,6 @@ package org.apache.cassandra.io;
import java.io.Closeable;
import java.io.IOException;
-import java.io.IOError;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
@@ -36,12 +35,9 @@ import org.apache.commons.collections.it
import org.apache.cassandra.utils.ReducingIterator;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.util.DataOutputBuffer;
public class CompactionIterator extends
ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow> implements
Closeable
{
@@ -50,6 +46,7 @@ public class CompactionIterator extends
protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
protected final List<SSTableIdentityIterator> rows = new
ArrayList<SSTableIdentityIterator>();
+ private final ColumnFamilyStore cfs;
private final int gcBefore;
private final boolean major;
@@ -57,13 +54,13 @@ public class CompactionIterator extends
private long bytesRead;
private long row;
- public CompactionIterator(Iterable<SSTableReader> sstables, int gcBefore,
boolean major) throws IOException
+ public CompactionIterator(ColumnFamilyStore cfs, Iterable<SSTableReader>
sstables, int gcBefore, boolean major) throws IOException
{
- this(getCollatingIterator(sstables), gcBefore, major);
+ this(cfs, getCollatingIterator(sstables), gcBefore, major);
}
@SuppressWarnings("unchecked")
- protected CompactionIterator(Iterator iter, int gcBefore, boolean major)
+ protected CompactionIterator(ColumnFamilyStore cfs, Iterator iter, int
gcBefore, boolean major)
{
super(iter);
row = 0;
@@ -72,6 +69,7 @@ public class CompactionIterator extends
{
totalBytes += scanner.getFileLength();
}
+ this.cfs = cfs;
this.gcBefore = gcBefore;
this.major = major;
}
@@ -133,9 +131,9 @@ public class CompactionIterator extends
{
logger.info(String.format("Compacting large row %s (%d bytes)
incrementally",
FBUtilities.bytesToHex(rows.get(0).getKey().key), rowSize));
- return new LazilyCompactedRow(rows, major, gcBefore);
+ return new LazilyCompactedRow(cfs, rows, major, gcBefore);
}
- return new PrecompactedRow(rows, major, gcBefore);
+ return new PrecompactedRow(cfs, rows, major, gcBefore);
}
public void close() throws IOException
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
Mon Sep 13 14:06:08 2010
@@ -33,10 +33,10 @@ import org.apache.commons.collections.it
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.IIterableColumns;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ReducingIterator;
/**
@@ -53,7 +53,7 @@ import org.apache.cassandra.utils.Reduci
public class LazilyCompactedRow extends AbstractCompactedRow implements
IIterableColumns
{
private final List<SSTableIdentityIterator> rows;
- private final boolean major;
+ private final boolean shouldPurge;
private final int gcBefore;
private final DataOutputBuffer headerBuffer;
private ColumnFamily emptyColumnFamily;
@@ -61,15 +61,16 @@ public class LazilyCompactedRow extends
private int columnCount;
private long columnSerializedSize;
- public LazilyCompactedRow(List<SSTableIdentityIterator> rows, boolean
major, int gcBefore)
+ public LazilyCompactedRow(ColumnFamilyStore cfStore,
List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
{
super(rows.get(0).getKey());
- this.major = major;
this.gcBefore = gcBefore;
this.rows = new ArrayList<SSTableIdentityIterator>(rows);
+ Set<SSTable> sstables = new HashSet<SSTable>();
for (SSTableIdentityIterator row : rows)
{
+ sstables.add(row.getSSTable());
ColumnFamily cf = row.getColumnFamily();
if (emptyColumnFamily == null)
@@ -77,6 +78,7 @@ public class LazilyCompactedRow extends
else
emptyColumnFamily.delete(cf);
}
+ this.shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key,
sstables);
// initialize row header so isEmpty can be called
headerBuffer = new DataOutputBuffer();
@@ -89,7 +91,7 @@ public class LazilyCompactedRow extends
public void write(DataOutput out) throws IOException
{
- if (rows.size() == 1 && !major)
+ if (rows.size() == 1 && !shouldPurge)
{
SSTableIdentityIterator row = rows.get(0);
out.writeLong(row.getDataSize());
@@ -203,7 +205,7 @@ public class LazilyCompactedRow extends
{
assert container != null;
IColumn reduced = container.iterator().next();
- ColumnFamily purged = major ?
ColumnFamilyStore.removeDeleted(container, gcBefore) : container;
+ ColumnFamily purged = shouldPurge ?
ColumnFamilyStore.removeDeleted(container, gcBefore) : container;
if (purged == null || !purged.iterator().hasNext())
{
container.clear();
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Mon
Sep 13 14:06:08 2010
@@ -26,10 +26,13 @@ import java.io.IOError;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.slf4j.Logger;
@@ -51,12 +54,19 @@ public class PrecompactedRow extends Abs
this.buffer = buffer;
}
- public PrecompactedRow(List<SSTableIdentityIterator> rows, boolean major,
int gcBefore)
+ public PrecompactedRow(ColumnFamilyStore cfStore,
List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
{
super(rows.get(0).getKey());
buffer = new DataOutputBuffer();
- if (rows.size() > 1 || major)
+ Set<SSTable> sstables = new HashSet<SSTable>();
+ for (SSTableIdentityIterator row : rows)
+ {
+ sstables.add(row.getSSTable());
+ }
+ boolean shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key,
sstables);
+
+ if (rows.size() > 1 || shouldPurge)
{
ColumnFamily cf = null;
for (SSTableIdentityIterator row : rows)
@@ -80,7 +90,7 @@ public class PrecompactedRow extends Abs
cf.addAll(thisCF);
}
}
- ColumnFamily cfPurged = major ?
ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
+ ColumnFamily cfPurged = shouldPurge ?
ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
if (cfPurged == null)
return;
columnCount =
ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
Mon Sep 13 14:06:08 2010
@@ -87,6 +87,11 @@ public class SSTableIdentityIterator imp
return columnFamily;
}
+ public SSTableReader getSSTable()
+ {
+ return sstable;
+ }
+
public boolean hasNext()
{
return file.getFilePointer() < finishedAt;
@@ -162,4 +167,4 @@ public class SSTableIdentityIterator imp
throw new IOError(e);
}
}
-}
\ No newline at end of file
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Mon Sep 13 14:06:08 2010
@@ -313,6 +313,11 @@ public class SSTableReader extends SSTab
bf = BloomFilter.alwaysMatchingBloomFilter();
}
+ public BloomFilter getBloomFilter()
+ {
+ return bf;
+ }
+
/**
* @return The key cache: for monitoring purposes.
*/
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
Mon Sep 13 14:06:08 2010
@@ -37,9 +37,10 @@ import static org.apache.cassandra.db.Ta
public class CompactionsPurgeTest extends CleanupHelper
{
public static final String TABLE1 = "Keyspace1";
+ public static final String TABLE2 = "Keyspace2";
@Test
- public void testCompactionPurge() throws IOException, ExecutionException,
InterruptedException
+ public void testMajorCompactionPurge() throws IOException,
ExecutionException, InterruptedException
{
CompactionManager.instance.disableAutoCompaction();
@@ -74,25 +75,71 @@ public class CompactionsPurgeTest extend
rm.apply();
cfs.forceBlockingFlush();
- // verify that non-major compaction does no GC to ensure correctness
(see CASSANDRA-604)
- Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
- rm = new RowMutation(TABLE1, Util.dk("blah").key);
- rm.add(new QueryPath(cfName, null, "0".getBytes()), new byte[0], new
TimestampClock(0));
- rm.apply();
- cfs.forceBlockingFlush();
- CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, (int)
(System.currentTimeMillis() / 1000) - cfs.metadata.gcGraceSeconds);
- ColumnFamily cf =
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
- assert cf.getColumnCount() == 10;
-
// major compact and test that all columns but the resurrected one are
completely gone
CompactionManager.instance.submitMajor(cfs, 0,
Integer.MAX_VALUE).get();
cfs.invalidateCachedRow(key);
- cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new
QueryPath(cfName)));
+ ColumnFamily cf =
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
assertColumns(cf, "5");
assert cf.getColumn(String.valueOf(5).getBytes()) != null;
}
@Test
+ public void testMinorCompactionPurge() throws IOException,
ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ Table table = Table.open(TABLE2);
+ String cfName = "Standard1";
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ RowMutation rm;
+ for (int k = 1; k <= 2; ++k) {
+ DecoratedKey key = Util.dk("key" + k);
+
+ // inserts
+ rm = new RowMutation(TABLE2, key.key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath(cfName, null,
String.valueOf(i).getBytes()), new byte[0], new TimestampClock(0));
+ }
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // deletes
+ for (int i = 0; i < 10; i++)
+ {
+ rm = new RowMutation(TABLE2, key.key);
+ rm.delete(new QueryPath(cfName, null,
String.valueOf(i).getBytes()), new TimestampClock(1));
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ DecoratedKey key1 = Util.dk("key1");
+ DecoratedKey key2 = Util.dk("key2");
+
+ // flush, remember the current sstables, and then resurrect one column
+ // for the first key. Then submit a minor compaction on the remembered sstables.
+ cfs.forceBlockingFlush();
+ Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
+ rm = new RowMutation(TABLE2, key1.key);
+ rm.add(new QueryPath(cfName, null, String.valueOf(5).getBytes()), new
byte[0], new TimestampClock(2));
+ rm.apply();
+ cfs.forceBlockingFlush();
+ CompactionManager.instance.doCompaction(cfs, sstablesIncomplete,
Integer.MAX_VALUE);
+
+ // verify that minor compaction does not GC when key is present
+ // in a non-compacted sstable
+ ColumnFamily cf =
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, new QueryPath(cfName)));
+ assert cf.getColumnCount() == 10;
+
+ // verify that minor compaction does GC when key is provably not
+ // present in a non-compacted sstable
+ cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, new
QueryPath(cfName)));
+ assert cf == null;
+ }
+
+ @Test
public void testCompactionPurgeOneFile() throws IOException,
ExecutionException, InterruptedException
{
CompactionManager.instance.disableAutoCompaction();
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=996541&r1=996540&r2=996541&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Mon Sep 13 14:06:08 2010
@@ -51,10 +51,11 @@ import static junit.framework.Assert.ass
public class LazilyCompactedRowTest extends CleanupHelper
{
- private void assertBytes(Collection<SSTableReader> sstables, int gcBefore,
boolean major) throws IOException
+ private void assertBytes(ColumnFamilyStore cfs, int gcBefore, boolean
major) throws IOException
{
- CompactionIterator ci1 = new CompactionIterator(sstables, gcBefore,
major);
- LazyCompactionIterator ci2 = new LazyCompactionIterator(sstables,
gcBefore, major);
+ Collection<SSTableReader> sstables = cfs.getSSTables();
+ CompactionIterator ci1 = new CompactionIterator(cfs, sstables,
gcBefore, major);
+ LazyCompactionIterator ci2 = new LazyCompactionIterator(cfs, sstables,
gcBefore, major);
while (true)
{
@@ -128,7 +129,7 @@ public class LazilyCompactedRowTest exte
rm.apply();
cfs.forceBlockingFlush();
- assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+ assertBytes(cfs, Integer.MAX_VALUE, true);
}
@Test
@@ -146,7 +147,7 @@ public class LazilyCompactedRowTest exte
rm.apply();
cfs.forceBlockingFlush();
- assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+ assertBytes(cfs, Integer.MAX_VALUE, true);
}
@Test
@@ -166,7 +167,7 @@ public class LazilyCompactedRowTest exte
rm.apply();
cfs.forceBlockingFlush();
- assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+ assertBytes(cfs, Integer.MAX_VALUE, true);
}
@Test
@@ -187,7 +188,7 @@ public class LazilyCompactedRowTest exte
rm.apply();
cfs.forceBlockingFlush();
- assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+ assertBytes(cfs, Integer.MAX_VALUE, true);
}
@Test
@@ -209,20 +210,23 @@ public class LazilyCompactedRowTest exte
cfs.forceBlockingFlush();
}
- assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+ assertBytes(cfs, Integer.MAX_VALUE, true);
}
private static class LazyCompactionIterator extends CompactionIterator
{
- public LazyCompactionIterator(Iterable<SSTableReader> sstables, int
gcBefore, boolean major) throws IOException
+ private final ColumnFamilyStore cfStore;
+
+ public LazyCompactionIterator(ColumnFamilyStore cfStore,
Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws
IOException
{
- super(sstables, gcBefore, major);
+ super(cfStore, sstables, gcBefore, major);
+ this.cfStore = cfStore;
}
@Override
protected AbstractCompactedRow getCompactedRow()
{
- return new LazilyCompactedRow(rows, true, Integer.MAX_VALUE);
+ return new LazilyCompactedRow(cfStore, rows, true,
Integer.MAX_VALUE);
}
}
}