Author: gdusbabek
Date: Wed Dec 15 15:22:27 2010
New Revision: 1049588

URL: http://svn.apache.org/viewvc?rev=1049588&view=rev
Log:
track row deletions when merging cols to form a row. patch by gdusbabek and 
jbellis. CASSANDRA-1837

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
    
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1049588&r1=1049587&r2=1049588&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Dec 15 15:22:27 2010
@@ -11,6 +11,8 @@ dev
  * make ByteBufferUtil.clone thread-safe (CASSANDRA-1847)
  * change exception for read requests during bootstrap from 
    InvalidRequest to Unavailable (CASSANDRA-1862)
+ * deleted columns resurrected after a flush on slice read path 
+   (CASSANDRA-1837)
 
 
 0.7.0-rc2

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1049588&r1=1049587&r2=1049588&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowIteratorFactory.java
 Wed Dec 15 15:22:27 2010
@@ -18,6 +18,8 @@
 package org.apache.cassandra.db;
 
 import java.io.Closeable;
+import java.io.IOError;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -128,8 +130,7 @@ public class RowIteratorFactory
                 Comparator<IColumn> colComparator = 
filter.filter.getColumnComparator(comparator);
                 Iterator<IColumn> colCollated = 
IteratorUtils.collatedIterator(colComparator, colIters);
 
-                ColumnFamily returnCF = null;
-                
+                ColumnFamily returnCF;
                 // First check if this row is in the rowCache. If it is we can 
skip the rest
                 ColumnFamily cached = cfs.getRawCachedRow(key);
                 if (cached != null)
@@ -137,23 +138,34 @@ public class RowIteratorFactory
                     QueryFilter keyFilter = new QueryFilter(key, filter.path, 
filter.filter);
                     returnCF = cfs.filterColumnFamily(cached, keyFilter, 
gcBefore);
                 }
-                else
+                else if (colCollated.hasNext())
                 {
-                    returnCF = firstMemtable.getColumnFamily(key);            
+                    returnCF = firstMemtable.getColumnFamily(key);
                     // TODO this is a little subtle: the Memtable 
ColumnIterator has to be a shallow clone of the source CF,
                     // with deletion times set correctly, so we can use it as 
the "base" CF to add query results to.
                     // (for sstable ColumnIterators we do not care if it is a 
shallow clone or not.)
                     returnCF = returnCF == null ? 
ColumnFamily.create(firstMemtable.getTableName(), filter.getColumnFamilyName())
-                            : returnCF.cloneMeShallow();
-
-                    if (colCollated.hasNext())
-                    {
-                        filter.collectCollatedColumns(returnCF, colCollated, 
gcBefore);
-                    }
-                    else
+                                                : returnCF.cloneMeShallow();
+                    long lastDeletedAt = Long.MIN_VALUE;
+                    for (IColumnIterator columns : colIters)
                     {
-                        returnCF = null;
+                        columns.hasNext(); // force cf initialization
+                        try
+                        {
+                            if (columns.getColumnFamily().isMarkedForDelete())
+                                lastDeletedAt = Math.max(lastDeletedAt, 
columns.getColumnFamily().getMarkedForDeleteAt());
+                        }
+                        catch (IOException e)
+                        {
+                            throw new IOError(e);
+                        }
                     }
+                    returnCF.markedForDeleteAt.set(lastDeletedAt);
+                    filter.collectCollatedColumns(returnCF, colCollated, 
gcBefore);
+                }
+                else
+                {
+                    returnCF = null;
                 }
 
                 Row rv = new Row(key, returnCF);

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1049588&r1=1049587&r2=1049588&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
 Wed Dec 15 15:22:27 2010
