Repository: cassandra Updated Branches: refs/heads/trunk 467068d1e -> e645b9172
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/locator/RangesByEndpoint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java index 698b133..1a71141 100644 --- a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java +++ b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java @@ -19,9 +19,11 @@ package org.apache.cassandra.locator; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, RangesAtEndpoint> @@ -37,17 +39,19 @@ public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, Ranges return map.getOrDefault(endpoint, RangesAtEndpoint.empty(endpoint)); } - public static class Mutable extends ReplicaMultimap.Mutable<InetAddressAndPort, RangesAtEndpoint.Mutable> + public static class Builder extends ReplicaMultimap.Builder<InetAddressAndPort, RangesAtEndpoint.Builder> { @Override - protected RangesAtEndpoint.Mutable newMutable(InetAddressAndPort endpoint) + protected RangesAtEndpoint.Builder newBuilder(InetAddressAndPort endpoint) { - return new RangesAtEndpoint.Mutable(endpoint); + return new RangesAtEndpoint.Builder(endpoint); } - public RangesByEndpoint asImmutableView() + public RangesByEndpoint build() { - return new RangesByEndpoint(Collections.unmodifiableMap(Maps.transformValues(map, RangesAtEndpoint.Mutable::asImmutableView))); + return new RangesByEndpoint( + ImmutableMap.copyOf( + Maps.transformValues(this.map, RangesAtEndpoint.Builder::build))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/locator/ReplicaCollection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java index d1006dc..d870316 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java +++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java @@ -18,8 +18,6 @@ package org.apache.cassandra.locator; -import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; - import java.util.Comparator; import java.util.Iterator; import java.util.Set; @@ -69,28 +67,39 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera /** * @return a *eagerly constructed* copy of this collection containing the Replica that match the provided predicate. * An effort will be made to either return ourself, or a subList, where possible. - * It is guaranteed that no changes to any upstream Mutable will affect the state of the result. + * It is guaranteed that no changes to any upstream Builder will affect the state of the result. */ public abstract C filter(Predicate<Replica> predicate); /** * @return a *eagerly constructed* copy of this collection containing the Replica that match the provided predicate. * An effort will be made to either return ourself, or a subList, where possible. - * It is guaranteed that no changes to any upstream Mutable will affect the state of the result. + * It is guaranteed that no changes to any upstream Builder will affect the state of the result. * Only the first maxSize items will be returned. */ public abstract C filter(Predicate<Replica> predicate, int maxSize); /** + * @return a *lazily constructed* Iterable over this collection, containing the Replica that match the provided predicate. + */ + public abstract Iterable<Replica> filterLazily(Predicate<Replica> predicate); + + /** + * @return a *lazily constructed* Iterable over this collection, containing the Replica that match the provided predicate. + * Only the first maxSize matching items will be returned. + */ + public abstract Iterable<Replica> filterLazily(Predicate<Replica> predicate, int maxSize); + + /** * @return an *eagerly constructed* copy of this collection containing the Replica at positions [start..end); * An effort will be made to either return ourself, or a subList, where possible. - * It is guaranteed that no changes to any upstream Mutable will affect the state of the result. + * It is guaranteed that no changes to any upstream Builder will affect the state of the result. */ public abstract C subList(int start, int end); /** * @return an *eagerly constructed* copy of this collection containing the Replica re-ordered according to this comparator - * It is guaranteed that no changes to any upstream Mutable will affect the state of the result. + * It is guaranteed that no changes to any upstream Builder will affect the state of the result. */ public abstract C sorted(Comparator<Replica> comparator); @@ -102,23 +111,18 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera public abstract String toString(); /** - * A mutable extension of a ReplicaCollection. This is append-only, so it is safe to select a subList, - * or at any time take an asImmutableView() snapshot. + * A mutable (append-only) extension of a ReplicaCollection. + * All methods besides add() will return an immutable snapshot of the collection, or the matching items. */ - public interface Mutable<C extends ReplicaCollection<C>> extends ReplicaCollection<C> + public interface Builder<C extends ReplicaCollection<C>> extends ReplicaCollection<C> { /** - * @return an Immutable clone that mirrors any modifications to this Mutable instance. - */ - C asImmutableView(); - - /** - * @return an Immutable clone that assumes this Mutable will never be modified again, + * @return an Immutable clone that assumes this Builder will never be modified again, * so its contents can be reused. * - * This Mutable should enforce that it is no longer modified. + * This Builder should enforce that it is no longer modified. */ - C asSnapshot(); + public C build(); /** * Passed to add() and addAll() as ignoreConflicts parameter. The meaning of conflict varies by collection type @@ -138,41 +142,23 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera * @param replica add this replica to the end of the collection * @param ignoreConflict conflicts to ignore, see {@link Conflict} */ - void add(Replica replica, Conflict ignoreConflict); + Builder<C> add(Replica replica, Conflict ignoreConflict); - default public void add(Replica replica) + default public Builder<C> add(Replica replica) { - add(replica, Conflict.NONE); + return add(replica, Conflict.NONE); } - default public void addAll(Iterable<Replica> replicas, Conflict ignoreConflicts) + default public Builder<C> addAll(Iterable<Replica> replicas, Conflict ignoreConflicts) { for (Replica replica : replicas) add(replica, ignoreConflicts); + return this; } - default public void addAll(Iterable<Replica> replicas) - { - addAll(replicas, Conflict.NONE); - } - } - - public static class Builder<C extends ReplicaCollection<C>, M extends Mutable<C>, B extends Builder<C, M, B>> - { - Mutable<C> mutable; - public Builder(Mutable<C> mutable) { this.mutable = mutable; } - - public int size() { return mutable.size(); } - public B add(Replica replica) { mutable.add(replica); return (B) this; } - public B add(Replica replica, Conflict ignoreConflict) { mutable.add(replica, ignoreConflict); return (B) this; } - public B addAll(Iterable<Replica> replica) { mutable.addAll(replica); return (B) this; } - public B addAll(Iterable<Replica> replica, Conflict ignoreConflict) { mutable.addAll(replica, ignoreConflict); return (B) this; } - - public C build() + default public Builder<C> addAll(Iterable<Replica> replicas) { - C result = mutable.asSnapshot(); - mutable = null; - return result; + return addAll(replicas, Conflict.NONE); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/locator/ReplicaLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java index f48c989..cba4f68 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java +++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java @@ -193,6 +193,7 @@ public abstract class ReplicaLayout<E extends Endpoints<E>> */ public static ReplicaLayout.ForTokenWrite forTokenWriteLiveAndDown(Keyspace keyspace, Token token) { + // TODO: these should be cached, not the natural replicas // TODO: race condition to fetch these. implications?? EndpointsForToken natural = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token); EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/locator/ReplicaMultimap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaMultimap.java b/src/java/org/apache/cassandra/locator/ReplicaMultimap.java index 3e3fcb4..5a8551a 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaMultimap.java +++ b/src/java/org/apache/cassandra/locator/ReplicaMultimap.java @@ -25,9 +25,11 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; +@VisibleForTesting public abstract class ReplicaMultimap<K, C extends ReplicaCollection<?>> { final Map<K, C> map; @@ -37,23 +39,29 @@ public abstract class ReplicaMultimap<K, C extends ReplicaCollection<?>> } public abstract C get(K key); - public C getIfPresent(K key) { return map.get(key); } - public static abstract class Mutable - <K, MutableCollection extends ReplicaCollection.Mutable<?>> - extends ReplicaMultimap<K, MutableCollection> + public static abstract class Builder + <K, B extends ReplicaCollection.Builder<?>> + { - protected abstract MutableCollection newMutable(K key); + protected abstract B newBuilder(K key); - Mutable() + final Map<K, B> map; + Builder() { - super(new HashMap<>()); + this.map = new HashMap<>(); + } + + public B get(K key) + { + Preconditions.checkNotNull(key); + return map.computeIfAbsent(key, k -> newBuilder(key)); } - public MutableCollection get(K key) + public B getIfPresent(K key) { Preconditions.checkNotNull(key); - return map.computeIfAbsent(key, k -> newMutable(key)); + return map.get(key); } public void put(K key, Replica replica) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/locator/ReplicaPlans.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 87f3c09..5551e5f 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -41,8 +41,8 @@ import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.FBUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +57,6 @@ import java.util.function.Function; import java.util.function.Predicate; import static com.google.common.collect.Iterables.any; -import static com.google.common.collect.Iterables.filter; -import static com.google.common.collect.Iterables.limit; import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM; import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumFor; import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor; @@ -170,7 +168,6 @@ public class ReplicaPlans } } - /** * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive. */ @@ -390,16 +387,16 @@ public class ReplicaPlans assert consistencyLevel != EACH_QUORUM; - ReplicaCollection.Mutable<E> contacts = liveAndDown.all().newMutable(liveAndDown.all().size()); - contacts.addAll(filter(liveAndDown.natural(), Replica::isFull)); + ReplicaCollection.Builder<E> contacts = liveAndDown.all().newBuilder(liveAndDown.all().size()); + contacts.addAll(liveAndDown.natural().filterLazily(Replica::isFull)); contacts.addAll(liveAndDown.pending()); // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all) int liveCount = contacts.count(live.all()::contains); int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount; if (requiredTransientCount > 0) - contacts.addAll(limit(filter(live.natural(), Replica::isTransient), requiredTransientCount)); - return contacts.asSnapshot(); + contacts.addAll(live.natural().filterLazily(Replica::isTransient, requiredTransientCount)); + return contacts.build(); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/locator/SimpleStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java index 7a000b7..2dd835c 100644 --- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java +++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java @@ -20,9 +20,11 @@ package org.apache.cassandra.locator; import java.util.ArrayList; import java.util.Collections; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.Map; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.Token; @@ -55,17 +57,21 @@ public class SimpleStrategy extends AbstractReplicationStrategy Range<Token> replicaRange = new Range<>(replicaStart, replicaEnd); Iterator<Token> iter = TokenMetadata.ringIterator(ring, token, false); - EndpointsForRange.Builder replicas = EndpointsForRange.builder(replicaRange, rf.allReplicas); + EndpointsForRange.Builder replicas = new EndpointsForRange.Builder(replicaRange, rf.allReplicas); // Add the token at the index by default while (replicas.size() < rf.allReplicas && iter.hasNext()) { Token tk = iter.next(); InetAddressAndPort ep = metadata.getEndpoint(tk); - if (!replicas.containsEndpoint(ep)) + if (!replicas.endpoints().contains(ep)) replicas.add(new Replica(ep, replicaRange, replicas.size() < rf.fullReplicas)); } - return replicas.build(); + + // group endpoints by DC, so that we can cheaply filter them to a given DC + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + return replicas.build() + .sorted(Comparator.comparing(r -> snitch.getDatacenter(r.endpoint()))); } public ReplicationFactor getReplicationFactor() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index ad40d7b..f95ee0c 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -27,7 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; -import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; +import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -737,18 +737,18 @@ public class TokenMetadata public EndpointsByRange getPendingRangesMM(String keyspaceName) { - EndpointsByRange.Mutable byRange = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder byRange = new EndpointsByRange.Builder(); PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); if (pendingRangeMaps != null) { - for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : pendingRangeMaps) + for (Map.Entry<Range<Token>, EndpointsForRange.Builder> entry : pendingRangeMaps) { byRange.putAll(entry.getKey(), entry.getValue(), Conflict.ALL); } } - return byRange.asImmutableView(); + return byRange.build(); } /** a mutable map may be returned but caller should not modify it */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/service/RangeRelocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java index 839a34c..b63c105 100644 --- a/src/java/org/apache/cassandra/service/RangeRelocator.java +++ b/src/java/org/apache/cassandra/service/RangeRelocator.java @@ -114,7 +114,7 @@ public class RangeRelocator TokenMetadata tmdBefore, TokenMetadata tmdAfter) { - RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable(); + RangesByEndpoint.Builder endpointRanges = new RangesByEndpoint.Builder(); for (Replica toStream : streamRanges) { //If the range we are sending is full only send it to the new full replica @@ -158,7 +158,7 @@ public class RangeRelocator } } } - return endpointRanges.asImmutableView(); + return endpointRanges.build(); } public void calculateToFromStreams() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index caa732a..abc189e 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -47,7 +47,7 @@ import com.google.common.collect.*; import com.google.common.util.concurrent.*; import org.apache.cassandra.dht.RangeStreamer.FetchReplica; -import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; +import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -2975,7 +2975,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (temp.isMember(endpoint)) temp.removeEndpoint(endpoint); - EndpointsByReplica.Mutable changedRanges = new EndpointsByReplica.Mutable(); + EndpointsByReplica.Builder changedRanges = new EndpointsByReplica.Builder(); // Go through the ranges and for each range check who will be // storing replicas for these ranges when the leaving endpoint @@ -3011,7 +3011,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE changedRanges.putAll(replica, newReplicaEndpoints, Conflict.NONE); } - return changedRanges.asImmutableView(); + return changedRanges.build(); } public void onJoin(InetAddressAndPort endpoint, EndpointState epState) @@ -3717,7 +3717,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } else { - Iterables.addAll(option.getRanges(), getLocalReplicas(keyspace).filter(Replica::isFull).ranges()); + Iterables.addAll(option.getRanges(), getLocalReplicas(keyspace).onlyFull().ranges()); } } if (option.getRanges().isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor().allReplicas < 2) @@ -5028,7 +5028,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Map<InetAddressAndPort, Set<Range<Token>>> transferredRangePerKeyspace = SystemKeyspace.getTransferredRanges("Unbootstrap", keyspace, StorageService.instance.getTokenMetadata().partitioner); - RangesByEndpoint.Mutable replicasPerEndpoint = new RangesByEndpoint.Mutable(); + RangesByEndpoint.Builder replicasPerEndpoint = new RangesByEndpoint.Builder(); for (Map.Entry<Replica, Replica> endPointEntry : rangesWithEndpoints.flattenEntries()) { Replica local = endPointEntry.getKey(); @@ -5043,7 +5043,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE replicasPerEndpoint.put(remote.endpoint(), remote.decorateSubrange(local.range())); } - sessionsToStreamByKeyspace.put(keyspace, replicasPerEndpoint.asImmutableView()); + sessionsToStreamByKeyspace.put(keyspace, replicasPerEndpoint.build()); } StreamPlan streamPlan = new StreamPlan(StreamOperation.DECOMMISSION); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 8f7bc26..174ed7b 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -117,26 +117,26 @@ public abstract class AbstractReadExecutor makeRequests(command, replicas); } - protected void makeTransientDataRequests(ReplicaCollection<?> replicas) + protected void makeTransientDataRequests(Iterable<Replica> replicas) { makeRequests(command.copyAsTransientQuery(replicas), replicas); } - protected void makeDigestRequests(ReplicaCollection<?> replicas) + protected void makeDigestRequests(Iterable<Replica> replicas) { assert all(replicas, Replica::isFull); // only send digest requests to full replicas, send data requests instead to the transient replicas makeRequests(command.copyAsDigestQuery(replicas), replicas); } - private void makeRequests(ReadCommand readCommand, ReplicaCollection<?> replicas) + private void makeRequests(ReadCommand readCommand, Iterable<Replica> replicas) { boolean hasLocalEndpoint = false; - Preconditions.checkArgument(replicas.stream().allMatch(replica -> replica.isFull() || !readCommand.isDigestQuery()), - "Can not send digest requests to transient replicas"); for (Replica replica: replicas) { + assert replica.isFull() || readCommand.acceptsTransient(); + InetAddressAndPort endpoint = replica.endpoint(); if (replica.isSelf()) { @@ -172,8 +172,8 @@ public abstract class AbstractReadExecutor EndpointsForToken selected = replicaPlan().contacts(); EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount); makeFullDataRequests(fullDataRequests); - makeTransientDataRequests(selected.filter(Replica::isTransient)); - makeDigestRequests(selected.filter(r -> r.isFull() && !fullDataRequests.contains(r))); + makeTransientDataRequests(selected.filterLazily(Replica::isTransient)); + makeDigestRequests(selected.filterLazily(r -> r.isFull() && !fullDataRequests.contains(r))); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index f514ea6..597879f 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -155,8 +155,8 @@ public class AntiCompactionTest SSTableStats stats = new SSTableStats(); stats.numLiveSSTables = store.getLiveSSTables().size(); - Predicate<Token> fullContains = t -> Iterables.any(ranges.fullRanges(), r -> r.contains(t)); - Predicate<Token> transContains = t -> Iterables.any(ranges.transientRanges(), r -> r.contains(t)); + Predicate<Token> fullContains = t -> Iterables.any(ranges.onlyFull().ranges(), r -> r.contains(t)); + Predicate<Token> transContains = t -> Iterables.any(ranges.onlyTransient().ranges(), r -> r.contains(t)); for (SSTableReader sstable : store.getLiveSSTables()) { assertFalse(sstable.isRepaired()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java index cee4bb9..e2b09bc 100644 --- a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java @@ -36,7 +36,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaUtils; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -81,14 +80,14 @@ public class RangeFetchMapCalculatorTest @Test public void testWithSingleSource() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.4"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.5"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -98,14 +97,14 @@ public class RangeFetchMapCalculatorTest @Test public void testWithNonOverlappingSource() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.4"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.5", "127.0.0.6"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.7", "127.0.0.8"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.9", "127.0.0.10"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -115,12 +114,12 @@ public class RangeFetchMapCalculatorTest @Test public void testWithRFThreeReplacement() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2", "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3", "127.0.0.4"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -131,14 +130,14 @@ public class RangeFetchMapCalculatorTest @Test public void testForMultipleRoundsComputation() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.3", "127.0.0.2"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -153,14 +152,14 @@ public class RangeFetchMapCalculatorTest @Test public void testForMultipleRoundsComputationWithLocalHost() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1", "127.0.0.2"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -173,14 +172,14 @@ public class RangeFetchMapCalculatorTest @Test public void testForEmptyGraph() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); //All ranges map to local host so we will not stream anything. assertTrue(map.isEmpty()); @@ -189,7 +188,7 @@ public class RangeFetchMapCalculatorTest @Test public void testWithNoSourceWithLocal() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.5"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); @@ -218,7 +217,7 @@ public class RangeFetchMapCalculatorTest } }; - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(filter), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Arrays.asList(filter), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -233,7 +232,7 @@ public class RangeFetchMapCalculatorTest @Test (expected = IllegalStateException.class) public void testWithNoLiveSource() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.5"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); @@ -251,19 +250,19 @@ public class RangeFetchMapCalculatorTest } }; - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(allDeadFilter), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Arrays.asList(allDeadFilter), "Test"); calculator.getRangeFetchMap(); } @Test public void testForLocalDC() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.3", "127.0.0.53"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1", "127.0.0.3", "127.0.0.57"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59", "127.0.0.61"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), new ArrayList<>(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), new ArrayList<>(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); Assert.assertEquals(2, map.asMap().size()); @@ -276,7 +275,7 @@ public class RangeFetchMapCalculatorTest @Test public void testForRemoteDC() throws Exception { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59"); @@ -305,7 +304,7 @@ public class RangeFetchMapCalculatorTest } }; - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(localHostFilter), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Arrays.asList(localHostFilter), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); Assert.assertEquals(3, map.asMap().size()); @@ -319,14 +318,14 @@ public class RangeFetchMapCalculatorTest @Test public void testTrivialRanges() throws UnknownHostException { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); // add non-trivial ranges addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59"); // and a trivial one: addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges(); Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap); assertTrue(trivialMap.get(InetAddressAndPort.getByName("127.0.0.3")).contains(generateTrivialRange(1,10)) ^ @@ -337,7 +336,7 @@ public class RangeFetchMapCalculatorTest @Test(expected = IllegalStateException.class) public void testNotEnoughEndpointsForTrivialRange() throws UnknownHostException { - EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); + EndpointsByRange.Builder rangesWithSources = new EndpointsByRange.Builder(); // add non-trivial ranges addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55"); @@ -366,7 +365,7 @@ public class RangeFetchMapCalculatorTest return "Not 127.0.0.3"; } }; - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.singleton(filter), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.build(), Collections.singleton(filter), "Test"); Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges(); Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap); @@ -378,7 +377,7 @@ public class RangeFetchMapCalculatorTest assertTrue(result.containsAll(expected)); } - private void validateRange(EndpointsByRange.Mutable rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result) + private void validateRange(EndpointsByRange.Builder rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result) { for (Map.Entry<InetAddressAndPort, Range<Token>> entry : result.entries()) { @@ -386,7 +385,7 @@ public class RangeFetchMapCalculatorTest } } - private void addNonTrivialRangeAndSources(EndpointsByRange.Mutable rangesWithSources, int left, int right, String... hosts) throws UnknownHostException + private void addNonTrivialRangeAndSources(EndpointsByRange.Builder rangesWithSources, int left, int right, String... hosts) throws UnknownHostException { for (InetAddressAndPort endpoint : makeAddrs(hosts)) { @@ -395,7 +394,7 @@ public class RangeFetchMapCalculatorTest } } - private void addTrivialRangeAndSources(EndpointsByRange.Mutable rangesWithSources, int left, int right, String... hosts) throws UnknownHostException + private void addTrivialRangeAndSources(EndpointsByRange.Builder rangesWithSources, int left, int right, String... hosts) throws UnknownHostException { for (InetAddressAndPort endpoint : makeAddrs(hosts)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java index c289d50..ced49e2 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java @@ -22,18 +22,20 @@ import com.google.common.base.Predicates; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; +import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; import org.apache.cassandra.utils.FBUtilities; import org.junit.Assert; import org.junit.Test; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.AbstractMap; import java.util.Comparator; import java.util.LinkedHashSet; import java.util.List; @@ -50,7 +52,8 @@ public class ReplicaCollectionTest static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, NULL_EP; static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE; - + static final List<InetAddressAndPort> ALL_EP; + static final List<Range<Token>> ALL_R; static { try @@ -69,6 +72,8 @@ public class ReplicaCollectionTest R5 = range(4, 0); BROADCAST_RANGE = range(10, 11); NULL_RANGE = range(10000, 10001); + ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP); + ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE); } catch (UnknownHostException e) { @@ -88,13 +93,15 @@ public class ReplicaCollectionTest static class TestCase<C extends AbstractReplicaCollection<C>> { + final boolean isBuilder; final C test; final List<Replica> canonicalList; final Multimap<InetAddressAndPort, Replica> canonicalByEndpoint; final Multimap<Range<Token>, Replica> canonicalByRange; - TestCase(C test, List<Replica> canonicalList) + TestCase(boolean isBuilder, C test, List<Replica> canonicalList) { + this.isBuilder = isBuilder; this.test = test; this.canonicalList = canonicalList; this.canonicalByEndpoint = HashMultimap.create(); @@ -103,6 +110,8 @@ public class ReplicaCollectionTest canonicalByEndpoint.put(replica.endpoint(), replica); for (Replica replica : canonicalList) canonicalByRange.put(replica.range(), replica); + if (isBuilder) + Assert.assertTrue(test instanceof ReplicaCollection.Builder<?>); } void testSize() @@ -133,7 +142,7 @@ public class ReplicaCollectionTest Assert.assertTrue(test.endpoints().containsAll(canonicalByEndpoint.keySet())); for (InetAddressAndPort ep : canonicalByEndpoint.keySet()) Assert.assertTrue(test.endpoints().contains(ep)); - for (InetAddressAndPort ep : ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP)) + for (InetAddressAndPort ep : ALL_EP) if (!canonicalByEndpoint.containsKey(ep)) Assert.assertFalse(test.endpoints().contains(ep)); } @@ -142,44 +151,54 @@ public class ReplicaCollectionTest { Assert.assertEquals(canonicalList, ImmutableList.copyOf(test)); Assert.assertEquals(canonicalList, test.stream().collect(Collectors.toList())); - Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints()); + Assert.assertTrue(Iterables.elementsEqual(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints())); } private void assertSubList(C subCollection, int from, int to) { - Assert.assertTrue(subCollection.isSnapshot); if (from == to) { Assert.assertTrue(subCollection.isEmpty()); } else { - List<Replica> subList = this.test.list.subList(from, to); - if (test.isSnapshot) - Assert.assertSame(subList.getClass(), subCollection.list.getClass()); + AbstractReplicaCollection.ReplicaList subList = this.test.list.subList(from, to); + if (!isBuilder) + Assert.assertSame(subList.contents, subCollection.list.contents); Assert.assertEquals(subList, subCollection.list); } } + private void assertSubSequence(Iterable<Replica> subSequence, int from, int to) + { + AbstractReplicaCollection.ReplicaList subList = this.test.list.subList(from, to); + if (!elementsEqual(subList, subSequence)) + { + elementsEqual(subList, subSequence); + } + Assert.assertTrue(elementsEqual(subList, subSequence)); + } + void testSubList(int subListDepth, int filterDepth, int sortDepth) { - if (test.isSnapshot) + if (!isBuilder) Assert.assertSame(test, test.subList(0, test.size())); if (test.isEmpty()) return; - TestCase<C> skipFront = new TestCase<>(test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size())); + Assert.assertSame(test.list.contents, test.subList(0, 1).list.contents); + TestCase<C> skipFront = new TestCase<>(false, test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size())); assertSubList(skipFront.test, 1, canonicalList.size()); skipFront.testAll(subListDepth - 1, filterDepth, sortDepth); - TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1)); + TestCase<C> skipBack = new TestCase<>(false, test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1)); assertSubList(skipBack.test, 0, canonicalList.size() - 1); skipBack.testAll(subListDepth - 1, filterDepth, sortDepth); } void testFilter(int subListDepth, int filterDepth, int sortDepth) { - if (test.isSnapshot) + if (!isBuilder) Assert.assertSame(test, test.filter(Predicates.alwaysTrue())); if (test.isEmpty()) @@ -191,6 +210,8 @@ public class ReplicaCollectionTest Predicate<Replica> removeFirst = r -> !r.equals(canonicalList.get(0)); assertSubList(test.filter(removeFirst), 1, canonicalList.size()); assertSubList(test.filter(removeFirst, 1), 1, Math.min(canonicalList.size(), 2)); + assertSubSequence(test.filterLazily(removeFirst), 1, canonicalList.size()); + assertSubSequence(test.filterLazily(removeFirst, 1), 1, Math.min(canonicalList.size(), 2)); } if (test.size() <= 1) @@ -202,14 +223,17 @@ public class ReplicaCollectionTest int last = canonicalList.size() - 1; Predicate<Replica> removeLast = r -> !r.equals(canonicalList.get(last)); assertSubList(test.filter(removeLast), 0, last); + assertSubSequence(test.filterLazily(removeLast), 0, last); } if (test.size() <= 2) return; Predicate<Replica> removeMiddle = r -> !r.equals(canonicalList.get(canonicalList.size() / 2)); - TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test))); + TestCase<C> filtered = new TestCase<>(false, test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test))); filtered.testAll(subListDepth, filterDepth - 1, sortDepth); + Assert.assertTrue(elementsEqual(filtered.canonicalList, test.filterLazily(removeMiddle, Integer.MAX_VALUE))); + Assert.assertTrue(elementsEqual(limit(filter(canonicalList, removeMiddle::test), canonicalList.size() - 2), test.filterLazily(removeMiddle, canonicalList.size() - 2))); } void testCount() @@ -250,7 +274,7 @@ public class ReplicaCollectionTest boolean f2 = o2.equals(canonicalList.get(0)); return f1 == f2 ? 0 : f1 ? 1 : -1; }; - TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList)); + TestCase<C> sorted = new TestCase<>(false, test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList)); sorted.testAll(subListDepth, filterDepth, sortDepth - 1); } @@ -279,9 +303,9 @@ public class ReplicaCollectionTest static class RangesAtEndpointTestCase extends TestCase<RangesAtEndpoint> { - RangesAtEndpointTestCase(RangesAtEndpoint test, List<Replica> canonicalList) + RangesAtEndpointTestCase(boolean isBuilder, RangesAtEndpoint test, List<Replica> canonicalList) { - super(test, canonicalList); + super(isBuilder, test, canonicalList); } void testRanges() @@ -301,16 +325,53 @@ public class ReplicaCollectionTest Assert.assertTrue(test.ranges().containsAll(canonicalByRange.keySet())); for (Range<Token> range : canonicalByRange.keySet()) Assert.assertTrue(test.ranges().contains(range)); - for (Range<Token> range : ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE)) + for (Range<Token> range : ALL_R) if (!canonicalByRange.containsKey(range)) Assert.assertFalse(test.ranges().contains(range)); } + void testByRange() + { + // check byEndppint() and byRange().entrySet() + Assert.assertFalse(test.byRange().containsKey(EP1)); + Assert.assertFalse(test.byRange().entrySet().contains(EP1)); + try + { + test.byRange().entrySet().contains(null); + Assert.fail(); + } catch (NullPointerException | IllegalArgumentException e) {} + try + { + test.byRange().containsKey(null); + Assert.fail(); + } catch (NullPointerException | IllegalArgumentException e) {} + + for (Range<Token> r : ALL_R) + { + if (canonicalByRange.containsKey(r)) + { + Assert.assertTrue(test.byRange().containsKey(r)); + Assert.assertEquals(canonicalByRange.get(r), ImmutableSet.of(test.byRange().get(r))); + for (Replica replica : canonicalByRange.get(r)) + Assert.assertTrue(test.byRange().entrySet().contains(new AbstractMap.SimpleImmutableEntry<>(r, replica))); + } + else + { + Assert.assertFalse(test.byRange().containsKey(r)); + Assert.assertFalse(test.byRange().entrySet().contains(new AbstractMap.SimpleImmutableEntry<>(r, Replica.fullReplica(EP1, r)))); + } + } + } + @Override public void testOrderOfIteration() { super.testOrderOfIteration(); - Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::range)), test.ranges()); + Assert.assertTrue(Iterables.elementsEqual(Lists.transform(canonicalList, Replica::range), test.ranges())); + Assert.assertTrue(Iterables.elementsEqual(canonicalList, test.byRange().values())); + Assert.assertTrue(Iterables.elementsEqual( + Lists.transform(canonicalList, r -> new AbstractMap.SimpleImmutableEntry<>(r.range(), r)), + test.byRange().entrySet())); } public void testUnwrap(int subListDepth, int filterDepth, int sortDepth) @@ -326,7 +387,7 @@ public class ReplicaCollectionTest } else { - new RangesAtEndpointTestCase(testUnwrap, canonUnwrap) + new RangesAtEndpointTestCase(false, testUnwrap, canonUnwrap) .testAllExceptUnwrap(subListDepth, filterDepth, sortDepth); } } @@ -335,8 +396,10 @@ public class ReplicaCollectionTest { super.testAll(subListDepth, filterDepth, sortDepth); testRanges(); + testByRange(); } + @Override void testAll(int subListDepth, int filterDepth, int sortDepth) { testAllExceptUnwrap(subListDepth, filterDepth, sortDepth); @@ -344,6 +407,64 @@ public class ReplicaCollectionTest } } + static class EndpointsTestCase<E extends Endpoints<E>> extends TestCase<E> + { + EndpointsTestCase(boolean isBuilder, E test, List<Replica> canonicalList) + { + super(isBuilder, test, canonicalList); + } + + void testByEndpoint() + { + // check byEndppint() and byEndpoint().entrySet() + Assert.assertFalse(test.byEndpoint().containsKey(R1)); + Assert.assertFalse(test.byEndpoint().entrySet().contains(EP1)); + try + { + test.byEndpoint().entrySet().contains(null); + Assert.fail(); + } catch (NullPointerException | IllegalArgumentException e) {} + try + { + test.byEndpoint().containsKey(null); + Assert.fail(); + } catch (NullPointerException | IllegalArgumentException e) {} + + for (InetAddressAndPort ep : ALL_EP) + { + if (canonicalByEndpoint.containsKey(ep)) + { + Assert.assertTrue(test.byEndpoint().containsKey(ep)); + Assert.assertEquals(canonicalByEndpoint.get(ep), ImmutableSet.of(test.byEndpoint().get(ep))); + for (Replica replica : canonicalByEndpoint.get(ep)) + Assert.assertTrue(test.byEndpoint().entrySet().contains(new AbstractMap.SimpleImmutableEntry<>(ep, replica))); + } + else + { + Assert.assertFalse(test.byEndpoint().containsKey(ep)); + Assert.assertFalse(test.byEndpoint().entrySet().contains(new AbstractMap.SimpleImmutableEntry<>(ep, Replica.fullReplica(ep, R1)))); + } + } + } + + @Override + public void testOrderOfIteration() + { + super.testOrderOfIteration(); + Assert.assertTrue(Iterables.elementsEqual(canonicalList, test.byEndpoint().values())); + Assert.assertTrue(Iterables.elementsEqual( + Lists.transform(canonicalList, r -> new AbstractMap.SimpleImmutableEntry<>(r.endpoint(), r)), + test.byEndpoint().entrySet())); + } + + @Override + void testAll(int subListDepth, int filterDepth, int sortDepth) + { + super.testAll(subListDepth, filterDepth, sortDepth); + testByEndpoint(); + } + } + private static final ImmutableList<Replica> RANGES_AT_ENDPOINT = ImmutableList.of( fullReplica(EP1, R1), fullReplica(EP1, R2), @@ -357,7 +478,7 @@ public class ReplicaCollectionTest { ImmutableList<Replica> canonical = RANGES_AT_ENDPOINT; new RangesAtEndpointTestCase( - RangesAtEndpoint.copyOf(canonical), canonical + false, RangesAtEndpoint.copyOf(canonical), canonical ).testAll(); } @@ -365,7 +486,7 @@ public class ReplicaCollectionTest public void testMutableRangesAtEndpoint() { ImmutableList<Replica> canonical1 = RANGES_AT_ENDPOINT.subList(0, RANGES_AT_ENDPOINT.size()); - RangesAtEndpoint.Mutable test = new RangesAtEndpoint.Mutable(RANGES_AT_ENDPOINT.get(0).endpoint(), canonical1.size()); + RangesAtEndpoint.Builder test = new RangesAtEndpoint.Builder(RANGES_AT_ENDPOINT.get(0).endpoint(), canonical1.size()); test.addAll(canonical1, Conflict.NONE); try { // incorrect range @@ -385,16 +506,14 @@ public class ReplicaCollectionTest } catch (IllegalArgumentException e) { } test.add(fullReplica(EP1, R3), Conflict.ALL); - new RangesAtEndpointTestCase(test, canonical1).testAll(); + new RangesAtEndpointTestCase(true, test, canonical1).testAll(); - RangesAtEndpoint view = test.asImmutableView(); - RangesAtEndpoint snapshot = view.subList(0, view.size()); + RangesAtEndpoint snapshot = test.subList(0, test.size()); ImmutableList<Replica> canonical2 = RANGES_AT_ENDPOINT; test.addAll(canonical2.reverse(), Conflict.DUPLICATE); - new TestCase<>(snapshot, canonical1).testAll(); - new TestCase<>(view, canonical2).testAll(); - new TestCase<>(test, canonical2).testAll(); + new TestCase<>(false, snapshot, canonical1).testAll(); + new TestCase<>(true, test, canonical2).testAll(); } private static final ImmutableList<Replica> ENDPOINTS_FOR_X = ImmutableList.of( @@ -409,8 +528,8 @@ public class ReplicaCollectionTest public void testEndpointsForRange() { ImmutableList<Replica> canonical = ENDPOINTS_FOR_X; - new TestCase<>( - EndpointsForRange.copyOf(canonical), canonical + new EndpointsTestCase<>( + false, EndpointsForRange.copyOf(canonical), canonical ).testAll(); } @@ -418,7 +537,7 @@ public class ReplicaCollectionTest public void testMutableEndpointsForRange() { ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, ENDPOINTS_FOR_X.size() - 1); - EndpointsForRange.Mutable test = new EndpointsForRange.Mutable(R1, canonical1.size()); + EndpointsForRange.Builder test = new EndpointsForRange.Builder(R1, canonical1.size()); test.addAll(canonical1, Conflict.NONE); try { // incorrect range @@ -438,24 +557,22 @@ public class ReplicaCollectionTest } catch (IllegalArgumentException e) { } test.add(transientReplica(EP1, R1), Conflict.ALL); - new TestCase<>(test, canonical1).testAll(); + new EndpointsTestCase<>(true, test, canonical1).testAll(); - EndpointsForRange view = test.asImmutableView(); - EndpointsForRange snapshot = view.subList(0, view.size()); + EndpointsForRange snapshot = test.subList(0, test.size()); ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X; test.addAll(canonical2.reverse(), Conflict.DUPLICATE); - new TestCase<>(snapshot, canonical1).testAll(); - new TestCase<>(view, canonical2).testAll(); - new TestCase<>(test, canonical2).testAll(); + new EndpointsTestCase<>(false, snapshot, canonical1).testAll(); + new EndpointsTestCase<>(true, test, canonical2).testAll(); } @Test public void testEndpointsForToken() { ImmutableList<Replica> canonical = ENDPOINTS_FOR_X; - new TestCase<>( - EndpointsForToken.copyOf(tk(1), canonical), canonical + new EndpointsTestCase<>( + false, EndpointsForToken.copyOf(tk(1), canonical), canonical ).testAll(); } @@ -463,7 +580,7 @@ public class ReplicaCollectionTest public void testMutableEndpointsForToken() { ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, ENDPOINTS_FOR_X.size() - 1); - EndpointsForToken.Mutable test = new EndpointsForToken.Mutable(tk(1), canonical1.size()); + EndpointsForToken.Builder test = new EndpointsForToken.Builder(tk(1), canonical1.size()); test.addAll(canonical1, Conflict.NONE); try { // incorrect range @@ -483,15 +600,13 @@ public class ReplicaCollectionTest } catch (IllegalArgumentException e) { } test.add(transientReplica(EP1, R1), Conflict.ALL); - new TestCase<>(test, canonical1).testAll(); + new EndpointsTestCase<>(true, test, canonical1).testAll(); - EndpointsForToken view = test.asImmutableView(); - EndpointsForToken snapshot = view.subList(0, view.size()); + EndpointsForToken snapshot = test.subList(0, test.size()); ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X; test.addAll(canonical2.reverse(), Conflict.DUPLICATE); - new TestCase<>(snapshot, canonical1).testAll(); - new TestCase<>(view, canonical2).testAll(); - new TestCase<>(test, canonical2).testAll(); + new EndpointsTestCase<>(false, snapshot, canonical1).testAll(); + new EndpointsTestCase<>(true, test, canonical2).testAll(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index 731a25d..a7cfc1b 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -37,7 +37,6 @@ import org.junit.Test; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaCollection; -import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.schema.MigrationManager; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; @@ -535,10 +534,10 @@ public class MoveTest private void assertMaps(Map<Range<Token>, EndpointsForRange> expected, PendingRangeMaps actual) { int sizeOfActual = 0; - Iterator<Map.Entry<Range<Token>, EndpointsForRange.Mutable>> iterator = actual.iterator(); + Iterator<Map.Entry<Range<Token>, EndpointsForRange.Builder>> iterator = actual.iterator(); while(iterator.hasNext()) { - Map.Entry<Range<Token>, EndpointsForRange.Mutable> actualEntry = iterator.next(); + Map.Entry<Range<Token>, EndpointsForRange.Builder> actualEntry = iterator.next(); assertNotNull(expected.get(actualEntry.getKey())); assertEquals(ImmutableSet.copyOf(expected.get(actualEntry.getKey())), ImmutableSet.copyOf(actualEntry.getValue())); sizeOfActual++; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/test/unit/org/apache/cassandra/service/MoveTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTransientTest.java b/test/unit/org/apache/cassandra/service/MoveTransientTest.java index e5a63c7..a3868a2 100644 --- a/test/unit/org/apache/cassandra/service/MoveTransientTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTransientTest.java @@ -392,7 +392,7 @@ public class MoveTransientTest @Test public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints() throws Exception { - EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + EndpointsByReplica.Builder expectedResult = new EndpointsByReplica.Builder(); InetAddressAndPort cOrB = (downNodes.contains(address03) || sourceFilterDownNodes.contains(address03)) ? address02 : address03; @@ -406,7 +406,7 @@ public class MoveTransientTest invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().right, constructTMDsMoveForwardBetween(), - expectedResult.asImmutableView()); + expectedResult.build()); } @Test @@ -469,7 +469,7 @@ public class MoveTransientTest @Test public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints() throws Exception { - EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + EndpointsByReplica.Builder expectedResult = new EndpointsByReplica.Builder(); //Need to pull the full replica and the transient replica that is losing the range expectedResult.put(fullReplica(address01, nineToken, elevenToken), fullReplica(address05, nineToken, elevenToken)); @@ -477,7 +477,7 @@ public class MoveTransientTest invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().right, constructTMDsMoveBackwardBetween(), - expectedResult.asImmutableView()); + expectedResult.build()); } @@ -503,18 +503,18 @@ public class MoveTransientTest public void testMoveBackwardCalculateRangesToFetchWithPreferredEndpoints() throws Exception { //Moving backwards should fetch nothing and fetch ranges is emptys so this doesn't test a ton - EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + EndpointsByReplica.Builder expectedResult = new EndpointsByReplica.Builder(); invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().right, constructTMDsMoveBackward(), - expectedResult.asImmutableView()); + expectedResult.build()); } @Test public void testMoveForwardCalculateRangesToFetchWithPreferredEndpoints() throws Exception { - EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + EndpointsByReplica.Builder expectedResult = new EndpointsByReplica.Builder(); InetAddressAndPort cOrBAddress = (downNodes.contains(address03) || sourceFilterDownNodes.contains(address03)) ? address02 : address03; @@ -524,7 +524,7 @@ public class MoveTransientTest invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().right, constructTMDsMoveForward(), - expectedResult.asImmutableView()); + expectedResult.build()); } @@ -626,7 +626,7 @@ public class MoveTransientTest public void testMoveForwardBetweenCalculateRangesToStreamWithPreferredEndpoints() throws Exception { DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); - RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); + RangesByEndpoint.Builder expectedResult = new RangesByEndpoint.Builder(); //Need to pull the full replica and the transient replica that is losing the range expectedResult.put(address02, transientReplica(address02, nineToken, elevenToken)); @@ -634,13 +634,13 @@ public class MoveTransientTest invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().left, constructTMDsMoveForwardBetween(), - expectedResult.asImmutableView()); + expectedResult.build()); } @Test public void testMoveBackwardBetweenCalculateRangesToStreamWithPreferredEndpoints() throws Exception { - RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); + RangesByEndpoint.Builder expectedResult = new RangesByEndpoint.Builder(); expectedResult.put(address02, fullReplica(address02, fourteenToken, oneToken)); @@ -651,30 +651,30 @@ public class MoveTransientTest invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().left, constructTMDsMoveBackwardBetween(), - expectedResult.asImmutableView()); + expectedResult.build()); } @Test public void testMoveBackwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception { - RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); + RangesByEndpoint.Builder expectedResult = new RangesByEndpoint.Builder(); expectedResult.put(address03, fullReplica(address03, twoToken, threeToken)); expectedResult.put(address04, transientReplica(address04, twoToken, threeToken)); invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().left, constructTMDsMoveBackward(), - expectedResult.asImmutableView()); + expectedResult.build()); } @Test public void testMoveForwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception { //Nothing to stream moving forward because we are acquiring more range not losing range - RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); + RangesByEndpoint.Builder expectedResult = new RangesByEndpoint.Builder(); invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().left, constructTMDsMoveForward(), - expectedResult.asImmutableView()); + expectedResult.build()); } private void invokeCalculateRangesToStreamWithPreferredEndpoints(RangesAtEndpoint toStream, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/test/unit/org/apache/cassandra/service/StorageServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java index cc7fac3..f22f89f 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java @@ -149,12 +149,12 @@ public class StorageServiceTest EndpointsByReplica result = StorageService.getChangedReplicasForLeaving("StorageServiceTest", aAddress, tmd, strat); System.out.println(result); - EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + EndpointsByReplica.Builder expectedResult = new EndpointsByReplica.Builder(); expectedResult.put(new Replica(aAddress, aRange, true), new Replica(cAddress, new Range<>(oneToken, sixToken), true)); expectedResult.put(new Replica(aAddress, aRange, true), new Replica(dAddress, new Range<>(oneToken, sixToken), false)); expectedResult.put(new Replica(aAddress, eRange, true), new Replica(bAddress, eRange, true)); expectedResult.put(new Replica(aAddress, eRange, true), new Replica(cAddress, eRange, false)); expectedResult.put(new Replica(aAddress, dRange, false), new Replica(bAddress, dRange, false)); - assertMultimapEqualsIgnoreOrder(result, expectedResult.asImmutableView()); + assertMultimapEqualsIgnoreOrder(result, expectedResult.build()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e645b917/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java index c49bf3a..17c6e41 100644 --- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java @@ -57,7 +57,6 @@ import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.*; import org.apache.cassandra.service.reads.repair.ReadRepair; @@ -1186,10 +1185,10 @@ public class DataResolverTest extends AbstractReadResponseTest public void responsesFromTransientReplicasAreNotTracked() { EndpointsForRange replicas = makeReplicas(2); - EndpointsForRange.Mutable mutable = replicas.newMutable(2); + EndpointsForRange.Builder mutable = replicas.newBuilder(2); mutable.add(replicas.get(0)); mutable.add(Replica.transientReplica(replicas.get(1).endpoint(), replicas.range())); - replicas = mutable.asSnapshot(); + replicas = mutable.build(); TestRepairedDataVerifier verifier = new TestRepairedDataVerifier(); ByteBuffer digest1 = ByteBufferUtil.bytes("digest1"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
