Repository: cassandra Updated Branches: refs/heads/trunk 16499ca9b -> 8b3221a8f
Optimise max purgeable timestamp calculation in compaction patch by benedict; reviewed by marcus for CASSANDRA-8920 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b3221a8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b3221a8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b3221a8 Branch: refs/heads/trunk Commit: 8b3221a8fcdcafc78e4cee908768b9f8612df31e Parents: 16499ca Author: Benedict Elliott Smith <[email protected]> Authored: Tue Mar 31 17:32:18 2015 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Tue Mar 31 17:32:18 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/DataTracker.java | 7 +- .../db/compaction/CompactionController.java | 21 ++-- .../cassandra/utils/AsymmetricOrdering.java | 3 - .../org/apache/cassandra/utils/Interval.java | 2 + .../apache/cassandra/utils/OverlapIterator.java | 58 ++++++++++ .../cassandra/utils/OverlapIteratorTest.java | 114 +++++++++++++++++++ 7 files changed, 191 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 22bdc5e..e51be76 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Optimise max purgeable timestamp calculation in compaction (CASSANDRA-8920) * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670) * New tool added to validate all sstables in a node (CASSANDRA-5791) * Push notification when tracing completes for an operation (CASSANDRA-7807) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index dd1dc5a..6de0b2c 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -575,10 +575,15 @@ public class DataTracker public static SSTableIntervalTree buildIntervalTree(Iterable<SSTableReader> sstables) { + return new SSTableIntervalTree(buildIntervals(sstables)); + } + + public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables) + { List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables)); for (SSTableReader sstable : sstables) intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable)); - return new SSTableIntervalTree(intervals); + return intervals; } public Set<SSTableReader> getCompacting() http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index 148b1b6..a49a3ea 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -17,24 +17,22 @@ */ package org.apache.cassandra.db.compaction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.utils.AlwaysPresentFilter; +import org.apache.cassandra.utils.OverlapIterator; import org.apache.cassandra.utils.concurrent.Refs; +import static org.apache.cassandra.db.DataTracker.buildIntervals; + /** * Manage compaction options. */ @@ -43,8 +41,8 @@ public class CompactionController implements AutoCloseable private static final Logger logger = LoggerFactory.getLogger(CompactionController.class); public final ColumnFamilyStore cfs; - private DataTracker.SSTableIntervalTree overlappingTree; private Refs<SSTableReader> overlappingSSTables; + private OverlapIterator<RowPosition, SSTableReader> overlapIterator; private final Iterable<SSTableReader> compacting; public final int gcBefore; @@ -84,7 +82,7 @@ public class CompactionController implements AutoCloseable overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList()); else overlappingSSTables = cfs.getAndReferenceOverlappingSSTables(compacting); - this.overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables); + this.overlapIterator = new OverlapIterator<>(buildIntervals(overlappingSSTables)); } public Set<SSTableReader> getFullyExpiredSSTables() @@ -170,9 +168,9 @@ public class CompactionController implements AutoCloseable */ public long maxPurgeableTimestamp(DecoratedKey key) { - List<SSTableReader> filteredSSTables = overlappingTree.search(key); long min = Long.MAX_VALUE; - for (SSTableReader sstable : filteredSSTables) + overlapIterator.update(key); + for (SSTableReader sstable : overlapIterator.overlaps()) { // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing), // we check index file instead. @@ -193,4 +191,5 @@ public class CompactionController implements AutoCloseable { overlappingSSTables.release(); } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java b/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java index ed8c99f..74597f5 100644 --- a/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java +++ b/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java @@ -93,7 +93,6 @@ public abstract class AsymmetricOrdering<T1, T2> extends Ordering<T1> // { a[m] >= v ==> a[ub] >= v ==> a[lb] < v ^ a[ub] >= v } // { a[m] < v ==> a[lb] < v ==> a[lb] < v ^ a[ub] >= v } } - throw new IllegalStateException(); } @@ -116,8 +115,6 @@ public abstract class AsymmetricOrdering<T1, T2> extends Ordering<T1> throw new IllegalStateException(); } - - private class Reversed extends AsymmetricOrdering<T1, T2> { public int compareAsymmetric(T1 left, T2 right) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/utils/Interval.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Interval.java b/src/java/org/apache/cassandra/utils/Interval.java index 9398144..335ef27 100644 --- a/src/java/org/apache/cassandra/utils/Interval.java +++ b/src/java/org/apache/cassandra/utils/Interval.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.utils; +import java.util.Comparator; + import com.google.common.base.Objects; public class Interval<C, D> http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/utils/OverlapIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/OverlapIterator.java b/src/java/org/apache/cassandra/utils/OverlapIterator.java new file mode 100644 index 0000000..bc43742 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/OverlapIterator.java @@ -0,0 +1,58 @@ +package org.apache.cassandra.utils; + +import java.util.*; + +import org.apache.cassandra.utils.AsymmetricOrdering.Op; + +/** + * A class for iterating sequentially through an ordered collection and efficiently + * finding the overlapping set of matching intervals. + * + * The algorithm is quite simple: the intervals are sorted ascending by both min and max + * in two separate lists. These lists are walked forwards each time we visit a new point, + * with the set of intervals in the min-ordered list being added to our set of overlaps, + * and those in the max-ordered list being removed. + */ +public class OverlapIterator<I extends Comparable<? super I>, V> +{ + // indexing into sortedByMin, tracks the next interval to include + int nextToInclude; + final List<Interval<I, V>> sortedByMin; + // indexing into sortedByMax, tracks the next interval to exclude + int nextToExclude; + final List<Interval<I, V>> sortedByMax; + final Set<V> overlaps = new HashSet<>(); + final Set<V> accessible = Collections.unmodifiableSet(overlaps); + + I mostRecent; + + public OverlapIterator(Collection<Interval<I, V>> intervals) + { + sortedByMax = new ArrayList<>(intervals); + Collections.sort(sortedByMax, Interval.<I, V>maxOrdering()); + // we clone after first sorting by max; this is quite likely to make sort cheaper, since a.max < b.max + // generally increases the likelihood that a.min < b.min, so the list may be partially sorted already. + // this also means if (in future) we sort either collection (or a subset thereof) by the other's comparator + // all items, including equal, will occur in the same order, including + sortedByMin = new ArrayList<>(sortedByMax); + Collections.sort(sortedByMin, Interval.<I, V>minOrdering()); + } + + // move the iterator forwards to the overlaps matching point + public void update(I point) + { + // we don't use binary search here since we expect points to be a superset of the min/max values + mostRecent = point; + // add those we are now after the start of + while (nextToInclude < sortedByMin.size() && sortedByMin.get(nextToInclude).min.compareTo(point) <= 0) + overlaps.add(sortedByMin.get(nextToInclude++).data); + // remove those we are now after the end of + while (nextToExclude < sortedByMax.size() && sortedByMax.get(nextToExclude).max.compareTo(point) < 0) + overlaps.remove(sortedByMax.get(nextToExclude++).data); + } + + public Set<V> overlaps() + { + return accessible; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/test/unit/org/apache/cassandra/utils/OverlapIteratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/OverlapIteratorTest.java b/test/unit/org/apache/cassandra/utils/OverlapIteratorTest.java new file mode 100644 index 0000000..5bbe267 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/OverlapIteratorTest.java @@ -0,0 +1,114 @@ +package org.apache.cassandra.utils; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.Test; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class OverlapIteratorTest +{ + + private static List<Interval<Integer, Integer>> randomIntervals(int range, int increment, int count) + { + List<Integer> a = random(range, increment, count); + List<Integer> b = random(range, increment, count); + List<Interval<Integer, Integer>> r = new ArrayList<>(); + for (int i = 0 ; i < count ; i++) + { + r.add(a.get(i) < b.get(i) ? Interval.create(a.get(i), b.get(i), i) + : Interval.create(b.get(i), a.get(i), i)); + } + return r; + } + + private static List<Integer> random(int range, int increment, int count) + { + List<Integer> random = new ArrayList<>(); + for (int i = 0 ; i < count ; i++) + { + int base = i * increment; + random.add(ThreadLocalRandom.current().nextInt(base, base + range)); + } + return random; + } + + @Test + public void test() + { + for (int i = 0 ; i < 10 ; i++) + { + test(1000, 0, 1000); + test(100000, 100, 1000); + test(1000000, 0, 1000); + } + } + + private void test(int range, int increment, int count) + { + compare(randomIntervals(range, increment, count), random(range, increment, count), 1); + compare(randomIntervals(range, increment, count), random(range, increment, count), 2); + compare(randomIntervals(range, increment, count), random(range, increment, count), 3); + } + + private <I extends Comparable<I>, V> void compare(List<Interval<I, V>> intervals, List<I> points, int initCount) + { + Collections.sort(points); + IntervalTree<I, V, Interval<I, V>> tree = IntervalTree.build(intervals); + OverlapIterator<I, V> iter = new OverlapIterator<>(intervals); + int initPoint = points.size() / initCount; + int i = 0; + for (I point : points) + { + if (i++ == initPoint) + iter = new OverlapIterator<>(intervals); + iter.update(point); + TreeSet<V> act = new TreeSet<>(iter.overlaps); + TreeSet<V> exp = new TreeSet<>(tree.search(point)); + TreeSet<V> extra = new TreeSet<>(act); + extra.removeAll(exp); + TreeSet<V> missing = new TreeSet<>(exp); + missing.removeAll(act); + assertTrue(extra.isEmpty()); + assertTrue(missing.isEmpty()); + } + } + +}
