Add mistakenly forgotten files for CASSANDRA-9258
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e0c1b0bb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e0c1b0bb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e0c1b0bb Branch: refs/heads/cassandra-3.0 Commit: e0c1b0bb7121df1cc0185ffc0b35547f75daa281 Parents: 6ff1cbb Author: Sylvain Lebresne <[email protected]> Authored: Tue Jan 5 15:26:54 2016 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Tue Jan 5 15:26:54 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/locator/PendingRangeMaps.java | 209 +++++++++++++++++++ .../test/microbench/PendingRangesBench.java | 89 ++++++++ .../cassandra/locator/PendingRangeMapsTest.java | 78 +++++++ 4 files changed, 377 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0c1b0bb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3c919c7..648200b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.5 + * Optimize pending range computation (CASSANDRA-9258) * Skip commit log and saved cache directories in SSTable version startup check (CASSANDRA-10902) * drop/alter user should be case sensitive (CASSANDRA-10817) * jemalloc detection fails due to quoting issues in regexv (CASSANDRA-10946) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0c1b0bb/src/java/org/apache/cassandra/locator/PendingRangeMaps.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java new file mode 100644 index 0000000..1892cc3 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java @@ -0,0 +1,209 @@ +package 
org.apache.cassandra.locator;

import com.google.common.collect.Iterators;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.*;

/**
 * Holds pending ranges together with the endpoints registered for each of them, indexed so that
 * the ranges matching a token can be found through sorted-map views instead of a scan over every range.
 *
 * Ranges are split by {@link Range#isWrapAround} into two groups. Each group is stored in two maps
 * that hold the same keys and share the same address lists, but are ordered by different comparators.
 */
