Repository: cassandra Updated Branches: refs/heads/trunk 01880a470 -> f85df3741
Backport CASSANDRA-8243 to 2.0 - Do more aggressive ttl expiration checks to be able to drop more sstables Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/13af5978 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/13af5978 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/13af5978 Branch: refs/heads/trunk Commit: 13af5978225e1ac511fc00a763a8919a2d638dfa Parents: cefaa4e Author: Marcus Eriksson <[email protected]> Authored: Thu Apr 30 12:51:44 2015 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Thu Apr 30 12:58:02 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../db/compaction/CompactionController.java | 19 ++++--- .../cassandra/db/compaction/TTLExpiryTest.java | 54 ++++++++++++++++++-- 3 files changed, 62 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/13af5978/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3df91ce..22a9515 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.0.15: + * Do more agressive ttl expiration checks to be able to + drop more sstables (CASSANDRA-8243) * IncomingTcpConnection thread is not named (CASSANDRA-9262) * Close incoming connections when MessagingService is stopped (CASSANDRA-9238) * Avoid overflow when calculating max sstable size in LCS (CASSANDRA-9235) http://git-wip-us.apache.org/repos/asf/cassandra/blob/13af5978/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index fba659d..7a4b7d9 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -78,12 +78,11 @@ public class CompactionController * Finds expired sstables * * works something like this; - * 1. find "global" minTimestamp of overlapping sstables (excluding the possibly droppable ones) - * 2. build a list of candidates to be dropped - * 3. sort the candidate list, biggest maxTimestamp first in list - * 4. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp) and it is included in the compaction - * - if not droppable, update global minTimestamp and remove from candidates - * 5. return candidates. + * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data + * 2. build a list of fully expired candidates + * 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp) + * - if not droppable, remove from candidates + * 4. return candidates. * * @param cfStore * @param compacting we take the drop-candidates from this set, it is usually the sstables included in the compaction @@ -113,10 +112,11 @@ public class CompactionController minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp()); } - // we still need to keep candidates that might shadow something in a - // non-candidate sstable. And if we remove a sstable from the candidates, we - // must take it's timestamp into account (hence the sorting below). Collections.sort(candidates, SSTable.maxTimestampComparator); + // At this point, minTimestamp denotes the lowest timestamp of any relevant + // SSTable that contains a constructive value. candidates contains all the + // candidates with no constructive values. The ones out of these that have + // (getMaxTimestamp() < minTimestamp) serve no purpose anymore. Iterator<SSTableReader> iterator = candidates.iterator(); while (iterator.hasNext()) @@ -124,7 +124,6 @@ public class CompactionController SSTableReader candidate = iterator.next(); if (candidate.getMaxTimestamp() >= minTimestamp) { - minTimestamp = Math.min(candidate.getMinTimestamp(), minTimestamp); iterator.remove(); } else http://git-wip-us.apache.org/repos/asf/cassandra/blob/13af5978/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java index 7666922..3fad0ec 100644 --- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java @@ -22,7 +22,11 @@ package org.apache.cassandra.db.compaction; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.ExecutionException; + +import com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; @@ -34,10 +38,7 @@ import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowMutation; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.utils.ByteBufferUtil; @@ -149,4 +150,51 @@ public class TTLExpiryTest extends SchemaLoader assertEquals(noTTLKey, iter.getKey()); } } + + @Test + public void testAggressiveFullyExpired() + { + String KEYSPACE1 = "Keyspace1"; + ColumnFamilyStore cfs = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard1"); + cfs.disableAutoCompaction(); + cfs.metadata.gcGraceSeconds(0); + + DecoratedKey ttlKey = Util.dk("ttl"); + RowMutation rm = new RowMutation("Keyspace1", ttlKey.key); + rm.add("Standard1", ByteBufferUtil.bytes("col1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 1, 1); + rm.add("Standard1", ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 3, 1); + rm.applyUnsafe(); + cfs.forceBlockingFlush(); + + rm = new RowMutation(KEYSPACE1, ttlKey.key); + rm.add("Standard1", ByteBufferUtil.bytes("col1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2, 1); + rm.add("Standard1", ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 5, 1); + rm.applyUnsafe(); + cfs.forceBlockingFlush(); + + rm = new RowMutation(KEYSPACE1, ttlKey.key); + rm.add("Standard1", ByteBufferUtil.bytes("col1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 4, 1); + rm.add("Standard1", ByteBufferUtil.bytes("shadow"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 7, 1); + rm.applyUnsafe(); + cfs.forceBlockingFlush(); + + rm = new RowMutation(KEYSPACE1, ttlKey.key); + rm.add("Standard1", ByteBufferUtil.bytes("shadow"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 6, 3); + rm.add("Standard1", ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8, 1); + rm.applyUnsafe(); + cfs.forceBlockingFlush(); + + Set<SSTableReader> sstables = Sets.newHashSet(cfs.getSSTables()); + int now = (int)(System.currentTimeMillis() / 1000); + int gcBefore = now + 2; + Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables( + cfs, + sstables, + Collections.EMPTY_SET, + gcBefore); + assertEquals(2, expired.size()); + + cfs.clearUnsafe(); + } + }
