Author: jbellis
Date: Thu Oct 22 15:41:11 2009
New Revision: 828755

URL: http://svn.apache.org/viewvc?rev=828755&view=rev
Log:
all rows go through deserialize/removeDeleted so we can GC tombstones.
patch by jbellis; reviewed by junrao for CASSANDRA-507

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=828755&r1=828754&r2=828755&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Oct 22 15:41:11 2009
@@ -667,8 +667,7 @@
             sstables = ssTables_.getSSTables();
         }
 
-        if (sstables.size() > 1)
-            doFileCompaction(sstables);
+        doFileCompaction(sstables);
     }
 
     /*
@@ -838,6 +837,8 @@
     * that occur in multiple files and are the same then a resolution is done
     * to get the latest data.
     *
+    * The collection of sstables passed may be empty (but not null); even if
+    * it is not empty, it may compact down to nothing if all rows are deleted.
     */
     int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore) 
throws IOException
     {
@@ -869,9 +870,12 @@
 
         try
         {
-            if (!ci.hasNext())
+            if (!nni.hasNext())
             {
-                logger_.warn("Nothing to compact (all files empty or corrupt). 
This should not happen.");
+                // don't mark compacted in the finally block, since if there 
_is_ nondeleted data,
+                // we need to sync it (via closeAndOpen) first, so there is no 
period during which
+                // a crash could cause data loss.
+                ssTables_.markCompacted(sstables);
                 return 0;
             }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=828755&r1=828754&r2=828755&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Oct 22 15:41:11 2009
@@ -203,7 +203,6 @@
     public SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) 
throws IOException
     {
         logger_.info("Writing " + this);
-        IPartitioner<?> partitioner = StorageService.getPartitioner();
         ColumnFamilyStore cfStore = 
Table.open(table_).getColumnFamilyStore(cfName_);
         SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), 
columnFamilies_.size(), StorageService.getPartitioner());
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=828755&r1=828754&r2=828755&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Thu Oct 22 15:41:11 2009
@@ -70,31 +70,26 @@
     {
         DataOutputBuffer buffer = new DataOutputBuffer();
         DecoratedKey key = rows.get(0).getKey();
-        if (rows.size() > 1)
+        assert rows.size() > 0;
+
+        ColumnFamily cf = null;
+        for (IteratingRow row : rows)
         {
-            ColumnFamily cf = null;
-            for (IteratingRow row : rows)
+            if (cf == null)
             {
-                if (cf == null)
-                {
-                    cf = row.getColumnFamily();
-                }
-                else
-                {
-                    cf.addAll(row.getColumnFamily());
-                }
+                cf = row.getColumnFamily();
+            }
+            else
+            {
+                cf.addAll(row.getColumnFamily());
             }
-            ColumnFamily cfPurged = ColumnFamilyStore.removeDeleted(cf, 
gcBefore);
-            if (cfPurged == null)
-                return null;
-            ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
-        }
-        else
-        {
-            assert rows.size() == 1;
-            rows.get(0).echoData(buffer);
         }
         rows.clear();
+
+        ColumnFamily cfPurged = ColumnFamilyStore.removeDeleted(cf, gcBefore);
+        if (cfPurged == null)
+            return null;
+        ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
         return new CompactedRow(key, buffer);
     }
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=828755&r1=828754&r2=828755&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Thu Oct 22 15:41:11 2009
@@ -70,19 +70,20 @@
     @Test
     public void testCompactionPurge() throws IOException, ExecutionException, 
InterruptedException
     {
+        CompactionManager.instance().disableCompactions();
+
         Table table = Table.open("Keyspace1");
-        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+        String cfName = "Standard1";
+        ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
 
         String key = "key1";
         RowMutation rm;
 
-        CompactionManager.instance().disableCompactions();
-
         // inserts
         rm = new RowMutation("Keyspace1", key);
         for (int i = 0; i < 10; i++)
         {
-            rm.add(new QueryPath("Standard1", null, 
String.valueOf(i).getBytes()), new byte[0], 0);
+            rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 
new byte[0], 0);
         }
         rm.apply();
         store.forceBlockingFlush();
@@ -91,21 +92,59 @@
         for (int i = 0; i < 10; i++)
         {
             rm = new RowMutation("Keyspace1", key);
-            rm.delete(new QueryPath("Standard1", null, 
String.valueOf(i).getBytes()), 1);
+            rm.delete(new QueryPath(cfName, null, 
String.valueOf(i).getBytes()), 1);
             rm.apply();
         }
         store.forceBlockingFlush();
 
         // resurrect one row
         rm = new RowMutation("Keyspace1", key);
-        rm.add(new QueryPath("Standard1", null, String.valueOf(5).getBytes()), 
new byte[0], 2);
+        rm.add(new QueryPath(cfName, null, String.valueOf(5).getBytes()), new 
byte[0], 2);
         rm.apply();
         store.forceBlockingFlush();
 
         // compact and test that all columns but the resurrected one is 
completely gone
         store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
-        ColumnFamily cf = 
table.getColumnFamilyStore("Standard1").getColumnFamily(new 
IdentityQueryFilter(key, new QueryPath("Standard1")));
+        ColumnFamily cf = 
table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, 
new QueryPath(cfName)));
         assert cf.getColumnCount() == 1;
         assert cf.getColumn(String.valueOf(5).getBytes()) != null;
     }
+
+    @Test
+    public void testCompactionPurgeOneFile() throws IOException, 
ExecutionException, InterruptedException
+    {
+        CompactionManager.instance().disableCompactions();
+
+        Table table = Table.open("Keyspace1");
+        String cfName = "Standard2";
+        ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
+
+        String key = "key1";
+        RowMutation rm;
+
+        // inserts
+        rm = new RowMutation("Keyspace1", key);
+        for (int i = 0; i < 5; i++)
+        {
+            rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 
new byte[0], 0);
+        }
+        rm.apply();
+
+        // deletes
+        for (int i = 0; i < 5; i++)
+        {
+            rm = new RowMutation("Keyspace1", key);
+            rm.delete(new QueryPath(cfName, null, 
String.valueOf(i).getBytes()), 1);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+
+        assert store.getSSTables().size() == 1 : store.getSSTables(); // 
inserts & deletes were in the same memtable -> only deletes in sstable
+
+        // compact and test that the row is completely gone
+        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+        assert store.getSSTables().isEmpty();
+        ColumnFamily cf = 
table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, 
new QueryPath(cfName)));
+        assert cf == null : cf;
+    }
 }


Reply via email to