avoid duplicate index entries in PrecompactedRow and ParallelCompactionIterable patch by jbellis; reviewed by Sam Tunnicliffe for CASSANDRA-5395
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/91907513 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/91907513 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/91907513 Branch: refs/heads/trunk Commit: 91907513cba4493f37f8aa86a7c14578d8bb065e Parents: 8ba462a Author: Jonathan Ellis <[email protected]> Authored: Wed Mar 27 16:17:53 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Fri Mar 29 12:07:08 2013 -0500 ---------------------------------------------------------------------- .../db/compaction/LazilyCompactedRow.java | 6 +- .../db/compaction/ParallelCompactionIterable.java | 27 +++---- .../cassandra/db/compaction/PrecompactedRow.java | 68 +++++++++++---- .../db/compaction/CompactionsPurgeTest.java | 5 +- 4 files changed, 70 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/91907513/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 244c897..8f735d7 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -256,8 +256,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable { Column column = (Column) current; container.addColumn(column); - if (container.getColumn(column.name()) != column) + if (indexer != SecondaryIndexManager.nullUpdater + && !column.isMarkedForDelete() + && container.getColumn(column.name()) != column) + { indexer.remove(column); + } } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91907513/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index 58227f6..f2faebd 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.concurrent.*; import java.util.concurrent.locks.Condition; -import com.google.common.base.Functions; import com.google.common.collect.AbstractIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +34,6 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.ICountableColumnIterator; -import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.utils.*; @@ -184,6 +182,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable executor.shutdown(); } + /** + * Merges a set of in-memory rows + */ private class MergeTask implements Callable<ColumnFamily> { private final List<Row> rows; @@ -195,23 +196,17 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable public ColumnFamily call() throws Exception { - ColumnFamily cf = null; + final ColumnFamily returnCF = ColumnFamily.create(controller.cfs.metadata, ArrayBackedSortedColumns.factory()); + + List<CloseableIterator<Column>> data = new ArrayList<CloseableIterator<Column>>(rows.size()); for (Row row : rows) { - ColumnFamily thisCF = row.cf; - if (cf == null) - { - cf = thisCF; - } - else - { - // addAll is ok 
even if cf is an ArrayBackedSortedColumns - SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.updaterFor(row.key, false); - cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<Column>identity(), indexer); - } + returnCF.delete(row.cf); + data.add(FBUtilities.closeableIterator(row.cf.iterator())); } - return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, cf); + PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key, false)); + return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF); } } @@ -300,7 +295,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable else { logger.debug("parallel eager deserialize from " + iter.getPath()); - queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(TreeMapBackedSortedColumns.factory())))); + queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory())))); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/91907513/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java index f90b8c6..94509d1 100644 --- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java @@ -20,16 +20,21 @@ package org.apache.cassandra.db.compaction; import java.io.DataOutput; import java.io.IOException; import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; -import com.google.common.base.Functions; - import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import 
org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.ColumnStats; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.utils.HeapAllocator; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MergeIterator; /** * PrecompactedRow merges its rows in its constructor in memory. @@ -97,34 +102,61 @@ public class PrecompactedRow extends AbstractCompactedRow private static ColumnFamily merge(List<SSTableIdentityIterator> rows, CompactionController controller) { assert !rows.isEmpty(); - ColumnFamily cf = null; - SecondaryIndexManager.Updater indexer = null; + + final ColumnFamily returnCF = ColumnFamily.create(controller.cfs.metadata, ArrayBackedSortedColumns.factory()); + + // transform into iterators that MergeIterator will like, and apply row-level tombstones + List<CloseableIterator<Column>> data = new ArrayList<CloseableIterator<Column>>(rows.size()); for (SSTableIdentityIterator row : rows) { - ColumnFamily thisCF; try { - // use a map for the first once since that will be the one we merge into - ISortedColumns.Factory factory = cf == null ? 
TreeMapBackedSortedColumns.factory() : ArrayBackedSortedColumns.factory(); - thisCF = row.getColumnFamilyWithColumns(factory); + ColumnFamily cf = row.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory()); + returnCF.delete(cf); + data.add(FBUtilities.closeableIterator(cf.iterator())); } catch (IOException e) { - throw new RuntimeException("Failed merge of rows on row with key: " + row.getKey(), e); + throw new RuntimeException(e); } + } + + merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey(), false)); + + return returnCF; + } - if (cf == null) + // returnCF should already have row-level tombstones applied + public static void merge(final ColumnFamily returnCF, List<CloseableIterator<Column>> data, final SecondaryIndexManager.Updater indexer) + { + IDiskAtomFilter filter = new IdentityQueryFilter(); + Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator()); + + MergeIterator.Reducer<Column, Column> reducer = new MergeIterator.Reducer<Column, Column>() + { + ColumnFamily container = returnCF.cloneMeShallow(); + + public void reduce(Column column) { - cf = thisCF; - indexer = controller.cfs.indexManager.updaterFor(row.getKey(), false); // only init indexer once + container.addColumn(column); + if (indexer != SecondaryIndexManager.nullUpdater + && !column.isMarkedForDelete() + && container.getColumn(column.name()) != column) + { + indexer.remove(column); + } } - else + + protected Column getReduced() { - // addAll is ok even if cf is an ArrayBackedSortedColumns - cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<Column>identity(), indexer); + Column c = container.iterator().next(); + container.clear(); + return c; } - } - return cf; + }; + + Iterator<Column> reduced = MergeIterator.get(data, fcomp, reducer); + filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC); } public long write(DataOutput out) throws IOException 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91907513/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index 9344ec3..3329300 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -151,6 +151,7 @@ public class CompactionsPurgeTest extends SchemaLoader @Test public void testMinTimestampPurge() throws IOException, ExecutionException, InterruptedException { + // verify that we don't drop tombstones during a minor compaction that might still be relevant CompactionManager.instance.disableAutoCompaction(); Table table = Table.open(TABLE2); String cfName = "Standard1"; @@ -178,8 +179,10 @@ public class CompactionsPurgeTest extends SchemaLoader cfs.forceBlockingFlush(); cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); + // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone + // so it would be invalid to assume we can throw out the c1 entry. ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName)); - Assert.assertTrue(!cf.getColumn(ByteBufferUtil.bytes("c2")).isLive()); + Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive()); Assert.assertEquals(2, cf.getColumnCount()); }