public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddress>>>
{
    private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class);

    /**
     * We have four NavigableMaps to be able to search for ranges containing a token efficiently.
     *
     * The first two are for non-wrap-around ranges, and the last two are for wrap-around ranges.
     */
    // ascendingMap sorts the ranges by the ascending order of the right token
    final NavigableMap<Range<Token>, List<InetAddress>> ascendingMap;

    /**
     * Sorts by right token ascending; if the right tokens are equal, sorts by left token descending.
     * The search key (token, token] therefore comes before every (left, token] with left < token,
     * and those ranges are selected by an inclusive tailMap.
     */
    static final Comparator<Range<Token>> ascendingComparator = new Comparator<Range<Token>>()
    {
        @Override
        public int compare(Range<Token> o1, Range<Token> o2)
        {
            int res = o1.right.compareTo(o2.right);
            if (res != 0)
                return res;

            // same right token: larger left token first
            return o2.left.compareTo(o1.left);
        }
    };

    // descendingMap sorts the ranges by the descending order of the left token
    final NavigableMap<Range<Token>, List<InetAddress>> descendingMap;

    /**
     * Sorts by left token descending; if the left tokens are equal, sorts by right token descending.
     * The search key (token, token] therefore comes after every (token, right] with right > token,
     * and those ranges are not selected by an exclusive tailMap.
     */
    static final Comparator<Range<Token>> descendingComparator = new Comparator<Range<Token>>()
    {
        @Override
        public int compare(Range<Token> o1, Range<Token> o2)
        {
            int res = o2.left.compareTo(o1.left);
            if (res != 0)
                return res;

            // if left tokens are same, sort by the descending of the right tokens.
            return o2.right.compareTo(o1.right);
        }
    };

    // these two maps are for wrap-around ranges.
    final NavigableMap<Range<Token>, List<InetAddress>> ascendingMapForWrapAround;

    /**
     * For wrap-around ranges (left, right], for which left > right.
     * Sorts by right token ascending; if the right tokens are equal, sorts by left token ascending.
     * The search key (token, token] therefore comes before every wrap-around (left, token],
     * and those ranges are selected by an inclusive tailMap.
     */
    static final Comparator<Range<Token>> ascendingComparatorForWrapAround = new Comparator<Range<Token>>()
    {
        @Override
        public int compare(Range<Token> o1, Range<Token> o2)
        {
            int res = o1.right.compareTo(o2.right);
            if (res != 0)
                return res;

            return o1.left.compareTo(o2.left);
        }
    };

    final NavigableMap<Range<Token>, List<InetAddress>> descendingMapForWrapAround;

    /**
     * For wrap-around ranges (left, right], for which left > right.
     * Sorts by left token descending; if the left tokens are equal, sorts by right token ascending.
     * The search key (token, token] therefore comes after every wrap-around (token, right],
     * and those ranges are not selected by an exclusive tailMap.
     */
    static final Comparator<Range<Token>> descendingComparatorForWrapAround = new Comparator<Range<Token>>()
    {
        @Override
        public int compare(Range<Token> o1, Range<Token> o2)
        {
            int res = o2.left.compareTo(o1.left);
            if (res != 0)
                return res;

            return o1.right.compareTo(o2.right);
        }
    };

    /**
     * Creates an instance with four empty maps, each ordered by its matching comparator.
     */
    public PendingRangeMaps()
    {
        this.ascendingMap = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparator);
        this.descendingMap = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparator);
        this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparatorForWrapAround);
        this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparatorForWrapAround);
    }

    /**
     * Appends {@code address} to the list registered for {@code range}.
     *
     * When {@code ascendingMap} has no list for the range yet, a new list is created and the very
     * same list instance is put into both maps, so a later append is visible through either map.
     *
     * @param range         the range to register the address for
     * @param address       the address to append
     * @param ascendingMap  the map that is consulted for an existing list
     * @param descendingMap the second map that receives a newly created list
     */
    static final void addToMap(Range<Token> range,
                               InetAddress address,
                               NavigableMap<Range<Token>, List<InetAddress>> ascendingMap,
                               NavigableMap<Range<Token>, List<InetAddress>> descendingMap)
    {
        List<InetAddress> addresses = ascendingMap.get(range);
        if (addresses == null)
        {
            addresses = new ArrayList<InetAddress>(1);
            ascendingMap.put(range, addresses);
            descendingMap.put(range, addresses);
        }
        addresses.add(address);
    }

    /**
     * Registers {@code address} for {@code range}, storing it in the wrap-around pair of maps when
     * {@link Range#isWrapAround} reports true for the range's tokens and in the regular pair otherwise.
     *
     * @param range   the range to register
     * @param address the address to register for the range
     */
    public void addPendingRange(Range<Token> range, InetAddress address)
    {
        if (Range.isWrapAround(range.left, range.right))
        {
            addToMap(range, address, ascendingMapForWrapAround, descendingMapForWrapAround);
        }
        else
        {
            addToMap(range, address, ascendingMap, descendingMap);
        }
    }

    /**
     * For every key of {@code smallerMap} that {@code biggerMap} has a value for, adds the addresses
     * held by {@code biggerMap} to {@code endpointsToAdd}.
     *
     * @param endpointsToAdd the set that receives the addresses
     * @param smallerMap     the map whose keys are iterated
     * @param biggerMap      the map that is probed with each key
     */
    static final void addIntersections(Set<InetAddress> endpointsToAdd,
                                       NavigableMap<Range<Token>, List<InetAddress>> smallerMap,
                                       NavigableMap<Range<Token>, List<InetAddress>> biggerMap)
    {
        // find the intersection of two sets
        for (Range<Token> range : smallerMap.keySet())
        {
            List<InetAddress> addresses = biggerMap.get(range);
            if (addresses != null)
            {
                endpointsToAdd.addAll(addresses);
            }
        }
    }

    /**
     * Collects the addresses registered for the ranges that match {@code token}.
     *
     * A non-wrap-around range matches when it is in both tail views, i.e. left < token <= right.
     * A wrap-around range matches when it is in either tail view, i.e. token <= right or left < token.
     *
     * @param token the token to look up
     * @return a newly created set with the matching addresses; empty when no range matches
     */
    public Collection<InetAddress> pendingEndpointsFor(Token token)
    {
        Set<InetAddress> endpoints = new HashSet<>();

        // the degenerate key (token, token] marks the position of the token in each ordering
        Range<Token> searchRange = new Range<Token>(token, token);

        // search for non-wrap-around maps
        NavigableMap<Range<Token>, List<InetAddress>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
        NavigableMap<Range<Token>, List<InetAddress>> descendingTailMap = descendingMap.tailMap(searchRange, false);

        // add intersections of two maps, iterating over the smaller view and probing the bigger one
        if (ascendingTailMap.size() < descendingTailMap.size())
        {
            addIntersections(endpoints, ascendingTailMap, descendingTailMap);
        }
        else
        {
            addIntersections(endpoints, descendingTailMap, ascendingTailMap);
        }

        // search for wrap-around sets; every range in either view is added, no intersection is taken
        for (List<InetAddress> addresses : ascendingMapForWrapAround.tailMap(searchRange, true).values())
        {
            endpoints.addAll(addresses);
        }
        for (List<InetAddress> addresses : descendingMapForWrapAround.tailMap(searchRange, false).values())
        {
            endpoints.addAll(addresses);
        }

        return endpoints;
    }

    /**
     * Renders every (address, range) pair as {@code address:range} followed by the line separator.
     *
     * @return the rendered text; empty when nothing is registered
     */
    public String printPendingRanges()
    {
        StringBuilder sb = new StringBuilder();
        // looked up once instead of once per appended address
        String lineSeparator = System.lineSeparator();

        for (Map.Entry<Range<Token>, List<InetAddress>> entry : this)
        {
            Range<Token> range = entry.getKey();

            for (InetAddress address : entry.getValue())
            {
                sb.append(address).append(':').append(range);
                sb.append(lineSeparator);
            }
        }

        return sb.toString();
    }

    /**
     * Iterates over the entries of {@code ascendingMap} followed by the entries of
     * {@code ascendingMapForWrapAround}. The entries come straight from the backing maps.
     */
    @Override
    public Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator()
    {
        return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator());
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0c1b0bb/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java ---------------------------------------------------------------------- diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java new file mode 100644 index 0000000..e50cbaf --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java @@ -0,0 +1,89 @@
package org.apache.cassandra.test.microbench;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.PendingRangeMaps;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * JMH benchmark that looks up random tokens in a {@link PendingRangeMaps} and, for comparison,
 * in a plain multimap that is scanned entry by entry with {@code Range.contains}.
 */
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 50, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 3,jvmArgsAppend = "-Xmx512M")
@Threads(1)
@State(Scope.Benchmark)
public class PendingRangesBench
{
    PendingRangeMaps pendingRangeMaps;
    int maxToken = 256 * 100;

