Repository: cassandra
Updated Branches:
  refs/heads/trunk 7f0a8542d -> b09e39295


More aggressive check for expired sstables in DTCS

Patch by Björn Hegerfors; reviewed by marcuse for CASSANDRA-8359


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d0c4e78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d0c4e78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d0c4e78

Branch: refs/heads/trunk
Commit: 3d0c4e78c6bb3f8767aa0720b7de579908e2bf59
Parents: 74bfa77
Author: Björn Hegerfors <bj...@spotify.com>
Authored: Wed Apr 1 12:26:00 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Apr 1 12:32:09 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../DateTieredCompactionStrategy.java           | 31 ++++++++++----
 .../DateTieredCompactionStrategyTest.java       | 43 ++++++++++++++++++++
 3 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d0c4e78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a956eb6..1afe6fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.14:
+ * More aggressive check for expired sstables in DTCS (CASSANDRA-8359)
  * Don't set clientMode to true when bulk-loading sstables to avoid
    a NullPointerException (CASSANDRA-8238)
  * Fix ignored index_interval change in ALTER TABLE statements (CASSANDRA-7976)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d0c4e78/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 6b3e800..cfa9c8a 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -53,7 +53,7 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
 
         while (true)
         {
-            List<SSTableReader> latestBucket = 
getNextBackgroundSStables(gcBefore);
+            List<SSTableReader> latestBucket = 
getNextBackgroundSSTables(gcBefore);
 
             if (latestBucket.isEmpty())
                 return null;
@@ -68,24 +68,39 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
      * @param gcBefore
      * @return
      */
-    private List<SSTableReader> getNextBackgroundSStables(final int gcBefore)
+    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
         if (!isEnabled() || cfs.getSSTables().isEmpty())
             return Collections.emptyList();
 
+        Set<SSTableReader> uncompacting = cfs.getUncompactingSSTables();
+
+        // Find fully expired SSTables. Those will be included no matter what.
+        Set<SSTableReader> expired = 
CompactionController.getFullyExpiredSSTables(cfs, uncompacting, 
cfs.getOverlappingSSTables(uncompacting), gcBefore);
+        Set<SSTableReader> candidates = 
Sets.newHashSet(filterSuspectSSTables(uncompacting));
+
+        List<SSTableReader> compactionCandidates = new 
ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), 
gcBefore));
+        if (!expired.isEmpty())
+        {
+            logger.debug("Including expired sstables: {}", expired);
+            compactionCandidates.addAll(expired);
+        }
+        return compactionCandidates;
+    }
+
+    private List<SSTableReader> 
getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final 
int gcBefore)
+    {
         int base = cfs.getMinimumCompactionThreshold();
         long now = getNow();
 
-        Iterable<SSTableReader> candidates = 
filterSuspectSSTables(cfs.getUncompactingSSTables());
-
-        List<SSTableReader> mostInteresting = 
getCompactionCandidates(candidates, now, base);
+        List<SSTableReader> mostInteresting = 
getCompactionCandidates(nonExpiringSSTables, now, base);
         if (mostInteresting != null)
             return mostInteresting;
 
         // if there is no sstable to compact in standard way, try compacting 
single sstable whose droppable tombstone
         // ratio is greater than threshold.
         List<SSTableReader> sstablesWithTombstones = Lists.newArrayList();
-        for (SSTableReader sstable : candidates)
+        for (SSTableReader sstable : nonExpiringSSTables)
         {
             if (worthDroppingTombstones(sstable, gcBefore))
                 sstablesWithTombstones.add(sstable);
@@ -106,8 +121,8 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
         List<SSTableReader> mostInteresting = newestBucket(buckets,
                                                            
cfs.getMinimumCompactionThreshold(),
                                                            
cfs.getMaximumCompactionThreshold(),
-                                                           options.baseTime,
-                                                           now);
+                                                           now,
+                                                           options.baseTime);
         if (!mostInteresting.isEmpty())
             return mostInteresting;
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d0c4e78/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index f98e372..1fa41a3 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@ -273,4 +273,47 @@ public class DateTieredCompactionStrategyTest extends 
SchemaLoader
         filtered = filterOldSSTables(sstrs, 1, 4);
         assertEquals("no sstables should remain when all are too old", 0, 
Iterables.size(filtered));
     }
+
+
+    @Test
+    public void testDropExpiredSSTables() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 2 sstables
+        DecoratedKey key = Util.dk(String.valueOf("expired"));
+        RowMutation rm = new RowMutation(KEYSPACE1, key.key);
+        rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, 
System.currentTimeMillis(), 5);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        SSTableReader expiredSSTable = cfs.getSSTables().iterator().next();
+        Thread.sleep(10);
+        key = Util.dk(String.valueOf("nonexpired"));
+        rm = new RowMutation(KEYSPACE1, key.key);
+        rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, 
System.currentTimeMillis());
+        rm.apply();
+        cfs.forceBlockingFlush();
+        assertEquals(cfs.getSSTables().size(), 2);
+
+        Map<String, String> options = new HashMap<>();
+
+        options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "30");
+        
options.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, 
"MILLISECONDS");
+        options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, 
Double.toString((1d / (24 * 60 * 60))));
+        DateTieredCompactionStrategy dtcs = new 
DateTieredCompactionStrategy(cfs, options);
+        dtcs.startup();
+        assertNull(dtcs.getNextBackgroundTask((int) 
(System.currentTimeMillis() / 1000)));
+        Thread.sleep(7000);
+        AbstractCompactionTask t = dtcs.getNextBackgroundTask((int) 
(System.currentTimeMillis()/1000));
+        assertNotNull(t);
+        assertEquals(1, Iterables.size(t.sstables));
+        SSTableReader sstable = t.sstables.iterator().next();
+        assertEquals(sstable, expiredSSTable);
+    }
+
 }

Reply via email to