Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        NEWS.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1dea9839
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1dea9839
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1dea9839

Branch: refs/heads/trunk
Commit: 1dea98396266a41f9bf64ade05dcf02f12f301d2
Parents: 1ef7d05 77dae50
Author: Marcus Eriksson <[email protected]>
Authored: Thu Oct 16 18:32:16 2014 +0200
Committer: Marcus Eriksson <[email protected]>
Committed: Thu Oct 16 18:33:24 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   7 +
 pylib/cqlshlib/cql3handling.py                  |   4 +
 pylib/cqlshlib/cqlhandling.py                   |   3 +-
 .../DateTieredCompactionStrategy.java           | 399 +++++++++++++++++++
 .../DateTieredCompactionStrategyOptions.java    | 100 +++++
 .../DateTieredCompactionStrategyTest.java       | 242 +++++++++++
 7 files changed, 755 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 74fe652,102a87b..50b9c7e
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,82 -13,13 +13,89 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 +2.1.1
 +=====
 +
 +New features
 +------------
 +   - Netty support for epoll on linux is now enabled.  If for some
 +     reason you want to disable it pass, the following system property
 +     -Dcassandra.native.epoll.enabled=false
 +
 +2.1
 +===
 +
 +New features
 +------------
 +   - Default data and log locations have changed.  If not set in
 +     cassandra.yaml, the data file directory, commitlog directory,
 +     and saved caches directory will default to $CASSANDRA_HOME/data/data,
 +     $CASSANDRA_HOME/data/commitlog, and $CASSANDRA_HOME/data/saved_caches,
 +     respectively.  The log directory now defaults to $CASSANDRA_HOME/logs.
 +     If not set, $CASSANDRA_HOME, defaults to the top-level directory of
 +     the installation.
 +     Note that this should only affect source checkouts and tarballs.
 +     Deb and RPM packages will continue to use /var/lib/cassandra and
 +     /var/log/cassandra in cassandra.yaml.
 +   - SSTable data directory name is slightly changed. Each directory will
 +     have hex string appended after CF name, e.g.
 +         ks/cf-5be396077b811e3a3ab9dc4b9ac088d/
 +     This hex string part represents unique ColumnFamily ID.
 +     Note that existing directories are used as is, so only newly created
 +     directories after upgrade have new directory name format.
 +   - Saved key cache files also have ColumnFamily ID in their file name.
 +   - It is now possible to do incremental repairs, sstables that have been
 +     repaired are marked with a timestamp and not included in the next
 +     repair session. Use nodetool repair -par -inc to use this feature.
 +     A tool to manually mark/unmark sstables as repaired is available in
 +     tools/bin/sstablerepairedset. This is particularly important when
 +     using LCS, or any data not repaired in your first incremental repair
 +     will be put back in L0.
 +   - Bootstrapping now ensures that range movements are consistent,
 +     meaning the data for the new node is taken from the node that is no 
 +     longer a responsible for that range of keys.
 +     If you want the old behavior (due to a lost node perhaps)
 +     you can set the following property 
