Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 40cef770d -> 5a356a706


Do size tiered compaction in date tiered compaction windows

Patch by marcuse; reviewed by Jeff Jirsa for CASSANDRA-10276


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cedcf07c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cedcf07c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cedcf07c

Branch: refs/heads/cassandra-2.2
Commit: cedcf07c542235815c023b66f151ad8c7aa9ba9a
Parents: 78810f2
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Sep 7 10:39:15 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Oct 28 08:40:12 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../DateTieredCompactionStrategy.java           | 46 ++++++++++------
 .../DateTieredCompactionStrategyTest.java       | 57 ++++++++++++++------
 3 files changed, 72 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cedcf07c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ca3b43..5b46eac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Do STCS in DTCS windows (CASSANDRA-10276)
  * Don't try to get ancestors from half-renamed sstables (CASSANDRA-10501)
  * Avoid repetition of JVM_OPTS in debian package (CASSANDRA-10251)
  * Fix potential NPE from handling result of SIM.highestSelectivityIndex 
(CASSANDRA-10550)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cedcf07c/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index a8e2aff..ece596f 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
@@ -40,6 +39,7 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
     protected volatile int estimatedRemainingTasks;
     private final Set<SSTableReader> sstables = new HashSet<>();
     private long lastExpiredCheck;
+    private final SizeTieredCompactionStrategyOptions stcsOptions;
 
     public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, 
String> options)
     {
@@ -54,6 +54,7 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
         else
             logger.debug("Enabling tombstone compactions for DTCS");
 
+        this.stcsOptions = new SizeTieredCompactionStrategyOptions(options);
     }
 
     @Override
@@ -137,7 +138,8 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
                                                            
cfs.getMinimumCompactionThreshold(),
                                                            
cfs.getMaximumCompactionThreshold(),
                                                            now,
-                                                           options.baseTime);
+                                                           options.baseTime,
+                                                           stcsOptions);
         if (!mostInteresting.isEmpty())
             return mostInteresting;
         return null;
@@ -328,7 +330,7 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
         for (List<SSTableReader> bucket : tasks)
         {
             if (bucket.size() >= cfs.getMinimumCompactionThreshold())
-                n += Math.ceil((double)bucket.size() / 
cfs.getMaximumCompactionThreshold());
+                n += getSTCSBuckets(bucket, stcsOptions).size();
         }
         estimatedRemainingTasks = n;
     }
@@ -341,7 +343,7 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
      * @return a bucket (list) of sstables to compact.
      */
     @VisibleForTesting
-    static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets, 
int minThreshold, int maxThreshold, long now, long baseTime)
+    static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets, 
int minThreshold, int maxThreshold, long now, long baseTime, 
SizeTieredCompactionStrategyOptions stcsOptions)
     {
         // If the "incoming window" has at least minThreshold SSTables, choose 
that one.
         // For any other bucket, at least 2 SSTables is enough.
@@ -349,23 +351,31 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
         Target incomingWindow = getInitialTarget(now, baseTime);
         for (List<SSTableReader> bucket : buckets)
         {
-            if (bucket.size() >= minThreshold ||
-                    (bucket.size() >= 2 && 
!incomingWindow.onTarget(bucket.get(0).getMinTimestamp())))
-                return trimToThreshold(bucket, maxThreshold);
+            boolean inFirstWindow = 
incomingWindow.onTarget(bucket.get(0).getMinTimestamp());
+            if (bucket.size() >= minThreshold || (bucket.size() >= 2 && 
!inFirstWindow))
+            {
+                List<SSTableReader> stcsSSTables = getSSTablesForSTCS(bucket, 
inFirstWindow ? minThreshold : 2, maxThreshold, stcsOptions);
+                if (!stcsSSTables.isEmpty())
+                    return stcsSSTables;
+            }
         }
         return Collections.emptyList();
     }
 
