Author: jbellis
Date: Sun Sep 19 16:37:50 2010
New Revision: 998692
URL: http://svn.apache.org/viewvc?rev=998692&view=rev
Log:
backport CASSANDRA-1074 from trunk. patch by Sylvain Lebresne; reviewed by jbellis
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Sun Sep 19 16:37:50 2010
@@ -22,6 +22,8 @@
newly added nodes (CASSANDRA-1467)
* use JNA, if present, to take snapshots (CASSANDRA-1371)
* make IndexInterval configurable (CASSANDRA-1488)
+ * remove tombstones during non-major compactions when bloom filters
+ verify that the row does not exist in other sstables (CASSANDRA-1074)
0.6.5
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Sun Sep 19 16:37:50 2010
@@ -513,6 +513,24 @@ public class ColumnFamilyStore implement
}
}
+ /**
+ * Uses bloom filters to check whether key may be present in any sstable of
+ * this ColumnFamilyStore other than those in sstablesToIgnore. Only
+ * sstables are consulted; memtables are not checked.
+ * Because only BFs are checked, a false return guarantees the key is absent
+ * from the checked SSTables, while a true return does not guarantee that
+ * the key is present (bloom filters may report false positives).
+ */
+ public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<SSTable>
sstablesToIgnore)
+ {
+ for (SSTableReader sstable : ssTables_)
+ {
+ if (!sstablesToIgnore.contains(sstable) &&
sstable.getBloomFilter().isPresent(key.key))
+ return true;
+ }
+ return false;
+ }
+
/*
* Called after the Memtable flushes its in-memory data, or we add a file
* via bootstrap. This information is
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
Sun Sep 19 16:37:50 2010
@@ -275,7 +275,7 @@ public class CompactionManager implement
logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
SSTableWriter writer;
- CompactionIterator ci = new CompactionIterator(sstables, gcBefore,
major); // retain a handle so we can call close()
+ CompactionIterator ci = new CompactionIterator(cfs, sstables,
gcBefore, major); // retain a handle so we can call close()
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
executor.beginCompaction(cfs, ci);
@@ -359,7 +359,7 @@ public class CompactionManager implement
logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
SSTableWriter writer = null;
- CompactionIterator ci = new AntiCompactionIterator(sstables, ranges,
getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
+ CompactionIterator ci = new AntiCompactionIterator(cfs, sstables,
ranges, getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
executor.beginCompaction(cfs, ci);
@@ -422,7 +422,7 @@ public class CompactionManager implement
private void doValidationCompaction(ColumnFamilyStore cfs,
AntiEntropyService.Validator validator) throws IOException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
- CompactionIterator ci = new CompactionIterator(sstables,
getDefaultGCBefore(), true);
+ CompactionIterator ci = new CompactionIterator(cfs, sstables,
getDefaultGCBefore(), true);
executor.beginCompaction(cfs, ci);
try
{
@@ -495,10 +495,10 @@ public class CompactionManager implement
{
private Set<SSTableScanner> scanners;
- public AntiCompactionIterator(Collection<SSTableReader> sstables,
Collection<Range> ranges, int gcBefore, boolean isMajor)
+ public AntiCompactionIterator(ColumnFamilyStore cfStore,
Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore,
boolean isMajor)
throws IOException
{
- super(getCollatedRangeIterator(sstables, ranges), gcBefore,
isMajor);
+ super(cfStore, getCollatedRangeIterator(sstables, ranges),
gcBefore, isMajor);
}
private static Iterator
getCollatedRangeIterator(Collection<SSTableReader> sstables, final
Collection<Range> ranges)
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/CompactionIterator.java
Sun Sep 19 16:37:50 2010
@@ -26,6 +26,8 @@ import java.io.IOException;
import java.io.IOError;
import java.util.List;
import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
import java.util.Iterator;
import org.apache.log4j.Logger;
@@ -45,6 +47,7 @@ public class CompactionIterator extends
protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
private final List<IteratingRow> rows = new ArrayList<IteratingRow>();
+ private final ColumnFamilyStore cfs;
private final int gcBefore;
private final boolean major;
@@ -52,13 +55,13 @@ public class CompactionIterator extends
private long bytesRead;
private long row;
- public CompactionIterator(Iterable<SSTableReader> sstables, int gcBefore,
boolean major) throws IOException
+ public CompactionIterator(ColumnFamilyStore cfs, Iterable<SSTableReader>
sstables, int gcBefore, boolean major) throws IOException
{
- this(getCollatingIterator(sstables), gcBefore, major);
+ this(cfs, getCollatingIterator(sstables), gcBefore, major);
}
@SuppressWarnings("unchecked")
- protected CompactionIterator(Iterator iter, int gcBefore, boolean major)
+ protected CompactionIterator(ColumnFamilyStore cfs, Iterator iter, int
gcBefore, boolean major)
{
super(iter);
row = 0;
@@ -67,6 +70,7 @@ public class CompactionIterator extends
{
totalBytes += scanner.getFileLength();
}
+ this.cfs = cfs;
this.gcBefore = gcBefore;
this.major = major;
}
@@ -99,9 +103,14 @@ public class CompactionIterator extends
DataOutputBuffer buffer = new DataOutputBuffer();
DecoratedKey key = rows.get(0).getKey();
+ Set<SSTable> sstables = new HashSet<SSTable>();
+ for (IteratingRow row : rows)
+ sstables.add(row.sstable);
+ boolean shouldPurge = major || !cfs.isKeyInRemainingSSTables(key,
sstables);
+
try
{
- if (rows.size() > 1 || major)
+ if (rows.size() > 1 || shouldPurge)
{
ColumnFamily cf = null;
for (IteratingRow row : rows)
@@ -125,7 +134,7 @@ public class CompactionIterator extends
cf.addAll(thisCF);
}
}
- ColumnFamily cfPurged = major ?
ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
+ ColumnFamily cfPurged = shouldPurge ?
ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
if (cfPurged == null)
return null;
ColumnFamily.serializer().serializeWithIndexes(cfPurged,
buffer);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IteratingRow.java
Sun Sep 19 16:37:50 2010
@@ -37,7 +37,7 @@ public class IteratingRow extends Abstra
private final DecoratedKey key;
private final long finishedAt;
private final BufferedRandomAccessFile file;
- private SSTableReader sstable;
+ public final SSTableReader sstable;
private long dataStart;
public IteratingRow(BufferedRandomAccessFile file, SSTableReader sstable)
throws IOException
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
Sun Sep 19 16:37:50 2010
@@ -497,6 +497,11 @@ public class SSTableReader extends SSTab
bf = BloomFilter.alwaysMatchingBloomFilter();
}
+ public BloomFilter getBloomFilter() // returns the bf field itself, not a copy
+ { // NOTE(review): bf may be BloomFilter.alwaysMatchingBloomFilter(); presumably every key then looks present
+ return bf;
+ }
+
public IPartitioner getPartitioner()
{
return partitioner;
Modified:
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java?rev=998692&r1=998691&r2=998692&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
(original)
+++
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
Sun Sep 19 16:37:50 2010
@@ -35,9 +35,10 @@ import static org.apache.cassandra.db.Ta
public class CompactionsPurgeTest extends CleanupHelper
{
public static final String TABLE1 = "Keyspace1";
+ public static final String TABLE2 = "Keyspace2";
@Test
- public void testCompactionPurge() throws IOException, ExecutionException,
InterruptedException
+ public void testMajorCompactionPurge() throws IOException,
ExecutionException, InterruptedException
{
CompactionManager.instance.disableAutoCompaction();
@@ -72,25 +73,71 @@ public class CompactionsPurgeTest extend
rm.apply();
cfs.forceBlockingFlush();
- // verify that non-major compaction does no GC to ensure correctness
(see CASSANDRA-604)
- Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
- rm = new RowMutation(TABLE1, key + "x");
- rm.add(new QueryPath(cfName, null, "0".getBytes()), new byte[0], 0);
- rm.apply();
- cfs.forceBlockingFlush();
- CompactionManager.instance.doCompaction(cfs, sstablesIncomplete,
CompactionManager.getDefaultGCBefore());
- ColumnFamily cf = cfs.getColumnFamily(new IdentityQueryFilter(key, new
QueryPath(cfName)));
- assert cf.getColumnCount() == 10;
-
// major compact and test that all columns but the resurrected one is
completely gone
CompactionManager.instance.submitMajor(cfs, 0,
Integer.MAX_VALUE).get();
cfs.invalidateCachedRow(key);
- cf = cfs.getColumnFamily(new IdentityQueryFilter(key, new
QueryPath(cfName)));
+ ColumnFamily cf = cfs.getColumnFamily(new IdentityQueryFilter(key, new
QueryPath(cfName)));
assertColumns(cf, "5");
assert cf.getColumn(String.valueOf(5).getBytes()) != null;
}
@Test
+ public void testMinorCompactionPurge() throws IOException,
ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ Table table = Table.open(TABLE2);
+ String cfName = "Standard1";
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ RowMutation rm;
+ for (int k = 1; k <= 2; ++k) {
+ String key = "key" + k;
+
+ // insert 10 columns for this key and flush them to a new sstable
+ rm = new RowMutation(TABLE2, key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath(cfName, null,
String.valueOf(i).getBytes()), new byte[0], 0);
+ }
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // delete all 10 columns and flush the tombstones to another sstable
+ for (int i = 0; i < 10; i++)
+ {
+ rm = new RowMutation(TABLE2, key);
+ rm.delete(new QueryPath(cfName, null,
String.valueOf(i).getBytes()), 1);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ String key1 = "key1";
+ String key2 = "key2";
+
+ // flush, remember the current sstables, then re-insert column "5" of key1
+ // into a new sstable and minor-compact only the remembered sstables.
+ cfs.forceBlockingFlush();
+ Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
+ rm = new RowMutation(TABLE2, key1);
+ rm.add(new QueryPath(cfName, null, String.valueOf(5).getBytes()), new
byte[0], 2);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ CompactionManager.instance.doCompaction(cfs, sstablesIncomplete,
Integer.MAX_VALUE);
+
+ // key1 also appears in the sstable left out of the compaction, so its
+ // tombstones must not be purged: the row still reports 10 columns
+ ColumnFamily cf = cfs.getColumnFamily(new IdentityQueryFilter(key1,
new QueryPath(cfName)));
+ assert cf.getColumnCount() == 10;
+
+ // key2 should be rejected by the bloom filter of the sstable left out, so
+ // its tombstones are purged (a BF false positive would fail this assert)
+ cf = cfs.getColumnFamily(new IdentityQueryFilter(key2, new
QueryPath(cfName)));
+ assert cf == null;
+ }
+
+ @Test
public void testCompactionPurgeOneFile() throws IOException,
ExecutionException, InterruptedException
{
CompactionManager.instance.disableAutoCompaction();