avoid duplicate index entries in PrecompactedRow and ParallelCompactionIterable
patch by jbellis; reviewed by Sam Tunnicliffe for CASSANDRA-5395


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a143966e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a143966e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a143966e

Branch: refs/heads/trunk
Commit: a143966e1cfe3f782f6237d67d216ef8bc2d4713
Parents: 0ab3d60
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed Mar 27 16:17:53 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Fri Mar 29 12:04:08 2013 -0500

----------------------------------------------------------------------
 .../db/compaction/LazilyCompactedRow.java          |    6 +-
 .../db/compaction/ParallelCompactionIterable.java  |   25 +++---
 .../cassandra/db/compaction/PrecompactedRow.java   |   68 +++++++++++----
 .../db/compaction/CompactionsPurgeTest.java        |    5 +-
 4 files changed, 70 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a143966e/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 76d5100..8d59898 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -254,8 +254,12 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements Iterable
             {
                 IColumn column = (IColumn) current;
                 container.addColumn(column);
-                if (container.getColumn(column.name()) != column)
+                if (indexer != SecondaryIndexManager.nullUpdater
+                    && !column.isMarkedForDelete()
+                    && container.getColumn(column.name()) != column)
+                {
                     indexer.remove(column);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a143966e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java 
b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 7e1983c..24b1d00 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -184,6 +184,9 @@ public class ParallelCompactionIterable extends 
AbstractCompactionIterable
             executor.shutdown();
         }
 
+        /**
+         * Merges a set of in-memory rows
+         */
         private class MergeTask implements Callable<ColumnFamily>
         {
             private final List<Row> rows;
@@ -195,23 +198,17 @@ public class ParallelCompactionIterable extends 
AbstractCompactionIterable
 
             public ColumnFamily call() throws Exception
             {
-                ColumnFamily cf = null;
+                final ColumnFamily returnCF = 
ColumnFamily.create(controller.cfs.metadata, 
ArrayBackedSortedColumns.factory());
+
+                List<CloseableIterator<IColumn>> data = new 
ArrayList<CloseableIterator<IColumn>>(rows.size());
                 for (Row row : rows)
                 {
-                    ColumnFamily thisCF = row.cf;
-                    if (cf == null)
-                    {
-                        cf = thisCF;
-                    }
-                    else
-                    {
-                        // addAll is ok even if cf is an 
ArrayBackedSortedColumns
-                        SecondaryIndexManager.Updater indexer = 
controller.cfs.indexManager.updaterFor(row.key, false);
-                        cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, 
Functions.<IColumn>identity(), indexer);
-                    }
+                    returnCF.delete(row.cf);
+                    data.add(FBUtilities.closeableIterator(row.cf.iterator()));
                 }
 
-                return 
PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, cf);
+                PrecompactedRow.merge(returnCF, data, 
controller.cfs.indexManager.updaterFor(rows.get(0).key, false));
+                return 
PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, 
returnCF);
             }
         }
 