-    /**
-     * @param bucket list of sstables, ordered from newest to oldest by 
getMinTimestamp().
-     * @param maxThreshold maximum number of sstables in a single compaction 
task.
-     * @return A bucket trimmed to the <code>maxThreshold</code> newest 
sstables.
-     */
-    @VisibleForTesting
-    static List<SSTableReader> trimToThreshold(List<SSTableReader> bucket, int 
maxThreshold)
+    private static List<SSTableReader> 
getSSTablesForSTCS(Collection<SSTableReader> sstables, int minThreshold, int 
maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions)
     {
-        // Trim the oldest sstables off the end to meet the maxThreshold
-        return bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+        List<SSTableReader> s = 
SizeTieredCompactionStrategy.mostInterestingBucket(getSTCSBuckets(sstables, 
stcsOptions), minThreshold, maxThreshold);
+        logger.debug("Got sstables {} for STCS from {}", s, sstables);
+        return s;
+    }
+
+    private static List<List<SSTableReader>> 
getSTCSBuckets(Collection<SSTableReader> sstables, 
SizeTieredCompactionStrategyOptions stcsOptions)
+    {
+        List<Pair<SSTableReader,Long>> pairs = 
SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(sstables));
+        return SizeTieredCompactionStrategy.getBuckets(pairs,
+                                                       stcsOptions.bucketHigh,
+                                                       stcsOptions.bucketLow,
+                                                       
stcsOptions.minSSTableSize);
     }
 
     @Override
@@ -375,7 +385,7 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
         if (sstables == null)
             return null;
 
-        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, 
sstables, gcBefore, false));
+        return Collections.<AbstractCompactionTask>singleton(new 
CompactionTask(cfs, sstables, gcBefore, false));
     }
 
     @Override
@@ -411,6 +421,8 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
         uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD);
         uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD);
 
+        uncheckedOptions = 
SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
         return uncheckedOptions;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cedcf07c/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index f05bf44..368101d 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.utils.Pair;
 
 import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.getBuckets;
 import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.newestBucket;
-import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.trimToThreshold;
 import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.filterOldSSTables;
 import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.validateOptions;
 
@@ -213,27 +212,15 @@ public class DateTieredCompactionStrategyTest extends 
SchemaLoader
 
         List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
 
-        List<SSTableReader> newBucket = 
newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 9, 10);
+        List<SSTableReader> newBucket = 
newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 9, 10, new 
SizeTieredCompactionStrategyOptions());
         assertTrue("incoming bucket should not be accepted when it has below 
the min threshold SSTables", newBucket.isEmpty());
 
-        newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 
2)), 4, 32, 10, 10);
+        newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 
2)), 4, 32, 10, 10, new SizeTieredCompactionStrategyOptions());
         assertFalse("non-incoming bucket should be accepted when it has at 
least 2 SSTables", newBucket.isEmpty());
 
         assertEquals("an sstable with a single value should have equal min/max 
timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp());
         assertEquals("an sstable with a single value should have equal min/max 
timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp());
         assertEquals("an sstable with a single value should have equal min/max 
timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp());
-
-        // if we have more than the max threshold, the oldest should be dropped
-        Collections.sort(sstrs, Collections.reverseOrder(new 
Comparator<SSTableReader>() {
-            public int compare(SSTableReader o1, SSTableReader o2) {
-                return Long.compare(o1.getMinTimestamp(), 
o2.getMinTimestamp()) ;
-            }
-        }));
-
-        List<SSTableReader> bucket = trimToThreshold(sstrs, 2);
-        assertEquals("one bucket should have been dropped", 2, bucket.size());
-        for (SSTableReader sstr : bucket)
-            assertFalse("the oldest sstable should be dropped", 
sstr.getMinTimestamp() == 0);
     }
 
     @Test
@@ -320,4 +307,44 @@ public class DateTieredCompactionStrategyTest extends 
SchemaLoader
         cfs.getDataTracker().unmarkCompacting(cfs.getSSTables());
     }
 
+    @Test
+    public void testSTCSBigWindow()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        ByteBuffer bigValue = ByteBuffer.wrap(new byte[10000]);
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+        int numSSTables = 40;
+        // create big sstables out of half:
+        long timestamp = System.currentTimeMillis();
+        for (int r = 0; r < numSSTables / 2; r++)
+        {
+            for (int i = 0; i < 10; i++)
+            {
+                DecoratedKey key = Util.dk(String.valueOf(r));
+                Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+                rm.add(CF_STANDARD1, Util.cellname("column"), bigValue, 
timestamp);
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+        // and small ones:
+        for (int r = 0; r < numSSTables / 2; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+            rm.add(CF_STANDARD1, Util.cellname("column"), value, timestamp);
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+        Map<String, String> options = new HashMap<>();
+        options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, 
"1");
+        DateTieredCompactionStrategy dtcs = new 
DateTieredCompactionStrategy(cfs, options);
+        for (SSTableReader sstable : cfs.getSSTables())
+            dtcs.addSSTable(sstable);
+        assertEquals(20, dtcs.getNextBackgroundTask(0).sstables.size());
+    }
+
 }

Reply via email to