(-Dcassandra.consistent.rangemovement=false)
 +   - It is now possible to use quoted identifiers in triggers' names. 
 +     WARNING: if you previously used triggers with capital letters in their 
 +     names, then you must quote them from now on.
 +   - Improved stress tool (http://goo.gl/OTNqiQ)
 +   - New incremental repair option (http://goo.gl/MjohJp, 
http://goo.gl/f8jSme)
 +   - Incremental replacement of compacted SSTables (http://goo.gl/JfDBGW)
 +   - The row cache can now cache only the head of partitions 
(http://goo.gl/6TJPH6)
 +   - Off-heap memtables (http://goo.gl/YT7znJ)
 +   - CQL improvements and additions: User-defined types, tuple types, 2ndary
 +     indexing of collections, ... (http://goo.gl/kQl7GW)
 +
 +Upgrading
 +---------
 +   - Rolling upgrades from anything pre-2.0.7 is not supported. Furthermore
 +     pre-2.0 sstables are not supported. This means that before upgrading
 +     a node on 2.1, this node must be started on 2.0 and
 +     'nodetool upgdradesstables' must be run (and this even in the case
 +     of not-rolling upgrades).
 +   - For size-tiered compaction users, Cassandra now defaults to ignoring
 +     the coldest 5% of sstables.  This can be customized with the
 +     cold_reads_to_omit compaction option; 0.0 omits nothing (the old
 +     behavior) and 1.0 omits everything.
 +   - Multithreaded compaction has been removed.
 +   - Counters implementation has been changed, replaced by a safer one with
 +     less caveats, but different performance characteristics. You might have
 +     to change your data model to accomodate the new implementation.
 +     (See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the
 +     blog post at http://goo.gl/qj8iQl for details).
 +   - (per-table) index_interval parameter has been replaced with
 +     min_index_interval and max_index_interval paratemeters. index_interval
 +     has been deprecated.
 +   - support for supercolumns has been removed from json2sstable
 +
+ 2.0.11
+ ======
+ New features
+ ------------
+     - DateTieredCompactionStrategy added, optimized for time series data and 
groups
+       data that is written closely in time (CASSANDRA-6602 for details). 
Consider
+       this experimental for now.
  
  2.0.10
  ======

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 0000000,9c708db..8c997ed
mode 000000,100644..100644
--- 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -1,0 -1,374 +1,399 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.util.*;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Predicate;
+ import com.google.common.collect.*;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.cql3.statements.CFPropDefs;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.SSTableReader;
+ import org.apache.cassandra.utils.Pair;
+ 
+ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
+ {
+     private static final Logger logger = 
LoggerFactory.getLogger(DateTieredCompactionStrategy.class);
+ 
+     protected DateTieredCompactionStrategyOptions options;
+     protected volatile int estimatedRemainingTasks;
+ 
+     public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, 
String> options)
+     {
+         super(cfs, options);
+         this.estimatedRemainingTasks = 0;
+         this.options = new DateTieredCompactionStrategyOptions(options);
+     }
+ 
+     @Override
+     public synchronized AbstractCompactionTask getNextBackgroundTask(int 
gcBefore)
+     {
+         if (!isEnabled())
+             return null;
+ 
+         while (true)
+         {
+             List<SSTableReader> latestBucket = 
getNextBackgroundSStables(gcBefore);
+ 
+             if (latestBucket.isEmpty())
+                 return null;
+ 
+             if (cfs.getDataTracker().markCompacting(latestBucket))
 -                return new CompactionTask(cfs, latestBucket, gcBefore);
++                return new CompactionTask(cfs, latestBucket, gcBefore, false);
+         }
+     }
+ 
+     /**
+      *
+      * @param gcBefore
+      * @return
+      */
+     private List<SSTableReader> getNextBackgroundSStables(final int gcBefore)
+     {
+         if (!isEnabled() || cfs.getSSTables().isEmpty())
+             return Collections.emptyList();
+ 
+         int base = cfs.getMinimumCompactionThreshold();
+         long now = getNow();
 -
+         Iterable<SSTableReader> candidates = 
filterSuspectSSTables(cfs.getUncompactingSSTables());
+ 
 -        List<SSTableReader> mostInteresting = 
getCompactionCandidates(candidates, now, base);
 -        if (mostInteresting != null)
 -            return mostInteresting;
++        Set<SSTableReader> repairedCandidates = new HashSet<>();
++        Set<SSTableReader> unRepairedCandidates = new HashSet<>();
++        for (SSTableReader sstable : candidates)
++        {
++            if (sstable.isRepaired())
++            {
++                repairedCandidates.add(sstable);
++            }
++            else
++            {
++                unRepairedCandidates.add(sstable);
++            }
++        }
++
++
++        List<SSTableReader> mostInterestingRepaired = 
getCompactionCandidates(repairedCandidates, now, base);
++        List<SSTableReader> mostInterestingUnrepaired = 
getCompactionCandidates(unRepairedCandidates, now, base);
++        if (mostInterestingRepaired != null && mostInterestingUnrepaired != 
null)
++        {
++            return mostInterestingRepaired.size() > 
mostInterestingUnrepaired.size() ? mostInterestingRepaired : 
mostInterestingUnrepaired;
++        }
++        else if (mostInterestingRepaired != null)
++        {
++            return mostInterestingRepaired;
++        }
++        else if (mostInterestingUnrepaired != null)
++        {
++            return mostInterestingUnrepaired;
++        }
+ 
+         // if there is no sstable to compact in standard way, try compacting 
single sstable whose droppable tombstone
+         // ratio is greater than threshold.
+         List<SSTableReader> sstablesWithTombstones = Lists.newArrayList();
+         for (SSTableReader sstable : candidates)
+         {
+             if (worthDroppingTombstones(sstable, gcBefore))
+                 sstablesWithTombstones.add(sstable);
+         }
+         if (sstablesWithTombstones.isEmpty())
+             return Collections.emptyList();
+ 
+         return 
Collections.singletonList(Collections.min(sstablesWithTombstones, new 
SSTableReader.SizeComparator()));
+     }
+ 
+     private List<SSTableReader> 
getCompactionCandidates(Iterable<SSTableReader> candidateSSTables, long now, 
int base)
+     {
+         Iterable<SSTableReader> candidates = 
filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, 
now);
+ 
+         List<List<SSTableReader>> buckets = 
getBuckets(createSSTableAndMinTimestampPairs(candidates), options.baseTime, 
base, now);
+         logger.debug("Compaction buckets are {}", buckets);
+         updateEstimatedCompactionsByTasks(buckets);
+         List<SSTableReader> mostInteresting = newestBucket(buckets, 
cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold());
+         if (!mostInteresting.isEmpty())
+             return mostInteresting;
+         return null;
+     }
+ 
+     /**
+      * Gets the timestamp that DateTieredCompactionStrategy considers to be 
the "current time".
+      * @return the maximum timestamp across all SSTables.
+      * @throws java.util.NoSuchElementException if there are no SSTables.
+      */
+     private long getNow()
+     {
+         return Collections.max(cfs.getSSTables(), new 
Comparator<SSTableReader>()
+         {
+             public int compare(SSTableReader o1, SSTableReader o2)
+             {
+                 return Long.compare(o1.getMaxTimestamp(), 
o2.getMaxTimestamp());
+             }
+         }).getMaxTimestamp();
+     }
+ 
+     /**
+      * Removes all sstables with max timestamp older than maxSSTableAge.
+      * @param sstables all sstables to consider
+      * @param maxSSTableAge the age in milliseconds when an SSTable stops 
participating in compactions
+      * @param now current time. SSTables with max timestamp less than (now - 
maxSSTableAge) are filtered.
+      * @return a list of sstables with the oldest sstables excluded
+      */
+     @VisibleForTesting
+     static Iterable<SSTableReader> filterOldSSTables(List<SSTableReader> 
sstables, long maxSSTableAge, long now)
+     {
+         if (maxSSTableAge == 0)
+             return sstables;
+         final long cutoff = now - maxSSTableAge;
+         return Iterables.filter(sstables, new Predicate<SSTableReader>()
+         {
+             @Override
+             public boolean apply(SSTableReader sstable)
+             {
+                 return sstable.getMaxTimestamp() >= cutoff;
+             }
+         });
+     }
+ 
+     /**
+      *
+      * @param sstables
+      * @return
+      */
+     public static List<Pair<SSTableReader, Long>> 
createSSTableAndMinTimestampPairs(Iterable<SSTableReader> sstables)
+     {
+         List<Pair<SSTableReader, Long>> sstableMinTimestampPairs = 
Lists.newArrayListWithCapacity(Iterables.size(sstables));
+         for (SSTableReader sstable : sstables)
+             sstableMinTimestampPairs.add(Pair.create(sstable, 
sstable.getMinTimestamp()));
+         return sstableMinTimestampPairs;
+     }
+ 
+ 
+     /**
+      * A target time span used for bucketing SSTables based on timestamps.
+      */
+     private static class Target
+     {
+         // How big a range of timestamps fit inside the target.
+         public final long size;
+         // A timestamp t hits the target iff t / size == divPosition.
+         public final long divPosition;
+ 
+         public Target(long size, long divPosition)
+         {
+             this.size = size;
+             this.divPosition = divPosition;
+         }
+ 
+         /**
+          * Compares the target to a timestamp.
+          * @param timestamp the timestamp to compare.
+          * @return a negative integer, zero, or a positive integer as the 
target lies before, covering, or after than the timestamp.
+          */
+         public int compareToTimestamp(long timestamp)
+         {
+             return Long.compare(divPosition, timestamp / size);
+         }
+ 
+         /**
+          * Tells if the timestamp hits the target.
+          * @param timestamp the timestamp to test.
+          * @return <code>true</code> iff timestamp / size == divPosition.
+          */
+         public boolean onTarget(long timestamp)
+         {
+             return compareToTimestamp(timestamp) == 0;
+         }
+ 
+         /**
+          * Gets the next target, which represents an earlier time span.
+          * @param base The number of contiguous targets that will have the 
same size. Targets following those will be <code>base</code> times as big.
+          * @return
+          */
+         public Target nextTarget(int base)
+         {
+             if (divPosition % base > 0)
+                 return new Target(size, divPosition - 1);
+             else
+                 return new Target(size * base, divPosition / base - 1);
+         }
+     }
+ 
+ 
+     /**
+      * Group files with similar min timestamp into buckets. Files with recent 
min timestamps are grouped together into
+      * buckets designated to short timespans while files with older 
timestamps are grouped into buckets representing
+      * longer timespans.
+      * @param files pairs consisting of a file and its min timestamp
+      * @param timeUnit
+      * @param base
+      * @param now
+      * @return a list of buckets of files. The list is ordered such that the 
files with newest timestamps come first.
+      *         Each bucket is also a list of files ordered from newest to 
oldest.
+      */
+     @VisibleForTesting
+     static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long 
timeUnit, int base, long now)
+     {
+         // Sort files by age. Newest first.
+         final List<Pair<T, Long>> sortedFiles = Lists.newArrayList(files);
+         Collections.sort(sortedFiles, Collections.reverseOrder(new 
Comparator<Pair<T, Long>>()
+         {
+             public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
+             {
+                 return p1.right.compareTo(p2.right);
+             }
+         }));
+ 
+         List<List<T>> buckets = Lists.newArrayList();
+         Target target = getInitialTarget(now, timeUnit);
+         PeekingIterator<Pair<T, Long>> it = 
Iterators.peekingIterator(sortedFiles.iterator());
+ 
+         outerLoop:
+         while (it.hasNext())
+         {
+             while (!target.onTarget(it.peek().right))
+             {
+                 // If the file is too new for the target, skip it.
+                 if (target.compareToTimestamp(it.peek().right) < 0)
+                 {
+                     it.next();
+ 
+                     if (!it.hasNext())
+                         break outerLoop;
+                 }
+                 else // If the file is too old for the target, switch targets.
+                     target = target.nextTarget(base);
+             }
+ 
+             List<T> bucket = Lists.newArrayList();
+             while (target.onTarget(it.peek().right))
+             {
+                 bucket.add(it.next().left);
+ 
+                 if (!it.hasNext())
+                     break;
+             }
+             buckets.add(bucket);
+         }
+ 
+         return buckets;
+     }
+ 
+     @VisibleForTesting
+     static Target getInitialTarget(long now, long timeUnit)
+     {
+         return new Target(timeUnit, now / timeUnit);
+     }
+ 
+ 
+     private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> 
tasks)
+     {
+         int n = 0;
+         for (List<SSTableReader> bucket : tasks)
+         {
+             if (bucket.size() >= cfs.getMinimumCompactionThreshold())
+                 n += Math.ceil((double)bucket.size() / 
cfs.getMaximumCompactionThreshold());
+         }
+         estimatedRemainingTasks = n;
+     }
+ 
+ 
+     /**
+      * @param buckets list of buckets, sorted from newest to oldest, from 
which to return the newest bucket within thresholds.
+      * @param minThreshold minimum number of sstables in a bucket to qualify.
+      * @param maxThreshold maximum number of sstables to compact at once (the 
returned bucket will be trimmed down to this).
+      * @return a bucket (list) of sstables to compact.
+      */
+     @VisibleForTesting
+     static List<SSTableReader> newestBucket(List<List<SSTableReader>> 
buckets, int minThreshold, int maxThreshold)
+     {
+         // Skip buckets containing less than minThreshold sstables, and limit 
other buckets to maxThreshold sstables.
+         for (List<SSTableReader> bucket : buckets)
+             if (bucket.size() >= minThreshold)
+                 return trimToThreshold(bucket, maxThreshold);
+         return Collections.emptyList();
+     }
+ 
+     /**
+      * @param bucket list of sstables, ordered from newest to oldest by 
getMinTimestamp().
+      * @param maxThreshold maximum number of sstables in a single compaction 
task.
+      * @return A bucket trimmed to the <code>maxThreshold</code> newest 
sstables.
+      */
+     @VisibleForTesting
+     static List<SSTableReader> trimToThreshold(List<SSTableReader> bucket, 
int maxThreshold)
+     {
+         // Trim the oldest sstables off the end to meet the maxThreshold
+         return bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+     }
+ 
+     @Override
 -    public synchronized AbstractCompactionTask getMaximalTask(int gcBefore)
++    public synchronized Collection<AbstractCompactionTask> getMaximalTask(int 
gcBefore)
+     {
+         Iterable<SSTableReader> sstables = cfs.markAllCompacting();
+         if (sstables == null)
+             return null;
+ 
 -        return new CompactionTask(cfs, sstables, gcBefore);
++        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, 
sstables, gcBefore, false));
+     }
+ 
+     @Override
+     public synchronized AbstractCompactionTask 
getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+     {
+         assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+ 
+         if (!cfs.getDataTracker().markCompacting(sstables))
+         {
+             logger.debug("Unable to mark {} for compaction; probably a 
background compaction got to it first.  You can disable background compactions 
temporarily if this is a problem", sstables);
+             return null;
+         }
+ 
 -        return new CompactionTask(cfs, sstables, 
gcBefore).setUserDefined(true);
++        return new CompactionTask(cfs, sstables, gcBefore, 
false).setUserDefined(true);
+     }
+ 
+     public int getEstimatedRemainingTasks()
+     {
+         return estimatedRemainingTasks;
+     }
+ 
+     public long getMaxSSTableBytes()
+     {
+         return Long.MAX_VALUE;
+     }
+ 
+ 
+     public static Map<String, String> validateOptions(Map<String, String> 
options) throws ConfigurationException
+     {
+         Map<String, String> uncheckedOptions = 
AbstractCompactionStrategy.validateOptions(options);
+         uncheckedOptions = 
DateTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+ 
+         uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD);
+         uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD);
+ 
+         return uncheckedOptions;
+     }
+ 
+     public String toString()
+     {
+         return String.format("DateTieredCompactionStrategy[%s/%s]",
+                 cfs.getMinimumCompactionThreshold(),
+                 cfs.getMaximumCompactionThreshold());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc 
test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index 0000000,299e1af..ef189ba
mode 000000,100644..100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@@ -1,0 -1,242 +1,242 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.*;
+ 
+ import org.junit.Test;
+ 
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.RowMutation;
++import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.SSTableReader;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.getBuckets;
+ import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.newestBucket;
+ import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.trimToThreshold;
+ import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.filterOldSSTables;
+ import static 
org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.validateOptions;
+ 
+ import static org.junit.Assert.*;
+ 
+ public class DateTieredCompactionStrategyTest extends SchemaLoader
+ {
+     public static final String KEYSPACE1 = "Keyspace1";
+     private static final String CF_STANDARD1 = "Standard1";
+ 
+     @Test
+     public void testOptionsValidation() throws ConfigurationException
+     {
+         Map<String, String> options = new HashMap<>();
+         options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "30");
+         options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, 
"1825");
+         Map<String, String> unvalidated = validateOptions(options);
+         assertTrue(unvalidated.isEmpty());
+ 
+         try
+         {
+             options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, 
"0");
+             validateOptions(options);
+             fail(String.format("%s == 0 should be rejected", 
DateTieredCompactionStrategyOptions.BASE_TIME_KEY));
+         }
+         catch (ConfigurationException e) {}
+ 
+         try
+         {
+             options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, 
"-1337");
+             validateOptions(options);
+             fail(String.format("%Negative %s should be rejected", 
DateTieredCompactionStrategyOptions.BASE_TIME_KEY));
+         }
+         catch (ConfigurationException e)
+         {
+             options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, 
"1");
+         }
+ 
+         try
+         {
+             
options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "-1337");
+             validateOptions(options);
+             fail(String.format("%Negative %s should be rejected", 
DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY));
+         }
+         catch (ConfigurationException e)
+         {
+             
options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "0");
+         }
+ 
+         options.put("bad_option", "1.0");
+         unvalidated = validateOptions(options);
+         assertTrue(unvalidated.containsKey("bad_option"));
+     }
+ 
+     @Test
+     public void testGetBuckets()
+     {
+         List<Pair<String, Long>> pairs = Lists.newArrayList(
+                 Pair.create("a", 199L),
+                 Pair.create("b", 299L),
+                 Pair.create("a", 1L),
+                 Pair.create("b", 201L)
+         );
+         List<List<String>> buckets = getBuckets(pairs, 100L, 2, 200L);
+         assertEquals(2, buckets.size());
+ 
+         for (List<String> bucket : buckets)
+         {
+             assertEquals(2, bucket.size());
+             assertEquals(bucket.get(0), bucket.get(1));
+         }
+ 
+ 
+         pairs = Lists.newArrayList(
+                 Pair.create("a", 2000L),
+                 Pair.create("b", 3600L),
+                 Pair.create("a", 200L),
+                 Pair.create("c", 3950L),
+                 Pair.create("too new", 4125L),
+                 Pair.create("b", 3899L),
+                 Pair.create("c", 3900L)
+         );
+         buckets = getBuckets(pairs, 100L, 3, 4050L);
+         // targets (divPosition, size): (40, 100), (39, 100), (12, 300), (3, 
900), (0, 2700)
+         // in other words: 0 - 2699, 2700 - 3599, 3600 - 3899, 3900 - 3999, 
4000 - 4099
+         assertEquals(3, buckets.size());
+ 
+         for (List<String> bucket : buckets)
+         {
+             assertEquals(2, bucket.size());
+             assertEquals(bucket.get(0), bucket.get(1));
+         }
+ 
+ 
+         // Test base 1.
+         pairs = Lists.newArrayList(
+                 Pair.create("a", 200L),
+                 Pair.create("a", 299L),
+                 Pair.create("b", 2000L),
+                 Pair.create("b", 2014L),
+                 Pair.create("c", 3610L),
+                 Pair.create("c", 3690L),
+                 Pair.create("d", 3898L),
+                 Pair.create("d", 3899L),
+                 Pair.create("e", 3900L),
+                 Pair.create("e", 3950L),
+                 Pair.create("too new", 4125L)
+         );
+         buckets = getBuckets(pairs, 100L, 1, 4050L);
+ 
+         assertEquals(5, buckets.size());
+ 
+         for (List<String> bucket : buckets)
+         {
+             assertEquals(2, bucket.size());
+             assertEquals(bucket.get(0), bucket.get(1));
+         }
+     }
+ 
+     @Test
+     public void testPrepBucket()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+         cfs.truncateBlocking();
+         cfs.disableAutoCompaction();
+ 
+         ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+ 
+         // create 3 sstables
+         int numSSTables = 3;
+         for (int r = 0; r < numSSTables; r++)
+         {
+             DecoratedKey key = Util.dk(String.valueOf(r));
 -            RowMutation rm = new RowMutation(KEYSPACE1, key.key);
 -            rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, r);
