Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
NEWS.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1dea9839
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1dea9839
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1dea9839
Branch: refs/heads/trunk
Commit: 1dea98396266a41f9bf64ade05dcf02f12f301d2
Parents: 1ef7d05 77dae50
Author: Marcus Eriksson <[email protected]>
Authored: Thu Oct 16 18:32:16 2014 +0200
Committer: Marcus Eriksson <[email protected]>
Committed: Thu Oct 16 18:33:24 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 7 +
pylib/cqlshlib/cql3handling.py | 4 +
pylib/cqlshlib/cqlhandling.py | 3 +-
.../DateTieredCompactionStrategy.java | 399 +++++++++++++++++++
.../DateTieredCompactionStrategyOptions.java | 100 +++++
.../DateTieredCompactionStrategyTest.java | 242 +++++++++++
7 files changed, 755 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 74fe652,102a87b..50b9c7e
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,82 -13,13 +13,89 @@@ restore snapshots created with the prev
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
+2.1.1
+=====
+
+New features
+------------
+ - Netty support for epoll on Linux is now enabled. If for some
+ reason you want to disable it, pass the following system property
+ (a sketch of reading such a flag follows below):
+ -Dcassandra.native.epoll.enabled=false
+
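For illustration, a minimal sketch of how a JVM flag like the one above is
typically consumed. The class name and the default of "true" are assumptions
for this sketch, not taken from Cassandra's source:

    public class EpollFlagCheck
    {
        public static void main(String[] args)
        {
            // Run with: java -Dcassandra.native.epoll.enabled=false EpollFlagCheck
            // Without the property, the feature defaults to enabled (assumed).
            boolean epollEnabled = Boolean.parseBoolean(
                    System.getProperty("cassandra.native.epoll.enabled", "true"));
            System.out.println("native epoll enabled: " + epollEnabled);
        }
    }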
+2.1
+===
+
+New features
+------------
+ - Default data and log locations have changed. If not set in
+ cassandra.yaml, the data file directory, commitlog directory,
+ and saved caches directory will default to $CASSANDRA_HOME/data/data,
+ $CASSANDRA_HOME/data/commitlog, and $CASSANDRA_HOME/data/saved_caches,
+ respectively. The log directory now defaults to $CASSANDRA_HOME/logs.
+ If not set, $CASSANDRA_HOME defaults to the top-level directory of
+ the installation.
+ Note that this should only affect source checkouts and tarballs.
+ Deb and RPM packages will continue to use /var/lib/cassandra and
+ /var/log/cassandra in cassandra.yaml.
+ - The SSTable data directory name has changed slightly. Each directory
+ now has a hex string appended after the CF name, e.g.
+ ks/cf-5be396077b811e3a3ab9dc4b9ac088d/
+ This hex string represents the unique ColumnFamily ID.
+ Note that existing directories are used as-is, so only directories
+ created after the upgrade have the new name format.
+ - Saved key cache files also include the ColumnFamily ID in their file names.
+ - It is now possible to do incremental repairs: sstables that have been
+ repaired are marked with a timestamp and are not included in the next
+ repair session. Use nodetool repair -par -inc to use this feature.
+ A tool to manually mark/unmark sstables as repaired is available in
+ tools/bin/sstablerepairedset. This is particularly important when
+ using LCS, as any data not repaired in your first incremental repair
+ will be put back in L0.
+ - Bootstrapping now ensures that range movements are consistent,
+ meaning the data for the new node is taken from the node that is no
+ longer responsible for that range of keys.
+ If you want the old behavior (perhaps due to a lost node), you can
+ set the following property:
+ (-Dcassandra.consistent.rangemovement=false)
+ - It is now possible to use quoted identifiers in triggers' names.
+ WARNING: if you previously used triggers with capital letters in their
+ names, then you must quote them from now on.
+ - Improved stress tool (http://goo.gl/OTNqiQ)
+ - New incremental repair option (http://goo.gl/MjohJp, http://goo.gl/f8jSme)
+ - Incremental replacement of compacted SSTables (http://goo.gl/JfDBGW)
+ - The row cache can now cache only the head of partitions (http://goo.gl/6TJPH6)
+ - Off-heap memtables (http://goo.gl/YT7znJ)
+ - CQL improvements and additions: user-defined types, tuple types, secondary
+ indexing of collections, ... (http://goo.gl/kQl7GW)
+
+Upgrading
+---------
+ - Rolling upgrades from anything pre-2.0.7 are not supported, and
+ pre-2.0 sstables are not supported. This means that before upgrading
+ a node to 2.1, that node must be started on 2.0 and
+ 'nodetool upgradesstables' must be run (even in the case
+ of non-rolling upgrades).
+ - For size-tiered compaction users, Cassandra now defaults to ignoring
+ the coldest 5% of sstables. This can be customized with the
+ cold_reads_to_omit compaction option; 0.0 omits nothing (the old
+ behavior) and 1.0 omits everything.
+ - Multithreaded compaction has been removed.
+ - The counters implementation has been changed, replaced by a safer one
+ with fewer caveats but different performance characteristics. You might
+ have to change your data model to accommodate the new implementation.
+ (See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the
+ blog post at http://goo.gl/qj8iQl for details).
+ - The (per-table) index_interval parameter has been replaced with the
+ min_index_interval and max_index_interval parameters. index_interval
+ has been deprecated.
+ - Support for supercolumns has been removed from json2sstable.
+
+ 2.0.11
+ ======
+ New features
+ ------------
+ - DateTieredCompactionStrategy added; it is optimized for time series
+ data and groups data that is written closely in time (see
+ CASSANDRA-6602 for details). Consider this experimental for now; a
+ usage sketch follows this NEWS excerpt.
2.0.10
======
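As referenced in the DateTieredCompactionStrategy item above, a minimal sketch
of validating DTCS options through the validateOptions() method added in this
commit. The option keys are the constants exercised by the unit test further
down; the values "30" and "365" and the class name are illustrative assumptions:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.cassandra.db.compaction.DateTieredCompactionStrategy;
    import org.apache.cassandra.db.compaction.DateTieredCompactionStrategyOptions;
    import org.apache.cassandra.exceptions.ConfigurationException;

    public class DtcsOptionsSketch
    {
        public static void main(String[] args) throws ConfigurationException
        {
            Map<String, String> options = new HashMap<>();
            options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "30");
            options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "365");

            // validateOptions() throws ConfigurationException on bad values and
            // returns any options it did not recognize.
            Map<String, String> unchecked = DateTieredCompactionStrategy.validateOptions(options);
            System.out.println("unrecognized options: " + unchecked);
        }
    }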
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 0000000,9c708db..8c997ed
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -1,0 -1,374 +1,399 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.db.compaction;
+
+ import java.util.*;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Predicate;
+ import com.google.common.collect.*;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import org.apache.cassandra.cql3.statements.CFPropDefs;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.SSTableReader;
+ import org.apache.cassandra.utils.Pair;
+
+ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
+ {
+ private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class);
+
+ protected DateTieredCompactionStrategyOptions options;
+ protected volatile int estimatedRemainingTasks;
+
+ public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+ {
+ super(cfs, options);
+ this.estimatedRemainingTasks = 0;
+ this.options = new DateTieredCompactionStrategyOptions(options);
+ }
+
+ @Override
+ public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ {
+ if (!isEnabled())
+ return null;
+
+ while (true)
+ {
+ List<SSTableReader> latestBucket = getNextBackgroundSStables(gcBefore);
+
+ if (latestBucket.isEmpty())
+ return null;
+
+ if (cfs.getDataTracker().markCompacting(latestBucket))
- return new CompactionTask(cfs, latestBucket, gcBefore);
++ return new CompactionTask(cfs, latestBucket, gcBefore, false);
+ }
+ }
+
+ /**
+ * Finds the sstables to include in the next background compaction.
+ * @param gcBefore gc grace cutoff; tombstones older than this may be purged
+ * @return the next bucket of sstables to compact, or an empty list if none qualify
+ */
+ private List<SSTableReader> getNextBackgroundSStables(final int gcBefore)
+ {
+ if (!isEnabled() || cfs.getSSTables().isEmpty())
+ return Collections.emptyList();
+
+ int base = cfs.getMinimumCompactionThreshold();
+ long now = getNow();
-
+ Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
+
- List<SSTableReader> mostInteresting = getCompactionCandidates(candidates, now, base);
- if (mostInteresting != null)
- return mostInteresting;
++ Set<SSTableReader> repairedCandidates = new HashSet<>();
++ Set<SSTableReader> unRepairedCandidates = new HashSet<>();
++ for (SSTableReader sstable : candidates)
++ {
++ if (sstable.isRepaired())
++ {
++ repairedCandidates.add(sstable);
++ }
++ else
++ {
++ unRepairedCandidates.add(sstable);
++ }
++ }
++
++
++ List<SSTableReader> mostInterestingRepaired = getCompactionCandidates(repairedCandidates, now, base);
++ List<SSTableReader> mostInterestingUnrepaired = getCompactionCandidates(unRepairedCandidates, now, base);
++ if (mostInterestingRepaired != null && mostInterestingUnrepaired != null)
++ {
++ return mostInterestingRepaired.size() > mostInterestingUnrepaired.size() ? mostInterestingRepaired : mostInterestingUnrepaired;
++ }
++ else if (mostInterestingRepaired != null)
++ {
++ return mostInterestingRepaired;
++ }
++ else if (mostInterestingUnrepaired != null)
++ {
++ return mostInterestingUnrepaired;
++ }
+
+ // if there is no sstable to compact in the standard way, try compacting a single sstable whose droppable tombstone
+ // ratio is greater than the threshold.
+ List<SSTableReader> sstablesWithTombstones = Lists.newArrayList();
+ for (SSTableReader sstable : candidates)
+ {
+ if (worthDroppingTombstones(sstable, gcBefore))
+ sstablesWithTombstones.add(sstable);
+ }
+ if (sstablesWithTombstones.isEmpty())
+ return Collections.emptyList();
+
+ return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+ }
+
+ private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables, long now, int base)
+ {
+ Iterable<SSTableReader> candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, now);
+
+ List<List<SSTableReader>> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates), options.baseTime, base, now);
+ logger.debug("Compaction buckets are {}", buckets);
+ updateEstimatedCompactionsByTasks(buckets);
+ List<SSTableReader> mostInteresting = newestBucket(buckets, cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold());
+ if (!mostInteresting.isEmpty())
+ return mostInteresting;
+ return null;
+ }
+
+ /**
+ * Gets the timestamp that DateTieredCompactionStrategy considers to be the "current time".
+ * @return the maximum timestamp across all SSTables.
+ * @throws java.util.NoSuchElementException if there are no SSTables.
+ */
+ private long getNow()
+ {
+ return Collections.max(cfs.getSSTables(), new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp());
+ }
+ }).getMaxTimestamp();
+ }
+
+ /**
+ * Removes all sstables with max timestamp older than maxSSTableAge.
+ * @param sstables all sstables to consider
+ * @param maxSSTableAge the age in milliseconds when an SSTable stops participating in compactions
+ * @param now current time. SSTables with max timestamp less than (now - maxSSTableAge) are filtered.
+ * @return a list of sstables with the oldest sstables excluded
+ */
+ @VisibleForTesting
+ static Iterable<SSTableReader> filterOldSSTables(List<SSTableReader> sstables, long maxSSTableAge, long now)
+ {
+ if (maxSSTableAge == 0)
+ return sstables;
+ final long cutoff = now - maxSSTableAge;
+ return Iterables.filter(sstables, new Predicate<SSTableReader>()
+ {
+ @Override
+ public boolean apply(SSTableReader sstable)
+ {
+ return sstable.getMaxTimestamp() >= cutoff;
+ }
+ });
+ }
+
+ /**
+ * Pairs each sstable with its min timestamp.
+ * @param sstables the sstables to pair with their min timestamps
+ * @return a list of (sstable, min timestamp) pairs
+ */
+ public static List<Pair<SSTableReader, Long>> createSSTableAndMinTimestampPairs(Iterable<SSTableReader> sstables)
+ {
+ List<Pair<SSTableReader, Long>> sstableMinTimestampPairs = Lists.newArrayListWithCapacity(Iterables.size(sstables));
+ for (SSTableReader sstable : sstables)
+ sstableMinTimestampPairs.add(Pair.create(sstable, sstable.getMinTimestamp()));
+ return sstableMinTimestampPairs;
+ }
+
+
+ /**
+ * A target time span used for bucketing SSTables based on timestamps.
+ */
+ private static class Target
+ {
+ // How big a range of timestamps fit inside the target.
+ public final long size;
+ // A timestamp t hits the target iff t / size == divPosition.
+ public final long divPosition;
+
+ public Target(long size, long divPosition)
+ {
+ this.size = size;
+ this.divPosition = divPosition;
+ }
+
+ /**
+ * Compares the target to a timestamp.
+ * @param timestamp the timestamp to compare.
+ * @return a negative integer, zero, or a positive integer as the target lies before, covers, or lies after the timestamp.
+ */
+ public int compareToTimestamp(long timestamp)
+ {
+ return Long.compare(divPosition, timestamp / size);
+ }
+
+ /**
+ * Tells if the timestamp hits the target.
+ * @param timestamp the timestamp to test.
+ * @return <code>true</code> iff timestamp / size == divPosition.
+ */
+ public boolean onTarget(long timestamp)
+ {
+ return compareToTimestamp(timestamp) == 0;
+ }
+
+ /**
+ * Gets the next target, which represents an earlier time span.
+ * @param base The number of contiguous targets that will have the same size. Targets following those will be <code>base</code> times as big.
+ * @return the next target, covering an earlier and possibly larger time span.
+ */
+ public Target nextTarget(int base)
+ {
+ if (divPosition % base > 0)
+ return new Target(size, divPosition - 1);
+ else
+ return new Target(size * base, divPosition / base - 1);
+ }
+ }
+
+
+ /**
+ * Groups files with similar min timestamps into buckets. Files with recent min timestamps are grouped together into
+ * buckets designated to short timespans while files with older timestamps are grouped into buckets representing
+ * longer timespans.
+ * @param files pairs consisting of a file and its min timestamp
+ * @param timeUnit the size, in timestamp units, of the smallest (newest) target timespan
+ * @param base the number of contiguous targets of one size before targets grow <code>base</code> times larger
+ * @param now the current time, used to position the initial (newest) target
+ * @return a list of buckets of files. The list is ordered such that the files with newest timestamps come first.
+ * Each bucket is also a list of files ordered from newest to oldest.
+ */
+ @VisibleForTesting
+ static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long timeUnit, int base, long now)
+ {
+ // Sort files by age. Newest first.
+ final List<Pair<T, Long>> sortedFiles = Lists.newArrayList(files);
+ Collections.sort(sortedFiles, Collections.reverseOrder(new Comparator<Pair<T, Long>>()
+ {
+ public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
+ {
+ return p1.right.compareTo(p2.right);
+ }
+ }));
+
+ List<List<T>> buckets = Lists.newArrayList();
+ Target target = getInitialTarget(now, timeUnit);
+ PeekingIterator<Pair<T, Long>> it = Iterators.peekingIterator(sortedFiles.iterator());
+
+ outerLoop:
+ while (it.hasNext())
+ {
+ while (!target.onTarget(it.peek().right))
+ {
+ // If the file is too new for the target, skip it.
+ if (target.compareToTimestamp(it.peek().right) < 0)
+ {
+ it.next();
+
+ if (!it.hasNext())
+ break outerLoop;
+ }
+ else // If the file is too old for the target, switch targets.
+ target = target.nextTarget(base);
+ }
+
+ List<T> bucket = Lists.newArrayList();
+ while (target.onTarget(it.peek().right))
+ {
+ bucket.add(it.next().left);
+
+ if (!it.hasNext())
+ break;
+ }
+ buckets.add(bucket);
+ }
+
+ return buckets;
+ }
+
+ @VisibleForTesting
+ static Target getInitialTarget(long now, long timeUnit)
+ {
+ return new Target(timeUnit, now / timeUnit);
+ }
+
+
+ private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks)
+ {
+ int n = 0;
+ for (List<SSTableReader> bucket : tasks)
+ {
+ if (bucket.size() >= cfs.getMinimumCompactionThreshold())
+ n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
+ }
+ estimatedRemainingTasks = n;
+ }
+
+
+ /**
+ * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest bucket within thresholds.
+ * @param minThreshold minimum number of sstables in a bucket to qualify.
+ * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
+ * @return a bucket (list) of sstables to compact.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
+ {
+ // Skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold sstables.
+ for (List<SSTableReader> bucket : buckets)
+ if (bucket.size() >= minThreshold)
+ return trimToThreshold(bucket, maxThreshold);
+ return Collections.emptyList();
+ }
+
+ /**
+ * @param bucket list of sstables, ordered from newest to oldest by getMinTimestamp().
+ * @param maxThreshold maximum number of sstables in a single compaction task.
+ * @return a bucket trimmed to the <code>maxThreshold</code> newest sstables.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> trimToThreshold(List<SSTableReader> bucket, int maxThreshold)
+ {
+ // Trim the oldest sstables off the end to meet the maxThreshold
+ return bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+ }
+
+ @Override
- public synchronized AbstractCompactionTask getMaximalTask(int gcBefore)
++ public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore)
+ {
+ Iterable<SSTableReader> sstables = cfs.markAllCompacting();
+ if (sstables == null)
+ return null;
+
- return new CompactionTask(cfs, sstables, gcBefore);
++ return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, sstables, gcBefore, false));
+ }
+
+ @Override
+ public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+ if (!cfs.getDataTracker().markCompacting(sstables))
+ {
+ logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
+ return null;
+ }
+
- return new CompactionTask(cfs, sstables, gcBefore).setUserDefined(true);
++ return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
+ }
+
+ public int getEstimatedRemainingTasks()
+ {
+ return estimatedRemainingTasks;
+ }
+
+ public long getMaxSSTableBytes()
+ {
+ return Long.MAX_VALUE;
+ }
+
+
+ public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+ {
+ Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+ uncheckedOptions = DateTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+ uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD);
+ uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD);
+
+ return uncheckedOptions;
+ }
+
+ public String toString()
+ {
+ return String.format("DateTieredCompactionStrategy[%s/%s]",
+ cfs.getMinimumCompactionThreshold(),
+ cfs.getMaximumCompactionThreshold());
+ }
+ }
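To make the tiering concrete, here is a self-contained sketch (the class is
illustrative, not part of the commit) that replays the Target progression
defined by getInitialTarget() and nextTarget() above, using the same numbers
as the comment in testGetBuckets below (timeUnit = 100, base = 3, now = 4050):

    public class TargetWalkthrough
    {
        // Mirrors the private Target fields above: a span of 'size' timestamp
        // units, hit by any timestamp t with t / size == divPosition.
        static long size;
        static long divPosition;

        public static void main(String[] args)
        {
            long timeUnit = 100, now = 4050;
            int base = 3;
            size = timeUnit;              // as in getInitialTarget(now, timeUnit)
            divPosition = now / timeUnit;
            for (int i = 0; i < 5; i++)
            {
                long lo = divPosition * size;
                System.out.printf("target %d: size=%d, divPosition=%d, covers [%d, %d]%n",
                                  i, size, divPosition, lo, lo + size - 1);
                nextTarget(base);
            }
            // Prints the spans from the test comment: [4000, 4099], [3900, 3999],
            // [3600, 3899], [2700, 3599], [0, 2699]
        }

        // Same rule as Target.nextTarget(int base) above.
        static void nextTarget(int base)
        {
            if (divPosition % base > 0)
                divPosition -= 1;
            else
            {
                size *= base;
                divPosition = divPosition / base - 1;
            }
        }
    }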
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1dea9839/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index 0000000,299e1af..ef189ba
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@@ -1,0 -1,242 +1,242 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.db.compaction;
+
+ import java.nio.ByteBuffer;
+ import java.util.*;
+
+ import org.junit.Test;
+
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowMutation;
++import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.SSTableReader;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.Pair;
+
+ import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.getBuckets;
+ import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.newestBucket;
+ import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.trimToThreshold;
+ import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.filterOldSSTables;
+ import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.validateOptions;
+
+ import static org.junit.Assert.*;
+
+ public class DateTieredCompactionStrategyTest extends SchemaLoader
+ {
+ public static final String KEYSPACE1 = "Keyspace1";
+ private static final String CF_STANDARD1 = "Standard1";
+
+ @Test
+ public void testOptionsValidation() throws ConfigurationException
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "30");
+ options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "1825");
+ Map<String, String> unvalidated = validateOptions(options);
+ assertTrue(unvalidated.isEmpty());
+
+ try
+ {
+ options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "0");
+ validateOptions(options);
+ fail(String.format("%s == 0 should be rejected", DateTieredCompactionStrategyOptions.BASE_TIME_KEY));
+ }
+ catch (ConfigurationException e) {}
+
+ try
+ {
+ options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "-1337");
+ validateOptions(options);
+ fail(String.format("Negative %s should be rejected", DateTieredCompactionStrategyOptions.BASE_TIME_KEY));
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "1");
+ }
+
+ try
+ {
+ options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "-1337");
+ validateOptions(options);
+ fail(String.format("Negative %s should be rejected", DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY));
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "0");
+ }
+
+ options.put("bad_option", "1.0");
+ unvalidated = validateOptions(options);
+ assertTrue(unvalidated.containsKey("bad_option"));
+ }
+
+ @Test
+ public void testGetBuckets()
+ {
+ List<Pair<String, Long>> pairs = Lists.newArrayList(
+ Pair.create("a", 199L),
+ Pair.create("b", 299L),
+ Pair.create("a", 1L),
+ Pair.create("b", 201L)
+ );
+ List<List<String>> buckets = getBuckets(pairs, 100L, 2, 200L);
+ assertEquals(2, buckets.size());
+
+ for (List<String> bucket : buckets)
+ {
+ assertEquals(2, bucket.size());
+ assertEquals(bucket.get(0), bucket.get(1));
+ }
+
+
+ pairs = Lists.newArrayList(
+ Pair.create("a", 2000L),
+ Pair.create("b", 3600L),
+ Pair.create("a", 200L),
+ Pair.create("c", 3950L),
+ Pair.create("too new", 4125L),
+ Pair.create("b", 3899L),
+ Pair.create("c", 3900L)
+ );
+ buckets = getBuckets(pairs, 100L, 3, 4050L);
+ // targets (divPosition, size): (40, 100), (39, 100), (12, 300), (3, 900), (0, 2700)
+ // in other words: 0 - 2699, 2700 - 3599, 3600 - 3899, 3900 - 3999, 4000 - 4099
+ assertEquals(3, buckets.size());
+
+ for (List<String> bucket : buckets)
+ {
+ assertEquals(2, bucket.size());
+ assertEquals(bucket.get(0), bucket.get(1));
+ }
+
+
+ // Test base 1.
+ pairs = Lists.newArrayList(
+ Pair.create("a", 200L),
+ Pair.create("a", 299L),
+ Pair.create("b", 2000L),
+ Pair.create("b", 2014L),
+ Pair.create("c", 3610L),
+ Pair.create("c", 3690L),
+ Pair.create("d", 3898L),
+ Pair.create("d", 3899L),
+ Pair.create("e", 3900L),
+ Pair.create("e", 3950L),
+ Pair.create("too new", 4125L)
+ );
+ buckets = getBuckets(pairs, 100L, 1, 4050L);
+
+ assertEquals(5, buckets.size());
+
+ for (List<String> bucket : buckets)
+ {
+ assertEquals(2, bucket.size());
+ assertEquals(bucket.get(0), bucket.get(1));
+ }
+ }
+
+ @Test
+ public void testPrepBucket()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 3 sstables
+ int numSSTables = 3;
+ for (int r = 0; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
- RowMutation rm = new RowMutation(KEYSPACE1, key.key);
- rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, r);
++ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
++ rm.add(CF_STANDARD1, Util.cellname("column"), value, r);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+ cfs.forceBlockingFlush();
+
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+ List<SSTableReader> newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32);
+ assertTrue("nothing should be returned when all buckets are below the min threshold", newBucket.isEmpty());
+
+ assertEquals("an sstable with a single value should have equal
min/max timestamps", sstrs.get(0).getMinTimestamp(),
sstrs.get(0).getMaxTimestamp());
+ assertEquals("an sstable with a single value should have equal
min/max timestamps", sstrs.get(1).getMinTimestamp(),
sstrs.get(1).getMaxTimestamp());
+ assertEquals("an sstable with a single value should have equal
min/max timestamps", sstrs.get(2).getMinTimestamp(),
sstrs.get(2).getMaxTimestamp());
+
+ // if we have more than the max threshold, the oldest should be dropped
+ Collections.sort(sstrs, Collections.reverseOrder(new Comparator<SSTableReader>() {
+ public int compare(SSTableReader o1, SSTableReader o2) {
+ return Long.compare(o1.getMinTimestamp(), o2.getMinTimestamp());
+ }
+ }));
+
+ List<SSTableReader> bucket = trimToThreshold(sstrs, 2);
+ assertEquals("one bucket should have been dropped", 2, bucket.size());
+ for (SSTableReader sstr : bucket)
+ assertFalse("the oldest sstable should be dropped", sstr.getMinTimestamp() == 0);
+ }
+
+ @Test
+ public void testFilterOldSSTables()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 3 sstables
+ int numSSTables = 3;
+ for (int r = 0; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
- RowMutation rm = new RowMutation(KEYSPACE1, key.key);
- rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, r);
++ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
++ rm.add(CF_STANDARD1, Util.cellname("column"), value, r);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+ cfs.forceBlockingFlush();
+
+ Iterable<SSTableReader> filtered;
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+ filtered = filterOldSSTables(sstrs, 0, 2);
+ assertEquals("when maxSSTableAge is zero, no sstables should be
filtered", sstrs.size(), Iterables.size(filtered));
+
+ filtered = filterOldSSTables(sstrs, 1, 2);
+ assertEquals("only the newest 2 sstables should remain", 2, Iterables.size(filtered));
+
+ filtered = filterOldSSTables(sstrs, 1, 3);
+ assertEquals("only the newest sstable should remain", 1, Iterables.size(filtered));
+
+ filtered = filterOldSSTables(sstrs, 1, 4);
+ assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered));
+ }
+ }