Author: gdusbabek
Date: Wed Dec 15 15:22:27 2010
New Revision: 1049588
URL: http://svn.apache.org/viewvc?rev=1049588&view=rev
Log:
track row deletions when merging cols to form a row. patch by gdusbabek and
jbellis. CASSANDRA-1837
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1049588&r1=1049587&r2=1049588&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Dec 15 15:22:27 2010
@@ -11,6 +11,8 @@ dev
* make ByteBufferUtil.clone thread-safe (CASSANDRA-1847)
* change exception for read requests during bootstrap from
InvalidRequest to Unavailable (CASSANDRA-1862)
+ * fix deleted columns being resurrected after a flush on the slice read
+ path (CASSANDRA-1837)
0.7.0-rc2
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1049588&r1=1049587&r2=1049588&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java Wed Dec 15 15:22:27 2010
@@ -18,6 +18,8 @@
package org.apache.cassandra.db;
import java.io.Closeable;
+import java.io.IOError;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -128,8 +130,7 @@ public class RowIteratorFactory
Comparator<IColumn> colComparator =
filter.filter.getColumnComparator(comparator);
Iterator<IColumn> colCollated =
IteratorUtils.collatedIterator(colComparator, colIters);
- ColumnFamily returnCF = null;
-
+ ColumnFamily returnCF;
// First check if this row is in the rowCache. If it is we can
skip the rest
ColumnFamily cached = cfs.getRawCachedRow(key);
if (cached != null)
@@ -137,23 +138,34 @@ public class RowIteratorFactory
QueryFilter keyFilter = new QueryFilter(key, filter.path,
filter.filter);
returnCF = cfs.filterColumnFamily(cached, keyFilter,
gcBefore);
}
- else
+ else if (colCollated.hasNext())
{
- returnCF = firstMemtable.getColumnFamily(key);
+ returnCF = firstMemtable.getColumnFamily(key);
// TODO this is a little subtle: the Memtable
ColumnIterator has to be a shallow clone of the source CF,
// with deletion times set correctly, so we can use it as
the "base" CF to add query results to.
// (for sstable ColumnIterators we do not care if it is a
shallow clone or not.)
returnCF = returnCF == null ?
ColumnFamily.create(firstMemtable.getTableName(), filter.getColumnFamilyName())
- : returnCF.cloneMeShallow();
-
- if (colCollated.hasNext())
- {
- filter.collectCollatedColumns(returnCF, colCollated,
gcBefore);
- }
- else
+ : returnCF.cloneMeShallow();
+ long lastDeletedAt = Long.MIN_VALUE;
+ for (IColumnIterator columns : colIters)
{
- returnCF = null;
+ columns.hasNext(); // force cf initializtion
+ try
+ {
+ if (columns.getColumnFamily().isMarkedForDelete())
+ lastDeletedAt = Math.max(lastDeletedAt,
columns.getColumnFamily().getMarkedForDeleteAt());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
+ returnCF.markedForDeleteAt.set(lastDeletedAt);
+ filter.collectCollatedColumns(returnCF, colCollated,
gcBefore);
+ }
+ else
+ {
+ returnCF = null;
}
Row rv = new Row(key, returnCF);
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1049588&r1=1049587&r2=1049588&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java Wed Dec 15 15:22:27 2010
@@ -31,8 +31,9 @@ import org.apache.cassandra.db.ColumnFam
public interface IColumnIterator extends Iterator<IColumn>
{
/**
- * returns the CF of the column being iterated. Do not modify the
returned CF; clone first.
- * The CF is only guaranteed to be available after a call to next() or
hasNext().
+ * returns the CF of the column being iterated. Do not modify the
returned CF; clone first.
+ * The CF is only guaranteed to be available after a call to next() or
hasNext().
+ * Guaranteed to be non-null.
* @throws IOException
*/
public abstract ColumnFamily getColumnFamily() throws IOException;
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1049588&r1=1049587&r2=1049588&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Dec 15 15:22:27 2010
@@ -21,12 +21,17 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.junit.Test;
@@ -48,6 +53,8 @@ import org.apache.cassandra.utils.FBUtil
import org.apache.cassandra.utils.WrappedRunnable;
import static junit.framework.Assert.assertEquals;
+import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.getBytes;
import static org.junit.Assert.assertNull;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -328,6 +335,170 @@ public class ColumnFamilyStoreTest exten
String key = new
String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
assert "k1".equals( key );
}
+
+ @Test
+ public void testDeleteSuperRowSticksAfterFlush() throws Throwable
+ { // Checks that deleting a super column keeps its older subcolumns hidden across flushes and late writes (cf. CASSANDRA-1837).
+ String tableName = "Keyspace1";
+ String cfName= "Super1";
+ ByteBuffer scfName = ByteBufferUtil.bytes("SuperDuper");
+ Table table = Table.open(tableName);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+ DecoratedKey key = Util.dk("flush-resurrection");
+
+ // create an isolated sstable holding subcolumns 1-3 (timestamp 1).
+ putColsSuper(cfs, key, scfName,
+ new Column(getBytes(1), ByteBufferUtil.bytes("val1"), 1),
+ new Column(getBytes(2), ByteBufferUtil.bytes("val2"), 1),
+ new Column(getBytes(3), ByteBufferUtil.bytes("val3"), 1));
+ cfs.forceBlockingFlush();
+
+ // insert subcolumns 4-6 (timestamp 1), don't flush: the row now spans an sstable and the memtable.
+ putColsSuper(cfs, key, scfName,
+ new Column(getBytes(4), ByteBufferUtil.bytes("val4"), 1),
+ new Column(getBytes(5), ByteBufferUtil.bytes("val5"), 1),
+ new Column(getBytes(6), ByteBufferUtil.bytes("val6"), 1));
+
+ // verify insert: all 6 subcolumns are visible (slice with empty start/finish, count 100).
+ final SlicePredicate sp = new SlicePredicate();
+ sp.setSlice_range(new SliceRange());
+ sp.getSlice_range().setCount(100);
+ sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
+ sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
+
+ assertRowAndColCount(1, 6, scfName, false, cfs.getRangeSlice(scfName,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // delete the super column at timestamp 2.
+ RowMutation rm = new RowMutation(table.name, key.key);
+ rm.delete(new QueryPath(cfName, scfName), 2);
+ rm.apply();
+
+ // verify delete: no live subcolumns remain.
+ assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(scfName,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // flush
+ cfs.forceBlockingFlush();
+
+ // re-verify delete.
+ assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(scfName,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // late insert: timestamp 1 is older than the deletion (2), so these must stay hidden.
+ putColsSuper(cfs, key, scfName,
+ new Column(getBytes(4), ByteBufferUtil.bytes("val4"), 1L),
+ new Column(getBytes(7), ByteBufferUtil.bytes("val7"), 1L));
+
+ // re-verify delete.
+ assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(scfName,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // make sure new writes (timestamp 3, newer than the deletion) are recognized.
+ putColsSuper(cfs, key, scfName,
+ new Column(getBytes(3), ByteBufferUtil.bytes("val3"), 3),
+ new Column(getBytes(8), ByteBufferUtil.bytes("val8"), 3),
+ new Column(getBytes(9), ByteBufferUtil.bytes("val9"), 3));
+ assertRowAndColCount(1, 3, scfName, false, cfs.getRangeSlice(scfName,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+ }
+
+ private static void assertRowAndColCount(int rowCount, int colCount,
ByteBuffer sc, boolean isDeleted, Collection<Row> rows)
+ { // Asserts rows has rowCount entries, each with a non-null cf holding colCount columns (colCount subcolumns of super column sc when sc != null); uses the assert keyword, so checks run only with JVM assertions enabled (-ea).
+ assert rows.size() == rowCount : "rowcount " + rows.size();
+ for (Row row : rows)
+ {
+ assert row.cf != null : "cf was null";
+ if (sc != null) // NOTE(review): if getColumn(sc) can return null (super column absent), the next line throws NullPointerException instead of failing the assert -- confirm
+ assert row.cf.getColumn(sc).getSubColumns().size() == colCount
: row.cf.getColumn(sc).getSubColumns().size();
+ else
+ assert row.cf.getColumnCount() == colCount : "colcount " +
row.cf.getColumnCount() + "|" + str(row.cf);
+ if (isDeleted) // isDeleted == false skips this check; it does not assert that the cf is live
+ assert row.cf.isMarkedForDelete() : "cf not marked for delete";
+ }
+ }
+
+ private static String str(ColumnFamily cf)
+ { // Renders cf's sorted columns as "(name,value,timestamp)," entries, used in assertion failure messages.
+ StringBuilder sb = new StringBuilder();
+ for (IColumn col : cf.getSortedColumns())
+ sb.append(String.format("(%s,%s,%d),", new
String(col.name().array()), new String(col.value().array()), col.timestamp())); // NOTE(review): ByteBuffer.array() returns the whole backing array (ignoring position/limit/arrayOffset) and new String(byte[]) uses the platform charset, so output may include bytes outside the name/value; array() also throws for read-only or non-array-backed buffers
+ return sb.toString();
+ }
+
+ private static void putColsSuper(ColumnFamilyStore cfs, DecoratedKey key,
ByteBuffer scfName, Column... cols) throws Throwable
+ { // Writes cols as subcolumns of super column scfName in row key of cfs's column family, applying a single RowMutation.
+ RowMutation rm = new RowMutation(cfs.table.name, key.key);
+ ColumnFamily cf = ColumnFamily.create(cfs.table.name,
cfs.getColumnFamilyName());
+ SuperColumn sc = new SuperColumn(scfName, LongType.instance); // presumably LongType is the subcolumn comparator -- confirm against SuperColumn
+ for (Column col : cols)
+ sc.addColumn(col);
+ cf.addColumn(sc);
+ rm.add(cf);
+ rm.apply();
+ }
+
+ private static void putColsStandard(ColumnFamilyStore cfs, DecoratedKey
key, Column... cols) throws Throwable
+ { // Writes cols as top-level columns in row key of cfs's column family, applying a single RowMutation.
+ RowMutation rm = new RowMutation(cfs.table.name, key.key);
+ ColumnFamily cf = ColumnFamily.create(cfs.table.name,
cfs.getColumnFamilyName());
+ for (Column col : cols)
+ cf.addColumn(col);
+ rm.add(cf);
+ rm.apply();
+ }
+
+ @Test
+ public void testDeleteStandardRowSticksAfterFlush() throws Throwable
+ {
+ // test to make sure flushing after a delete doesn't resurrect deleted
cols.
+ String tableName = "Keyspace1";
+ String cfName = "Standard1";
+ Table table = Table.open(tableName);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+ DecoratedKey key = Util.dk("f-flush-resurrection");
+
+ SlicePredicate sp = new SlicePredicate();
+ sp.setSlice_range(new SliceRange());
+ sp.getSlice_range().setCount(100);
+ sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
+ sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
+
+ // insert col1, col2 (timestamp 1)
+ putColsStandard(cfs, key, column("col1", "val1", 1), column("col2",
"val2", 1));
+ assertRowAndColCount(1, 2, null, false, cfs.getRangeSlice(null,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // flush.
+ cfs.forceBlockingFlush();
+
+ // insert col3, col4 (timestamp 1), don't flush: the row now spans an sstable and the memtable
+ putColsStandard(cfs, key, column("col3", "val3", 1), column("col4",
"val4", 1));
+ assertRowAndColCount(1, 4, null, false, cfs.getRangeSlice(null,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // delete (from sstable and memtable): the path names only the column family, at timestamp 2
+ RowMutation rm = new RowMutation(table.name, key.key);
+ rm.delete(new QueryPath(cfs.columnFamily, null, null), 2);
+ rm.apply();
+
+ // verify delete
+ assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(null,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // flush
+ cfs.forceBlockingFlush();
+
+ // re-verify delete. // first breakage is right here because of
CASSANDRA-1837.
+ assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(null,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // simulate a 'late' insertion that gets put in after the deletion.
should get inserted, but fail on read.
+ putColsStandard(cfs, key, column("col5", "val5", 1), column("col2",
"val2", 1));
+
+ // should still be nothing there because we deleted this row. 2nd
breakage, but was undetected because of 1837.
+ assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(null,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // make sure that new writes (timestamp 3, newer than the deletion) are recognized; the cf stays marked for delete.
+ putColsStandard(cfs, key, column("col6", "val6", 3), column("col7",
"val7", 3));
+ assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(null,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+
+ // and it remains so after flush. (this wasn't failing before, but
it's good to check.)
+ cfs.forceBlockingFlush();
+ assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(null,
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+ }
+
private ColumnFamilyStore insertKey1Key2() throws IOException,
ExecutionException, InterruptedException
{