Fix potential assertion error during compaction patch by slebresne; reviewed by krummas for CASSANDRA-10944
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d7bacc4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d7bacc4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d7bacc4 Branch: refs/heads/cassandra-3.3 Commit: 1d7bacc45fa1cd6cac36d7f9ece30ba1ed430f2a Parents: 5ad8114 Author: Sylvain Lebresne <[email protected]> Authored: Mon Dec 28 14:08:10 2015 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Tue Jan 5 15:15:28 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ReadCommand.java | 2 +- .../db/compaction/CompactionIterator.java | 6 +-- .../cassandra/db/partitions/PurgeFunction.java | 12 ++--- .../apache/cassandra/db/rows/BufferCell.java | 2 +- .../cassandra/db/compaction/TTLExpiryTest.java | 47 ++++++++++++++------ 6 files changed, 46 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d7bacc4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index beb59b0..23edbbf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.3 + * Fix potential assertion error during compaction (CASSANDRA-10944) * Fix counting of received sstables in streaming (CASSANDRA-10949) * Implement hints compression (CASSANDRA-9428) * Fix potential assertion error when reading static columns (CASSANDRA-10903) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d7bacc4/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 5ab1ee5..3f0695c 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -488,7 +488,7 @@ public abstract class ReadCommand implements ReadQuery { public WithoutPurgeableTombstones() { - super(isForThrift, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); } protected long getMaxPurgeableTimestamp() http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d7bacc4/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 8a3b24b..d39da2a 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -103,7 +103,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false) : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()); boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug - this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller)); + this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec)); } public boolean isForThrift() @@ -264,9 +264,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte private long compactedUnfiltered; - private Purger(boolean isForThrift, CompactionController controller) + private Purger(boolean isForThrift, CompactionController controller, int nowInSec) { - super(isForThrift, controller.gcBefore, controller.compactingRepaired() ? Integer.MIN_VALUE : Integer.MAX_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + super(isForThrift, nowInSec, controller.gcBefore, controller.compactingRepaired() ? Integer.MIN_VALUE : Integer.MAX_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); this.controller = controller; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d7bacc4/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java index b7b01d6..492bab1 100644 --- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java +++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java @@ -25,13 +25,13 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator { private final boolean isForThrift; private final DeletionPurger purger; - private final int gcBefore; + private final int nowInSec; private boolean isReverseOrder; - public PurgeFunction(boolean isForThrift, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones) + public PurgeFunction(boolean isForThrift, int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones) { this.isForThrift = isForThrift; - this.gcBefore = gcBefore; + this.nowInSec = nowInSec; this.purger = (timestamp, localDeletionTime) -> !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone) && localDeletionTime < gcBefore @@ -79,13 +79,13 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator public Row applyToStatic(Row row) { updateProgress(); - return row.purge(purger, gcBefore); + return row.purge(purger, nowInSec); } public Row applyToRow(Row row) { updateProgress(); - return row.purge(purger, gcBefore); + return row.purge(purger, nowInSec); } public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) @@ -117,4 +117,4 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d7bacc4/src/java/org/apache/cassandra/db/rows/BufferCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java index 4176ba6..8912f59 100644 --- a/src/java/org/apache/cassandra/db/rows/BufferCell.java +++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java @@ -172,7 +172,7 @@ public class BufferCell extends AbstractCell // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds, // we'll fulfil our responsibility to repair. See discussion at // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html - return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl); + return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl, path); } } return this; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d7bacc4/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java index 59bb697..7dd3da0 100644 --- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java @@ -18,32 +18,33 @@ */ package org.apache.cassandra.db.compaction; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.junit.BeforeClass; +import java.io.IOException; +import java.util.Collections; +import java.util.Set; + import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.BTreeRow; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.tools.SSTableExpiredBlockers; import org.apache.cassandra.utils.ByteBufferUtil; -import java.io.IOException; -import java.util.Collections; -import java.util.Set; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -67,6 +68,7 @@ public class TTLExpiryTest .addRegularColumn("col2", AsciiType.instance) .addRegularColumn("col3", AsciiType.instance) .addRegularColumn("col7", AsciiType.instance) + .addRegularColumn("col8", MapType.getInstance(AsciiType.instance, AsciiType.instance, true)) .addRegularColumn("shadow", AsciiType.instance) .build().gcGraceSeconds(0)); } @@ -141,9 +143,23 @@ public class TTLExpiryTest @Test public void testSimpleExpire() throws InterruptedException { + testSimpleExpire(false); + } + + @Test + public void testBug10944() throws InterruptedException + { + // Reproduction for CASSANDRA-10944 (at the time of the bug) + testSimpleExpire(true); + } + + public void testSimpleExpire(boolean force10944Bug) throws InterruptedException + { ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1"); + cfs.truncateBlocking(); cfs.disableAutoCompaction(); - cfs.metadata.gcGraceSeconds(0); + // To reproduce #10944, we need our gcBefore to be equal to the locaDeletionTime. A gcGrace of 1 will (almost always) give us that. + cfs.metadata.gcGraceSeconds(force10944Bug ? 1 : 0); long timestamp = System.currentTimeMillis(); String key = "ttl"; new RowUpdateBuilder(cfs.metadata, timestamp, 1, key) @@ -156,12 +172,16 @@ public class TTLExpiryTest new RowUpdateBuilder(cfs.metadata, timestamp, 1, key) .add("col2", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .addMapEntry("col8", "bar", "foo") + .delete("col1") .build() .applyUnsafe(); cfs.forceBlockingFlush(); - new RowUpdateBuilder(cfs.metadata, timestamp, 1, key) + // To reproduce #10944, we need to avoid the optimization that get rid of full sstable because everything + // is known to be gcAble, so keep some data non-expiring in that case. + new RowUpdateBuilder(cfs.metadata, timestamp, force10944Bug ? 0 : 1, key) .add("col3", ByteBufferUtil.EMPTY_BYTE_BUFFER) .build() .applyUnsafe(); @@ -178,13 +198,14 @@ public class TTLExpiryTest Thread.sleep(2000); // wait for ttl to expire assertEquals(4, cfs.getLiveSSTables().size()); cfs.enableAutoCompaction(true); - assertEquals(0, cfs.getLiveSSTables().size()); + assertEquals(force10944Bug ? 1 : 0, cfs.getLiveSSTables().size()); } @Test public void testNoExpire() throws InterruptedException, IOException { ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1"); + cfs.truncateBlocking(); cfs.disableAutoCompaction(); cfs.metadata.gcGraceSeconds(0); long timestamp = System.currentTimeMillis();
