Author: slebresne
Date: Wed Jul 6 11:34:50 2011
New Revision: 1143352
URL: http://svn.apache.org/viewvc?rev=1143352&view=rev
Log:
Handle row tombstones correctly in EchoedRow
patch by slebresne; reviewed by jbellis for CASSANDRA-2786
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jul 6 11:34:50 2011
@@ -15,6 +15,7 @@
* fix index-building status display (CASSANDRA-2853)
* fix CLI perpetuating obsolete KsDef.replication_factor (CASSANDRA-2846)
* improve cli treatment of multiline comments (CASSANDRA-2852)
+ * handle row tombstones correctly in EchoedRow (CASSANDRA-2786)
0.8.1
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/EchoedRow.java
Wed Jul 6 11:34:50 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.security.MessageDigest;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
/**
@@ -35,11 +36,13 @@ import org.apache.cassandra.io.sstable.S
public class EchoedRow extends AbstractCompactedRow
{
private final SSTableIdentityIterator row;
+ private final int gcBefore;
- public EchoedRow(SSTableIdentityIterator row)
+ public EchoedRow(CompactionController controller, SSTableIdentityIterator
row)
{
super(row.getKey());
this.row = row;
+ this.gcBefore = controller.gcBefore;
// Reset SSTableIdentityIterator because we have not guarantee the
filePointer hasn't moved since the Iterator was built
row.reset();
}
@@ -59,7 +62,7 @@ public class EchoedRow extends AbstractC
public boolean isEmpty()
{
- return !row.hasNext();
+ return !row.hasNext() &&
ColumnFamilyStore.removeDeletedCF(row.getColumnFamily(), gcBefore) == null;
}
public int columnCount()
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
Wed Jul 6 11:34:50 2011
@@ -113,7 +113,7 @@ public class CompactionController
public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator>
rows)
{
if (rows.size() == 1 && !needDeserialize() &&
!shouldPurge(rows.get(0).getKey()))
- return new EchoedRow(rows.get(0));
+ return new EchoedRow(this, rows.get(0));
long rowSize = 0;
for (SSTableIdentityIterator row : rows)
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Wed Jul 6 11:34:50 2011
@@ -412,7 +412,8 @@ public class CompactionManager implement
// success: perform the compaction
try
{
- doCompactionWithoutSizeEstimation(cfs, sstables,
gcBefore, location);
+ // Force deserialization in case the
user wants expired columns to be transformed to tombstones
+ doCompactionWithoutSizeEstimation(cfs, sstables,
gcBefore, location, true);
}
finally
{
@@ -501,7 +502,7 @@ public class CompactionManager implement
{
String compactionFileLocation =
table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
if (compactionFileLocation != null)
- return doCompactionWithoutSizeEstimation(cfs, smallerSSTables,
gcBefore, compactionFileLocation);
+ return doCompactionWithoutSizeEstimation(cfs, smallerSSTables,
gcBefore, compactionFileLocation, false);
logger.warn("insufficient space to compact all requested files " +
StringUtils.join(smallerSSTables, ", "));
smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
@@ -515,7 +516,7 @@ public class CompactionManager implement
* For internal use and testing only. The rest of the system should go
through the submit* methods,
* which are properly serialized.
*/
- int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, int gcBefore, String
compactionFileLocation) throws IOException
+ int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, int gcBefore, String
compactionFileLocation, boolean forceDeserialize) throws IOException
{
// The collection of sstables passed may be empty (but not null); even
if
// it is not empty, it may compact down to nothing if all rows are
deleted.
@@ -529,10 +530,6 @@ public class CompactionManager implement
for (SSTableReader sstable : sstables)
assert sstable.descriptor.cfname.equals(cfs.columnFamily);
- // compaction won't normally compact a single sstable, so if that's
what we're doing
- // it must have been requested manually by the user, which probably
means he wants to force
- // tombstone purge, which won't happen unless we force deserializing
the rows.
- boolean forceDeserialize = sstables.size() == 1;
CompactionController controller = new CompactionController(cfs,
sstables, gcBefore, forceDeserialize);
// new sstables from flush can be added during a compaction, but only
the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of
determining if we're compacting
Modified:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1143352&r1=1143351&r2=1143352&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
(original)
+++
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
Wed Jul 6 11:34:50 2011
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
@@ -181,27 +182,54 @@ public class CompactionsTest extends Cle
if (i % 2 == 0)
store.forceBlockingFlush();
}
+ Collection<SSTableReader> toCompact = store.getSSTables();
- // Force compaction. Since each row is in only one sstable, we will be
using EchoedRow.
- CompactionManager.instance.performMajor(store);
+ // Reinsert the same keys. We will compact only the previous
sstables, but we need the newly flushed one
+ // to make sure EchoedRow is used; otherwise it would not be, because
the rows could be purged.
+ for (int i=1; i < 5; i++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(i));
+ RowMutation rm = new RowMutation(TABLE1, key.key);
+ rm.add(new QueryPath("Standard2", null,
ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ SSTableReader tmpSSTable = null;
+ for (SSTableReader sstable : store.getSSTables())
+ if (!toCompact.contains(sstable))
+ tmpSSTable = sstable;
+
+ // Force compaction on first sstables. Since each row is in only one
sstable, we will be using EchoedRow.
+ CompactionManager.instance.doCompaction(store, toCompact, (int)
(System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds());
+
+ // Now, we remove the sstable that was just created to force the use
of EchoedRow (so that it doesn't hide the problem)
+ store.markCompacted(Collections.singleton(tmpSSTable));
- // Now assert we do have the two keys
+ // Now assert we do have the 4 keys
assertEquals(4, Util.getRangeSlice(store).size());
}
@Test
public void testDontPurgeAccidentaly() throws IOException,
ExecutionException, InterruptedException
{
+ // Testing with and without forcing deserialization. Without
deserialization, EchoedRow will be used.
+ testDontPurgeAccidentaly("test1", false);
+ testDontPurgeAccidentaly("test2", true);
+ }
+
+ private void testDontPurgeAccidentaly(String k, boolean forceDeserialize)
throws IOException, ExecutionException, InterruptedException
+ {
// This test catches the regression of CASSANDRA-2786
Table table = Table.open(TABLE1);
String cfname = "Super5";
ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
// disable compaction while flushing
+ store.removeAllSSTables();
store.disableAutoCompaction();
// Add test row
- DecoratedKey key = Util.dk("test");
+ DecoratedKey key = Util.dk(k);
RowMutation rm = new RowMutation(TABLE1, key.key);
rm.add(new QueryPath(cfname, ByteBufferUtil.bytes("sc"),
ByteBufferUtil.bytes("c")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
rm.apply();
@@ -210,11 +238,10 @@ public class CompactionsTest extends Cle
Collection<SSTableReader> sstablesBefore = store.getSSTables();
- QueryFilter filter = QueryFilter.getIdentityFilter(Util.dk("test"),
new QueryPath(cfname, null, null));
+ QueryFilter filter = QueryFilter.getIdentityFilter(key, new
QueryPath(cfname, null, null));
assert !store.getColumnFamily(filter).isEmpty();
// Remove key
- key = Util.dk("test");
rm = new RowMutation(TABLE1, key.key);
rm.delete(new QueryPath(cfname, null, null), 2);
rm.apply();
@@ -225,12 +252,13 @@ public class CompactionsTest extends Cle
store.forceBlockingFlush();
Collection<SSTableReader> sstablesAfter = store.getSSTables();
- Collection<Descriptor> toCompact = new ArrayList<Descriptor>();
+ Collection<SSTableReader> toCompact = new ArrayList<SSTableReader>();
for (SSTableReader sstable : sstablesAfter)
if (!sstablesBefore.contains(sstable))
- toCompact.add(sstable.descriptor);
+ toCompact.add(sstable);
- CompactionManager.instance.submitUserDefined(store, toCompact, (int)
(System.currentTimeMillis() / 1000) - store.metadata.getGcGraceSeconds()).get();
+ String location = store.table.getDataFileLocation(1);
+ CompactionManager.instance.doCompactionWithoutSizeEstimation(store,
toCompact, (int) (System.currentTimeMillis() / 1000) -
store.metadata.getGcGraceSeconds(), location, forceDeserialize);
cf = store.getColumnFamily(filter);
assert cf.isEmpty() : "should be empty: " + cf;