    Multimap<Range<Token>, InetAddress> oldPendingRanges;

    /** Builds a range from the decimal string forms of its left and right tokens. */
    private Range<Token> genRange(String left, String right)
    {
        Token leftToken = new RandomPartitioner.BigIntegerToken(left);
        Token rightToken = new RandomPartitioner.BigIntegerToken(right);
        return new Range<Token>(leftToken, rightToken);
    }

    /**
     * Fills both structures with the same data: candidate ranges (i*10+5, i*10+15] for every i
     * below {@code maxToken}, plus one candidate wrap-around range (maxToken*10+5, 5].
     */
    @Setup
    public void setUp() throws UnknownHostException
    {
        pendingRangeMaps = new PendingRangeMaps();
        oldPendingRanges = HashMultimap.create();

        InetAddress[] addresses = { InetAddress.getByName("127.0.0.1"), InetAddress.getByName("127.0.0.2") };

        for (int i = 0; i < maxToken; i++)
        {
            int base = i * 10;
            registerRandomly(Integer.toString(base + 5), Integer.toString(base + 15), addresses);
        }

        // add the wrap around range
        registerRandomly(Integer.toString(maxToken * 10 + 5), Integer.toString(5), addresses);
    }

    /**
     * Registers the range (left, right] in both structures for a random number of addresses.
     *
     * The loop bound is drawn again on every iteration. Because nextInt(2) yields only 0 or 1,
     * the condition can hold for j == 0 only, so addresses[1] is never registered.
     * NOTE(review): confirm that this distribution is the intended one.
     */
    private void registerRandomly(String left, String right, InetAddress[] addresses)
    {
        for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j++)
        {
            Range<Token> range = genRange(left, right);
            pendingRangeMaps.addPendingRange(range, addresses[j]);
            oldPendingRanges.put(range, addresses[j]);
        }
    }

    /** Draws a token uniformly from [0, maxToken * 10 + 5). */
    private Token randomSearchToken()
    {
        int value = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
        return new RandomPartitioner.BigIntegerToken(Integer.toString(value));
    }

    /** Measures a lookup through {@link PendingRangeMaps#pendingEndpointsFor}. */
    @Benchmark
    public void searchToken(final Blackhole bh)
    {
        Token target = randomSearchToken();
        bh.consume(pendingRangeMaps.pendingEndpointsFor(target));
    }

    /** Measures the same lookup as a scan over every entry of the multimap. */
    @Benchmark
    public void searchTokenForOldPendingRanges(final Blackhole bh)
    {
        Token target = randomSearchToken();
        Set<InetAddress> matches = new HashSet<>();
        for (Map.Entry<Range<Token>, Collection<InetAddress>> candidate : oldPendingRanges.asMap().entrySet())
        {
            if (!candidate.getKey().contains(target))
                continue;
            matches.addAll(candidate.getValue());
        }
        bh.consume(matches);
    }

}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0c1b0bb/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java new file mode 100644 index 0000000..6d24447 --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java @@ -0,0 +1,78 @@ +package org.apache.cassandra.locator; + +import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; +import org.apache.cassandra.dht.Range; +import 
org.apache.cassandra.dht.Token;
import org.junit.Test;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Checks how many endpoints {@link PendingRangeMaps#pendingEndpointsFor} reports for tokens that lie
 * before, inside, on the boundaries of, and after the registered ranges.
 */
public class PendingRangeMapsTest
{
    /** Left token, right token and address of the ranges that both tests register, in order. */
    private static final String[][] NON_WRAPPING_RANGES = {
        { "5", "15", "127.0.0.1" },
        { "15", "25", "127.0.0.2" },
        { "25", "35", "127.0.0.3" },
        { "35", "45", "127.0.0.4" },
        { "45", "55", "127.0.0.5" },
        { "45", "65", "127.0.0.6" },
    };

    private Range<Token> genRange(String left, String right)
    {
        Token leftToken = new BigIntegerToken(left);
        Token rightToken = new BigIntegerToken(right);
        return new Range<Token>(leftToken, rightToken);
    }

    /** Registers every row of {@code NON_WRAPPING_RANGES} in the given maps. */
    private void addNonWrappingRanges(PendingRangeMaps maps) throws UnknownHostException
    {
        for (String[] row : NON_WRAPPING_RANGES)
        {
            maps.addPendingRange(genRange(row[0], row[1]), InetAddress.getByName(row[2]));
        }
    }

    /** Asserts, token by token and in order, the number of endpoints reported for each token. */
    private void assertPendingCounts(PendingRangeMaps maps, String[] tokens, int[] expectedCounts)
    {
        for (int i = 0; i < tokens.length; i++)
        {
            assertEquals(expectedCounts[i], maps.pendingEndpointsFor(new BigIntegerToken(tokens[i])).size());
        }
    }

    /** Only non-wrap-around ranges are registered; a token equal to a left bound is not matched by that range. */
    @Test
    public void testPendingEndpoints() throws UnknownHostException
    {
        PendingRangeMaps maps = new PendingRangeMaps();
        addNonWrappingRanges(maps);

        String[] tokens = { "0", "5", "10", "15", "20", "25", "35", "45", "55", "65" };
        int[] expectedCounts = { 0, 0, 1, 1, 1, 1, 1, 1, 2, 1 };
        assertPendingCounts(maps, tokens, expectedCounts);

        Collection<InetAddress> atFifteen = maps.pendingEndpointsFor(new BigIntegerToken("15"));
        assertTrue(atFifteen.contains(InetAddress.getByName("127.0.0.1")));
    }

    /** Same ranges plus the wrap-around range (65, 7], which adds one endpoint for tokens up to 7. */
    @Test
    public void testWrapAroundRanges() throws UnknownHostException
    {
        PendingRangeMaps maps = new PendingRangeMaps();
        addNonWrappingRanges(maps);
        maps.addPendingRange(genRange("65", "7"), InetAddress.getByName("127.0.0.7"));

        String[] tokens = { "0", "5", "7", "10", "15", "20", "25", "35", "45", "55", "65" };
        int[] expectedCounts = { 1, 1, 2, 1, 1, 1, 1, 1, 1, 2, 1 };
        assertPendingCounts(maps, tokens, expectedCounts);

        Collection<InetAddress> atSix = maps.pendingEndpointsFor(new BigIntegerToken("6"));
        assertTrue(atSix.contains(InetAddress.getByName("127.0.0.1")));
        assertTrue(atSix.contains(InetAddress.getByName("127.0.0.7")));
    }
}