++            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
++            rm.add(CF_STANDARD1, Util.cellname("column"), value, r);
+             rm.apply();
+             cfs.forceBlockingFlush();
+         }
+         cfs.forceBlockingFlush();
+ 
+         List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+ 
+         List<SSTableReader> newBucket = 
newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32);
+         assertTrue("nothing should be returned when all buckets are below the 
min threshold", newBucket.isEmpty());
+ 
+         assertEquals("an sstable with a single value should have equal 
min/max timestamps", sstrs.get(0).getMinTimestamp(), 
sstrs.get(0).getMaxTimestamp());
+         assertEquals("an sstable with a single value should have equal 
min/max timestamps", sstrs.get(1).getMinTimestamp(), 
sstrs.get(1).getMaxTimestamp());
+         assertEquals("an sstable with a single value should have equal 
min/max timestamps", sstrs.get(2).getMinTimestamp(), 
sstrs.get(2).getMaxTimestamp());
+ 
+         // if we have more than the max threshold, the oldest should be 
dropped
+         Collections.sort(sstrs, Collections.reverseOrder(new 
Comparator<SSTableReader>() {
+             public int compare(SSTableReader o1, SSTableReader o2) {
+                 return Long.compare(o1.getMinTimestamp(), 
o2.getMinTimestamp()) ;
+             }
+         }));
+ 
+         List<SSTableReader> bucket = trimToThreshold(sstrs, 2);
+         assertEquals("one bucket should have been dropped", 2, bucket.size());
+         for (SSTableReader sstr : bucket)
+             assertFalse("the oldest sstable should be dropped", 
sstr.getMinTimestamp() == 0);
+     }
+ 
+     @Test
+     public void testFilterOldSSTables()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+         cfs.truncateBlocking();
+         cfs.disableAutoCompaction();
+ 
+         ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+ 
+         // create 3 sstables
+         int numSSTables = 3;
+         for (int r = 0; r < numSSTables; r++)
+         {
+             DecoratedKey key = Util.dk(String.valueOf(r));
 -            RowMutation rm = new RowMutation(KEYSPACE1, key.key);
 -            rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, r);
++            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
++            rm.add(CF_STANDARD1, Util.cellname("column"), value, r);
+             rm.apply();
+             cfs.forceBlockingFlush();
+         }
+         cfs.forceBlockingFlush();
+ 
+         Iterable<SSTableReader> filtered;
+         List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+ 
+         filtered = filterOldSSTables(sstrs, 0, 2);
+         assertEquals("when maxSSTableAge is zero, no sstables should be 
filtered", sstrs.size(), Iterables.size(filtered));
+ 
+         filtered = filterOldSSTables(sstrs, 1, 2);
+         assertEquals("only the newest 2 sstables should remain", 2, 
Iterables.size(filtered));
+ 
+         filtered = filterOldSSTables(sstrs, 1, 3);
+         assertEquals("only the newest sstable should remain", 1, 
Iterables.size(filtered));
+ 
+         filtered = filterOldSSTables(sstrs, 1, 4);
+         assertEquals("no sstables should remain when all are too old", 0, 
Iterables.size(filtered));
+     }
+ }

Reply via email to