@@ -31,8 +31,9 @@ import org.apache.cassandra.db.ColumnFam
 public interface IColumnIterator extends Iterator<IColumn>
 {
     /**
-     *  returns the CF of the column being iterated.  Do not modify the 
returned CF; clone first.
-     *  The CF is only guaranteed to be available after a call to next() or 
hasNext().
+     * returns the CF of the column being iterated.  Do not modify the 
returned CF; clone first.
+     * The CF is only guaranteed to be available after a call to next() or 
hasNext().
+     * Guaranteed to be non-null.
      * @throws IOException 
      */
     public abstract ColumnFamily getColumnFamily() throws IOException;

Modified: 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1049588&r1=1049587&r2=1049588&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
 Wed Dec 15 15:22:27 2010
@@ -21,12 +21,17 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.junit.Test;
 
@@ -48,6 +53,8 @@ import org.apache.cassandra.utils.FBUtil
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import static junit.framework.Assert.assertEquals;
+import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.getBytes;
 import static org.junit.Assert.assertNull;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -328,6 +335,170 @@ public class ColumnFamilyStoreTest exten
         String key = new 
String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
 
         assert "k1".equals( key );        
     }
+    
+    @Test
+    public void testDeleteSuperRowSticksAfterFlush() throws Throwable
+    {
+        String tableName = "Keyspace1";
+        String cfName= "Super1";
+        ByteBuffer scfName = ByteBufferUtil.bytes("SuperDuper");
+        Table table = Table.open(tableName);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+        DecoratedKey key = Util.dk("flush-resurrection");
+        
+        // create an isolated sstable.
+        putColsSuper(cfs, key, scfName, 
+                new Column(getBytes(1), ByteBufferUtil.bytes("val1"), 1),
+                new Column(getBytes(2), ByteBufferUtil.bytes("val2"), 1),
+                new Column(getBytes(3), ByteBufferUtil.bytes("val3"), 1));
+        cfs.forceBlockingFlush();
+        
+        // insert, don't flush.
+        putColsSuper(cfs, key, scfName, 
+                new Column(getBytes(4), ByteBufferUtil.bytes("val4"), 1),
+                new Column(getBytes(5), ByteBufferUtil.bytes("val5"), 1),
+                new Column(getBytes(6), ByteBufferUtil.bytes("val6"), 1));
+        
+        // verify insert.
+        final SlicePredicate sp = new SlicePredicate();
+        sp.setSlice_range(new SliceRange());
+        sp.getSlice_range().setCount(100);
+        sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
+        sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
+        
+        assertRowAndColCount(1, 6, scfName, false, cfs.getRangeSlice(scfName, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // delete.
+        RowMutation rm = new RowMutation(table.name, key.key);
+        rm.delete(new QueryPath(cfName, scfName), 2);
+        rm.apply();
+        
+        // verify delete.
+        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(scfName, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // flush
+        cfs.forceBlockingFlush();
+        
+        // re-verify delete.
+        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(scfName, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // late insert.
+        putColsSuper(cfs, key, scfName, 
+                new Column(getBytes(4), ByteBufferUtil.bytes("val4"), 1L),
+                new Column(getBytes(7), ByteBufferUtil.bytes("val7"), 1L));
+        
+        // re-verify delete.
+        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(scfName, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // make sure new writes are recognized.
+        putColsSuper(cfs, key, scfName, 
+                new Column(getBytes(3), ByteBufferUtil.bytes("val3"), 3),
+                new Column(getBytes(8), ByteBufferUtil.bytes("val8"), 3),
+                new Column(getBytes(9), ByteBufferUtil.bytes("val9"), 3));
+        assertRowAndColCount(1, 3, scfName, false, cfs.getRangeSlice(scfName, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+    }
+    
+    private static void assertRowAndColCount(int rowCount, int colCount, 
ByteBuffer sc, boolean isDeleted, Collection<Row> rows)
+    {
+        assert rows.size() == rowCount : "rowcount " + rows.size();
+        for (Row row : rows)
+        {
+            assert row.cf != null : "cf was null";
+            if (sc != null)
+                assert row.cf.getColumn(sc).getSubColumns().size() == colCount 
: row.cf.getColumn(sc).getSubColumns().size();
+            else
+                assert row.cf.getColumnCount() == colCount : "colcount " + 
row.cf.getColumnCount() + "|" + str(row.cf);
+            if (isDeleted)
+                assert row.cf.isMarkedForDelete() : "cf not marked for delete";
+        }
+    }
+    
+    private static String str(ColumnFamily cf)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (IColumn col : cf.getSortedColumns())
+            sb.append(String.format("(%s,%s,%d),", new 
String(col.name().array()), new String(col.value().array()), col.timestamp()));
+        return sb.toString();
+    }
+    
+    private static void putColsSuper(ColumnFamilyStore cfs, DecoratedKey key, 
ByteBuffer scfName, Column... cols) throws Throwable
+    {
+        RowMutation rm = new RowMutation(cfs.table.name, key.key);
+        ColumnFamily cf = ColumnFamily.create(cfs.table.name, 
cfs.getColumnFamilyName());
+        SuperColumn sc = new SuperColumn(scfName, LongType.instance);
+        for (Column col : cols)
+            sc.addColumn(col);
+        cf.addColumn(sc);
+        rm.add(cf);
+        rm.apply();
+    }
+    
+    private static void putColsStandard(ColumnFamilyStore cfs, DecoratedKey 
key, Column... cols) throws Throwable
+    {
+        RowMutation rm = new RowMutation(cfs.table.name, key.key);
+        ColumnFamily cf = ColumnFamily.create(cfs.table.name, 
cfs.getColumnFamilyName());
+        for (Column col : cols)
+            cf.addColumn(col);
+        rm.add(cf);
+        rm.apply();
+    }
+    
+    @Test
+    public void testDeleteStandardRowSticksAfterFlush() throws Throwable
+    {
+        // test to make sure flushing after a delete doesn't resurrect deleted 
cols.
+        String tableName = "Keyspace1";
+        String cfName = "Standard1";
+        Table table = Table.open(tableName);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+        DecoratedKey key = Util.dk("f-flush-resurrection");
+        
+        SlicePredicate sp = new SlicePredicate();
+        sp.setSlice_range(new SliceRange());
+        sp.getSlice_range().setCount(100);
+        sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
+        sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
+        
+        // insert
+        putColsStandard(cfs, key, column("col1", "val1", 1), column("col2", 
"val2", 1));
+        assertRowAndColCount(1, 2, null, false, cfs.getRangeSlice(null, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // flush.
+        cfs.forceBlockingFlush();
+        
+        // insert, don't flush
+        putColsStandard(cfs, key, column("col3", "val3", 1), column("col4", 
"val4", 1));
+        assertRowAndColCount(1, 4, null, false, cfs.getRangeSlice(null, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // delete (from sstable and memtable)
+        RowMutation rm = new RowMutation(table.name, key.key);
+        rm.delete(new QueryPath(cfs.columnFamily, null, null), 2);
+        rm.apply();
+        
+        // verify delete
+        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(null, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // flush
+        cfs.forceBlockingFlush();
+        
+        // re-verify delete. // first breakage is right here because of 
CASSANDRA-1837.
+        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(null, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // simulate a 'late' insertion that gets put in after the deletion. 
should get inserted, but fail on read.
+        putColsStandard(cfs, key, column("col5", "val5", 1), column("col2", 
"val2", 1));
+        
+        // should still be nothing there because we deleted this row. 2nd 
breakage, but was undetected because of 1837.
+        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(null, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // make sure that new writes are recognized.
+        putColsStandard(cfs, key, column("col6", "val6", 3), column("col7", 
"val7", 3));
+        assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(null, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+        
+        // and it remains so after flush. (this wasn't failing before, but 
it's good to check.)
+        cfs.forceBlockingFlush();
+        assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(null, 
Util.range("f", "g"), 100, QueryFilter.getFilter(sp, cfs.getComparator())));
+    }
+        
 
     private ColumnFamilyStore insertKey1Key2() throws IOException, 
ExecutionException, InterruptedException
     {


Reply via email to