@@ -300,7 +297,7 @@ public class ParallelCompactionIterable extends 
AbstractCompactionIterable
                         else
                         {
                             logger.debug("parallel eager deserialize from " + 
iter.getPath());
-                            queue.put(new RowContainer(new Row(iter.getKey(), 
iter.getColumnFamilyWithColumns(TreeMapBackedSortedColumns.factory()))));
+                            queue.put(new RowContainer(new Row(iter.getKey(), 
iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory()))));
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a143966e/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index be4b20e..0de9f42 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -20,16 +20,21 @@ package org.apache.cassandra.db.compaction;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
-import com.google.common.base.Functions;
-
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.utils.HeapAllocator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MergeIterator;
 
 /**
  * PrecompactedRow merges its rows in its constructor in memory.
@@ -97,34 +102,61 @@ public class PrecompactedRow extends AbstractCompactedRow
     private static ColumnFamily merge(List<SSTableIdentityIterator> rows, 
CompactionController controller)
     {
         assert !rows.isEmpty();
-        ColumnFamily cf = null;
-        SecondaryIndexManager.Updater indexer = null;
+
+        final ColumnFamily returnCF = 
ColumnFamily.create(controller.cfs.metadata, 
ArrayBackedSortedColumns.factory());
+
+        // transform into iterators that MergeIterator will like, and apply 
row-level tombstones
+        List<CloseableIterator<IColumn>> data = new 
ArrayList<CloseableIterator<IColumn>>(rows.size());
         for (SSTableIdentityIterator row : rows)
         {
-            ColumnFamily thisCF;
             try
             {
-                // use a map for the first once since that will be the one we 
merge into
-                ISortedColumns.Factory factory = cf == null ? 
TreeMapBackedSortedColumns.factory() : ArrayBackedSortedColumns.factory();
-                thisCF = row.getColumnFamilyWithColumns(factory);
+                ColumnFamily cf = 
row.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory());
+                returnCF.delete(cf);
+                data.add(FBUtilities.closeableIterator(cf.iterator()));
             }
             catch (IOException e)
             {
-                throw new RuntimeException("Failed merge of rows on row with 
key: " + row.getKey(), e);
+                throw new RuntimeException(e);
             }
+        }
+
+        merge(returnCF, data, 
controller.cfs.indexManager.updaterFor(rows.get(0).getKey(), false));
+
+        return returnCF;
+    }
 
-            if (cf == null)
+    // returnCF should already have row-level tombstones applied
+    public static void merge(final ColumnFamily returnCF, 
List<CloseableIterator<IColumn>> data, final SecondaryIndexManager.Updater 
indexer)
+    {
+        IDiskAtomFilter filter = new IdentityQueryFilter();
+        Comparator<IColumn> fcomp = 
filter.getColumnComparator(returnCF.getComparator());
+
+        MergeIterator.Reducer<IColumn, IColumn> reducer = new 
MergeIterator.Reducer<IColumn, IColumn>()
+        {
+            ColumnFamily container = returnCF.cloneMeShallow();
+
+            public void reduce(IColumn column)
             {
-                cf = thisCF;
-                indexer = controller.cfs.indexManager.updaterFor(row.getKey(), 
false); // only init indexer once
+                container.addColumn(column);
+                if (indexer != SecondaryIndexManager.nullUpdater
+                    && !column.isMarkedForDelete()
+                    && container.getColumn(column.name()) != column)
+                {
+                    indexer.remove(column);
+                }
             }
-            else
+
+            protected IColumn getReduced()
             {
-                // addAll is ok even if cf is an ArrayBackedSortedColumns
-                cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, 
Functions.<IColumn>identity(), indexer);
+                IColumn c = container.iterator().next();
+                container.clear();
+                return c;
             }
-        }
-        return cf;
+        };
+
+        Iterator<IColumn> reduced = MergeIterator.get(data, fcomp, reducer);
+        filter.collectReducedColumns(returnCF, reduced, 
CompactionManager.NO_GC);
     }
 
     public long write(DataOutput out) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a143966e/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index ea9763a..9629017 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -153,6 +153,7 @@ public class CompactionsPurgeTest extends SchemaLoader
     @Test
     public void testMinTimestampPurge() throws IOException, 
ExecutionException, InterruptedException
     {
+        // verify that we don't drop tombstones during a minor compaction that 
might still be relevant
         CompactionManager.instance.disableAutoCompaction();
         Table table = Table.open(TABLE2);
         String cfName = "Standard1";
@@ -180,8 +181,10 @@ public class CompactionsPurgeTest extends SchemaLoader
         cfs.forceBlockingFlush();
         cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, 
Integer.MAX_VALUE).execute(null);
 
+        // we should have both the c1 and c2 tombstones still, since the c2 
timestamp is older than the c1 tombstone
+        // so it would be invalid to assume we can throw out the c1 entry.
         ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, new QueryPath(cfName)));
-        Assert.assertTrue(!cf.getColumn(ByteBufferUtil.bytes("c2")).isLive());
+        Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive());
         Assert.assertEquals(2, cf.getColumnCount());
     }
 

Reply via email to