Repository: cassandra Updated Branches: refs/heads/trunk 914c66685 -> 29f83b888
Consolidate batch write code Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-14742 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29f83b88 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29f83b88 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29f83b88 Branch: refs/heads/trunk Commit: 29f83b88821c4792087df19d829ac87b5c06e9e6 Parents: 914c666 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Mon Sep 17 15:13:05 2018 +0200 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Thu Sep 27 18:03:48 2018 +0200 ---------------------------------------------------------------------- .../cassandra/batchlog/BatchlogManager.java | 89 ------------- .../cassandra/config/DatabaseDescriptor.java | 2 +- .../db/CounterMutationVerbHandler.java | 2 +- .../org/apache/cassandra/db/SystemKeyspace.java | 4 +- .../org/apache/cassandra/db/view/ViewUtils.java | 6 +- .../org/apache/cassandra/dht/Datacenters.java | 2 +- .../cassandra/dht/RangeFetchMapCalculator.java | 2 +- .../org/apache/cassandra/dht/RangeStreamer.java | 8 +- .../cassandra/locator/EndpointSnitchInfo.java | 4 +- .../cassandra/locator/IEndpointSnitch.java | 22 ++- .../apache/cassandra/locator/InOurDcTester.java | 2 +- .../org/apache/cassandra/locator/Replica.java | 2 +- .../apache/cassandra/locator/ReplicaPlans.java | 133 ++++++++++++++++++- .../cassandra/locator/SystemReplicas.java | 5 +- .../apache/cassandra/net/MessagingService.java | 6 +- .../cassandra/service/RangeRelocator.java | 3 +- .../apache/cassandra/service/StartupChecks.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 85 +++++------- .../cassandra/service/StorageService.java | 8 +- .../service/reads/AbstractReadExecutor.java | 2 +- .../reads/ShortReadPartitionsProtection.java | 2 +- .../apache/cassandra/streaming/StreamPlan.java | 4 +- .../cassandra/streaming/StreamSession.java | 4 +- 
.../batchlog/BatchlogEndpointFilterTest.java | 65 ++++----- 24 files changed, 240 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 77f725c..91129ed 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -527,93 +527,4 @@ public class BatchlogManager implements BatchlogManagerMBean } } } - - public static class EndpointFilter - { - private final String localRack; - private final Multimap<String, InetAddressAndPort> endpoints; - - public EndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints) - { - this.localRack = localRack; - this.endpoints = endpoints; - } - - /** - * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. 
- */ - public Collection<InetAddressAndPort> filter() - { - // special case for single-node data centers - if (endpoints.values().size() == 1) - return endpoints.values(); - - // strip out dead endpoints and localhost - ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create(); - for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries()) - if (isValid(entry.getValue())) - validated.put(entry.getKey(), entry.getValue()); - - if (validated.size() <= 2) - return validated.values(); - - if (validated.size() - validated.get(localRack).size() >= 2) - { - // we have enough endpoints in other racks - validated.removeAll(localRack); - } - - if (validated.keySet().size() == 1) - { - /* - * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack) - * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack - * because of the preceding if block. - */ - List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values()); - shuffle(otherRack); - return otherRack.subList(0, 2); - } - - // randomize which racks we pick from if more than 2 remaining - Collection<String> racks; - if (validated.keySet().size() == 2) - { - racks = validated.keySet(); - } - else - { - racks = Lists.newArrayList(validated.keySet()); - shuffle((List<String>) racks); - } - - // grab a random member of up to two racks - List<InetAddressAndPort> result = new ArrayList<>(2); - for (String rack : Iterables.limit(racks, 2)) - { - List<InetAddressAndPort> rackMembers = validated.get(rack); - result.add(rackMembers.get(getRandomInt(rackMembers.size()))); - } - - return result; - } - - @VisibleForTesting - protected boolean isValid(InetAddressAndPort input) - { - return !input.equals(FBUtilities.getBroadcastAddressAndPort()) && FailureDetector.instance.isAlive(input); - } - - @VisibleForTesting - protected int getRandomInt(int bound) - { - return 
ThreadLocalRandom.current().nextInt(bound); - } - - @VisibleForTesting - protected void shuffle(List<?> list) - { - Collections.shuffle(list); - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 2ad9b18..dc76431 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -991,7 +991,7 @@ public class DatabaseDescriptor snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch); EndpointSnitchInfo.create(); - localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + localDC = snitch.getLocalDatacenter(); localComparator = (replica1, replica2) -> { boolean local1 = localDC.equals(snitch.getDatacenter(replica1)); boolean local2 = localDC.equals(snitch.getDatacenter(replica2)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index 95d7916..c946ea5 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -37,7 +37,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> final CounterMutation cm = message.payload; logger.trace("Applying forwarded {}", cm); - String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); // We should not wait for the result of the write in this thread, // otherwise we could have a distributed deadlock between replicas // running this VerbHandler (see #4578). http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 0f904ce..1b3b2a6 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -460,8 +460,8 @@ public final class SystemKeyspace FBUtilities.getReleaseVersionString(), QueryProcessor.CQL_VERSION.toString(), String.valueOf(ProtocolVersion.CURRENT.asInt()), - snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()), - snitch.getRack(FBUtilities.getBroadcastAddressAndPort()), + snitch.getLocalDatacenter(), + snitch.getLocalRack(), DatabaseDescriptor.getPartitioner().getClass().getName(), DatabaseDescriptor.getRpcAddress(), DatabaseDescriptor.getNativeTransportPort(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/db/view/ViewUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java index ad10d9d..e824732 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUtils.java +++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java @@ -63,11 +63,11 @@ public final class ViewUtils { AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy(); - String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken); EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken); - Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isLocal).toJavaUtil(); + Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isSelf).toJavaUtil(); if (localReplica.isPresent()) return localReplica; @@ -93,7 +93,7 @@ public final class ViewUtils int baseIdx = -1; for (int i=0; i<baseReplicas.size(); i++) { - if (baseReplicas.get(i).isLocal()) + if (baseReplicas.get(i).isSelf()) { baseIdx = i; break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/Datacenters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Datacenters.java b/src/java/org/apache/cassandra/dht/Datacenters.java index 26ae2e6..9695a09 100644 --- a/src/java/org/apache/cassandra/dht/Datacenters.java +++ b/src/java/org/apache/cassandra/dht/Datacenters.java @@ -32,7 +32,7 @@ public class Datacenters private static class DCHandle { - private static final String thisDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + private static final String thisDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); } public static String thisDatacenter() http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java index 63265b7..2a2de01 100644 --- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java +++ 
b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java @@ -357,7 +357,7 @@ public class RangeFetchMapCalculator { sourceFound = true; // if we pass filters, it means that we don't filter away localhost and we can count it as a source: - if (replica.isLocal()) + if (replica.isSelf()) continue; // but don't add localhost to the graph to avoid streaming locally final Vertex endpointVertex = new EndpointVertex(replica.endpoint()); capacityGraph.insertVertex(rangeVertex); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index f46d665..b50a4e2 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -104,7 +104,7 @@ public class RangeStreamer { Preconditions.checkNotNull(local); Preconditions.checkNotNull(remote); - assert local.isLocal() && !remote.isLocal(); + assert local.isSelf() && !remote.isSelf(); this.local = local; this.remote = remote; } @@ -203,7 +203,7 @@ public class RangeStreamer @Override public boolean apply(Replica replica) { - return !replica.isLocal(); + return !replica.isSelf(); } @Override @@ -553,8 +553,8 @@ public class RangeStreamer { for (Replica source : e.getValue()) { - assert e.getKey().isLocal(); - assert !source.isLocal(); + assert (e.getKey()).isSelf(); + assert !source.isSelf(); workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java index c06d765..da90a79 
100644 --- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java +++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java @@ -53,12 +53,12 @@ public class EndpointSnitchInfo implements EndpointSnitchInfoMBean public String getDatacenter() { - return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + return DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); } public String getRack() { - return DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); + return DatabaseDescriptor.getEndpointSnitch().getLocalRack(); } public String getSnitchName() http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/IEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java index b7797b0..381a642 100644 --- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java @@ -19,6 +19,8 @@ package org.apache.cassandra.locator; import java.util.Set; +import org.apache.cassandra.utils.FBUtilities; + /** * This interface helps determine location of node in the datacenter relative to another node. 
* Give a node A and another node B it can tell if A and B are on the same rack or in the same @@ -28,15 +30,31 @@ import java.util.Set; public interface IEndpointSnitch { /** - * returns a String representing the rack this endpoint belongs to + * returns a String representing the rack the given endpoint belongs to */ public String getRack(InetAddressAndPort endpoint); /** - * returns a String representing the datacenter this endpoint belongs to + * returns a String representing the rack current endpoint belongs to + */ + default public String getLocalRack() + { + return getRack(FBUtilities.getBroadcastAddressAndPort()); + } + + /** + * returns a String representing the datacenter the given endpoint belongs to */ public String getDatacenter(InetAddressAndPort endpoint); + /** + * returns a String representing the datacenter current endpoint belongs to + */ + default public String getLocalDatacenter() + { + return getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + } + default public String getDatacenter(Replica replica) { return getDatacenter(replica.endpoint()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/InOurDcTester.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/InOurDcTester.java b/src/java/org/apache/cassandra/locator/InOurDcTester.java index 23a8c13..514c7ef 100644 --- a/src/java/org/apache/cassandra/locator/InOurDcTester.java +++ b/src/java/org/apache/cassandra/locator/InOurDcTester.java @@ -43,7 +43,7 @@ public class InOurDcTester // this final clause checks if somehow the snitch/localDc have got out of whack; // presently, this is possible but very unlikely, but this check will also help // resolve races on these global fields as well - || !dc.equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())); + || !dc.equals(snitch.getLocalDatacenter()); } private static final class ReplicaTester extends 
InOurDcTester implements Predicate<Replica> http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/Replica.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Replica.java b/src/java/org/apache/cassandra/locator/Replica.java index 37b6050..c884f13 100644 --- a/src/java/org/apache/cassandra/locator/Replica.java +++ b/src/java/org/apache/cassandra/locator/Replica.java @@ -100,7 +100,7 @@ public final class Replica implements Comparable<Replica> return endpoint; } - public boolean isLocal() + public boolean isSelf() { return endpoint.equals(FBUtilities.getBroadcastAddressAndPort()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/ReplicaPlans.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 3d56a73..87f3c09 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -21,7 +21,13 @@ package org.apache.cassandra.locator; import com.carrotsearch.hppc.ObjectIntOpenHashMap; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; @@ -31,12 +37,23 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.UnavailableException; 
import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; +import org.apache.cassandra.utils.FBUtilities; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; import static com.google.common.collect.Iterables.any; @@ -173,26 +190,132 @@ public class ReplicaPlans return forSingleReplicaWrite(keyspace, token, replica); } + public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() + { + Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); + Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( + EndpointsForToken.of(token, localSystemReplica), + EndpointsForToken.empty(token) + ); + + return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); + } + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. 
+ * @param isAny whether the batch consistency level is ANY; if so, and no batchlog endpoint could be chosen, the local node is used instead */ - public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException + public static ReplicaPlan.ForTokenWrite forBatchlogWrite(boolean isAny) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks() + .get(snitch.getLocalDatacenter())); + // Replicas are picked manually: + // - replicas should be alive according to the failure detector + // - replicas should be in the local datacenter + // - choose min(2, number of qualifying candidates above) + // - allow the local node to be the only replica only if it's a single-node DC + Collection<InetAddressAndPort> chosenEndpoints = filterBatchlogEndpoints(snitch.getLocalRack(), localEndpoints); + + if (chosenEndpoints.isEmpty() && isAny) + chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( - SystemReplicas.getSystemReplicas(endpoints).forToken(token), + SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token), EndpointsForToken.empty(token) ); + + // Batchlog is hosted by either one node or two nodes from different racks. ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ?
ConsistencyLevel.ONE : ConsistencyLevel.TWO; + Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); + // assume that we have already been given live endpoints, and skip applying the failure detector - return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll); + return forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll); + } + + private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack, + Multimap<String, InetAddressAndPort> endpoints) + { + return filterBatchlogEndpoints(localRack, + endpoints, + Collections::shuffle, + FailureDetector.isEndpointAlive, + ThreadLocalRandom.current()::nextInt); + } + + // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks. + @VisibleForTesting + public static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack, + Multimap<String, InetAddressAndPort> endpoints, + Consumer<List<?>> shuffle, + Predicate<InetAddressAndPort> isAlive, + Function<Integer, Integer> indexPicker) + { + // special case for single-node data centers + if (endpoints.values().size() == 1) + return endpoints.values(); + + // strip out dead endpoints and localhost + ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create(); + for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries()) + { + InetAddressAndPort addr = entry.getValue(); + if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr)) + validated.put(entry.getKey(), entry.getValue()); + } + + if (validated.size() <= 2) + return validated.values(); + + if (validated.size() - validated.get(localRack).size() >= 2) + { + // we have enough endpoints in other racks + validated.removeAll(localRack); + } + + if (validated.keySet().size() == 1) + { + /* + * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack) + * pick two 
random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack + * because of the preceding if block. + */ + List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values()); + shuffle.accept(otherRack); + return otherRack.subList(0, 2); + } + + // randomize which racks we pick from if more than 2 remaining + Collection<String> racks; + if (validated.keySet().size() == 2) + { + racks = validated.keySet(); + } + else + { + racks = Lists.newArrayList(validated.keySet()); + shuffle.accept((List<?>) racks); + } + + // grab a random member of up to two racks + List<InetAddressAndPort> result = new ArrayList<>(2); + for (String rack : Iterables.limit(racks, 2)) + { + List<InetAddressAndPort> rackMembers = validated.get(rack); + result.add(rackMembers.get(indexPicker.apply(rackMembers.size()))); + } + + return result; } public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/locator/SystemReplicas.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java index 0d1fc8d..456bae5 100644 --- a/src/java/org/apache/cassandra/locator/SystemReplicas.java +++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java @@ -41,8 +41,6 @@ public class SystemReplicas /** * There are a few places where a system function borrows write path functionality, but doesn't otherwise * fit into normal replication strategies (ie: hints and batchlog). 
So here we provide a replica instance - * @param endpoint - * @return */ public static Replica getSystemReplica(InetAddressAndPort endpoint) { @@ -51,6 +49,9 @@ public class SystemReplicas public static EndpointsForRange getSystemReplicas(Collection<InetAddressAndPort> endpoints) { + if (endpoints.isEmpty()) + return EndpointsForRange.empty(FULL_RANGE); + return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index bd290a1..c6e8496 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -1696,13 +1696,13 @@ public final class MessagingService implements MessagingServiceMBean case all: break; case dc: - if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()))) + if (snitch.getDatacenter(address).equals(snitch.getLocalDatacenter())) return false; break; case rack: // for rack then check if the DC's are the same. 
- if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddressAndPort())) - && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()))) + if (snitch.getRack(address).equals(snitch.getLocalRack()) + && snitch.getDatacenter(address).equals(snitch.getLocalDatacenter())) return false; break; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/RangeRelocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java index f2af3db..839a34c 100644 --- a/src/java/org/apache/cassandra/service/RangeRelocator.java +++ b/src/java/org/apache/cassandra/service/RangeRelocator.java @@ -185,8 +185,7 @@ public class RangeRelocator //In the single node token move there is nothing to do and Range subtraction is broken //so it's easier to just identify this case up front. 
- if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort() -)).size() > 1) + if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter()).size() > 1) { // getting collection of the currently used ranges by this keyspace RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StartupChecks.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java index 224fd5e..8814281 100644 --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@ -437,7 +437,7 @@ public class StartupChecks String storedDc = SystemKeyspace.getDatacenter(); if (storedDc != null) { - String currentDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + String currentDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); if (!storedDc.equals(currentDc)) { String formatMessage = "Cannot start node if snitch's data center (%s) differs from previous data center (%s). " + @@ -459,7 +459,7 @@ public class StartupChecks String storedRack = SystemKeyspace.getRack(); if (storedRack != null) { - String currentRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); + String currentRack = DatabaseDescriptor.getEndpointSnitch().getLocalRack(); if (!storedRack.equals(currentRack)) { String formatMessage = "Cannot start node if snitch's rack (%s) differs from previous rack (%s). 
" + http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index c6315ff..b3adc47 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -80,6 +80,8 @@ import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.AbstractIterator; +import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup; + public class StorageProxy implements StorageProxyMBean { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy"; @@ -446,7 +448,7 @@ public class StorageProxy implements StorageProxyMBean MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer); for (Replica replica: replicaPlan.contacts()) { - if (replica.isLocal()) + if (replica.isSelf()) { StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PREPARE)).execute(new Runnable() { @@ -484,7 +486,7 @@ public class StorageProxy implements StorageProxyMBean MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer); for (Replica replica : replicaPlan.contacts()) { - if (replica.isLocal()) + if (replica.isSelf()) { StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PROPOSE)).execute(new Runnable() { @@ -549,7 +551,7 @@ public class StorageProxy implements StorageProxyMBean { if (shouldBlock) { - if (replica.isLocal()) + if (replica.isSelf()) commitPaxosLocal(replica, message, responseHandler); else MessagingService.instance().sendWriteRR(message, replica, responseHandler, allowHints && 
shouldHint(replica)); @@ -623,7 +625,7 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException { Tracing.trace("Determining replicas for mutation"); - final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); long startTime = System.nanoTime(); @@ -755,7 +757,7 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException, OverloadedException, WriteTimeoutException { Tracing.trace("Determining replicas for mutation"); - final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); long startTime = System.nanoTime(); @@ -780,8 +782,9 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; //Since the base -> view replication is 1:1 we only need to store the BL locally - final Collection<InetAddressAndPort> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); - BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); + ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite(); + BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(), + () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) @@ -806,7 +809,7 @@ public class StorageProxy implements StorageProxyMBean // When local node is the endpoint we can just apply the mutation locally, // unless there are 
pending endpoints, in which case we want to do an ordinary // write so the view mutation is sent to the pending endpoint - if (pairedEndpoint.get().isLocal() && StorageService.instance.isJoined() + if (pairedEndpoint.get().isSelf() && StorageService.instance.isJoined() && pendingReplicas.isEmpty()) { try @@ -899,7 +902,6 @@ public class StorageProxy implements StorageProxyMBean long startTime = System.nanoTime(); List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size()); - String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas())) throw new AssertionError("Logged batches are unsupported with transient replication"); @@ -920,10 +922,11 @@ public class StorageProxy implements StorageProxyMBean batchConsistencyLevel = consistency_level; } - final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogReplicas(localDataCenter, batchConsistencyLevel); + ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY); + final UUID batchUUID = UUIDGen.getTimeUUID(); - BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); + BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(), + () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) @@ -939,10 +942,10 @@ public class StorageProxy implements StorageProxyMBean } // write to the batchlog - syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID, queryStartNanoTime); + syncWriteToBatchlog(mutations, replicaPlan, batchUUID, queryStartNanoTime); // now 
actually perform the writes and wait for them to complete - syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION); + syncWriteBatchedMutations(wrappers, Stage.MUTATION); } catch (UnavailableException e) { @@ -998,11 +1001,9 @@ public class StorageProxy implements StorageProxyMBean } } - private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime) + private static void syncWriteToBatchlog(Collection<Mutation> mutations, ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid, long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException { - Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); - ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints); WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan, WriteType.BATCH_LOG, queryStartNanoTime); @@ -1013,7 +1014,7 @@ public class StorageProxy implements StorageProxyMBean { logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size()); - if (replica.isLocal()) + if (replica.isSelf()) performLocally(Stage.MUTATION, replica, Optional.empty(), () -> BatchlogManager.store(batch), handler); else MessagingService.instance().sendRR(message, replica.endpoint(), handler); @@ -1021,18 +1022,18 @@ public class StorageProxy implements StorageProxyMBean handler.get(); } - private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> endpoints, UUID uuid) + private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid) { MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer); - for (InetAddressAndPort target : endpoints) + for (Replica target : replicaPlan.contacts()) { if (logger.isTraceEnabled()) logger.trace("Sending batchlog remove request {} to {}", uuid, target); - if 
(target.equals(FBUtilities.getBroadcastAddressAndPort())) - performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid)); + if (target.isSelf()) + performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid)); else - MessagingService.instance().sendOneWay(message, target); + MessagingService.instance().sendOneWay(message, target.endpoint()); } } @@ -1054,9 +1055,11 @@ public class StorageProxy implements StorageProxyMBean } } - private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage) + private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, Stage stage) throws WriteTimeoutException, OverloadedException { + String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); + for (WriteResponseHandlerWrapper wrapper : wrappers) { EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown(); @@ -1162,32 +1165,6 @@ public class StorageProxy implements StorageProxyMBean } } - /* - * Replicas are picked manually: - * - replicas should be alive according to the failure detector - * - replicas should be in the local datacenter - * - choose min(2, number of qualifying candiates above) - * - allow the local node to be the only replica only if it's a single-node DC - */ - private static Collection<InetAddressAndPort> getBatchlogReplicas(String localDataCenter, ConsistencyLevel consistencyLevel) - throws UnavailableException - { - TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); - Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); - String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); - - Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, 
localEndpoints).filter(); - if (chosenEndpoints.isEmpty()) - { - if (consistencyLevel == ConsistencyLevel.ANY) - return Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); - - throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0); - } - - return chosenEndpoints; - } - /** * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node * is not available. @@ -1231,7 +1208,7 @@ public class StorageProxy implements StorageProxyMBean if (plan.isAlive(destination)) { - if (destination.isLocal()) + if (destination.isSelf()) { insertLocal = true; localReplica = destination; @@ -1424,7 +1401,7 @@ public class StorageProxy implements StorageProxyMBean { Replica replica = findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); - if (replica.isLocal()) + if (replica.isSelf()) { return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime); } @@ -2096,7 +2073,7 @@ public class StorageProxy implements StorageProxyMBean command.trackRepairedStatus(); } - if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal()) + if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf()) { StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler)); } @@ -2366,7 +2343,7 @@ public class StorageProxy implements StorageProxyMBean { if (!DatabaseDescriptor.hintedHandoffEnabled()) return false; - if (replica.isTransient() || replica.isLocal()) + if (replica.isTransient() || replica.isSelf()) return false; Set<String> disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 
391598c..caa732a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1060,8 +1060,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void gossipSnitchInfo() { IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()); - String rack = snitch.getRack(FBUtilities.getBroadcastAddressAndPort()); + String dc = snitch.getLocalDatacenter(); + String rack = snitch.getLocalRack(); Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc)); Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack)); } @@ -1835,7 +1835,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private boolean isLocalDC(InetAddressAndPort targetHost) { String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost); - String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + String localDC = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); return remoteDC.equals(localDC); } @@ -4073,7 +4073,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { PendingRangeCalculatorService.instance.blockUntilFinished(); - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + String dc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges { http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 6881a2f..8d0f14c 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -138,7 +138,7 @@ public abstract class AbstractReadExecutor for (Replica replica: replicas) { InetAddressAndPort endpoint = replica.endpoint(); - if (replica.isLocal()) + if (replica.isSelf()) { hasLocalEndpoint = true; continue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java index b16d105..2e4440f 100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java @@ -181,7 +181,7 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime); ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); - if (source.isLocal()) + if (source.isSelf()) StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); else MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source.endpoint(), handler); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java 
b/src/java/org/apache/cassandra/streaming/StreamPlan.java index ea54f9d..3fcabd0 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -104,8 +104,8 @@ public class StreamPlan public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies) { //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node - assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString(); - assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString(); + assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString(); + assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString(); StreamSession session = coordinator.getOrCreateNextSession(from); session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 80fcebb..08a1b07 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -311,8 +311,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies) { //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node - 
assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString(); - assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString(); + assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString(); + assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString(); requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29f83b88/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java index 41564d9..c2b9fc9 100644 --- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java +++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java @@ -28,9 +28,12 @@ import org.junit.Test; import org.junit.matchers.JUnitMatchers; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; public class BatchlogEndpointFilterTest { @@ -47,10 +50,10 @@ public class BatchlogEndpointFilterTest .put("2", InetAddressAndPort.getByName("2")) .put("2", InetAddressAndPort.getByName("22")) .build(); - Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints); assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11"))); - assertThat(result, 
JUnitMatchers.hasItem(InetAddressAndPort.getByName("22"))); + assertTrue(result.contains(InetAddressAndPort.getByName("11"))); + assertTrue(result.contains(InetAddressAndPort.getByName("22"))); } @Test @@ -61,10 +64,10 @@ public class BatchlogEndpointFilterTest .put(LOCAL, InetAddressAndPort.getByName("00")) .put("1", InetAddressAndPort.getByName("1")) .build(); - Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints); assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1"))); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0"))); + assertTrue(result.contains(InetAddressAndPort.getByName("1"))); + assertTrue(result.contains(InetAddressAndPort.getByName("0"))); } @Test @@ -73,9 +76,9 @@ public class BatchlogEndpointFilterTest Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder() .put(LOCAL, InetAddressAndPort.getByName("0")) .build(); - Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints); assertThat(result.size(), is(1)); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0"))); + assertTrue(result.contains(InetAddressAndPort.getByName("0"))); } @Test @@ -88,12 +91,12 @@ public class BatchlogEndpointFilterTest .put("1", InetAddressAndPort.getByName("11")) .put("1", InetAddressAndPort.getByName("111")) .build(); - Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints); // result should be the last two non-local replicas // (Collections.shuffle has been replaced with Collections.reverse for testing) assertThat(result.size(), is(2)); - assertThat(result, 
JUnitMatchers.hasItem(InetAddressAndPort.getByName("11"))); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111"))); + assertTrue(result.contains(InetAddressAndPort.getByName("11"))); + assertTrue(result.contains(InetAddressAndPort.getByName("111"))); } @Test @@ -105,40 +108,22 @@ public class BatchlogEndpointFilterTest .put(LOCAL, InetAddressAndPort.getByName("111")) .put(LOCAL, InetAddressAndPort.getByName("1111")) .build(); - Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints); // result should be the last two non-local replicas // (Collections.shuffle has been replaced with Collections.reverse for testing) assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111"))); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1111"))); + assertTrue(result.contains(InetAddressAndPort.getByName("111"))); + assertTrue(result.contains(InetAddressAndPort.getByName("1111"))); } - private static class TestEndpointFilter extends BatchlogManager.EndpointFilter + private Collection<InetAddressAndPort> filterBatchlogEndpoints(Multimap<String, InetAddressAndPort> endpoints) { - TestEndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints) - { - super(localRack, endpoints); - } - - @Override - protected boolean isValid(InetAddressAndPort input) - { - // We will use always alive non-localhost endpoints - return true; - } - - @Override - protected int getRandomInt(int bound) - { - // We don't need random behavior here - return bound - 1; - } - - @Override - protected void shuffle(List<?> list) - { - // We don't need random behavior here - Collections.reverse(list); - } + return ReplicaPlans.filterBatchlogEndpoints(LOCAL, endpoints, + // Reverse instead of shuffle + Collections::reverse, + // Always alive + (addr) -> true, + // Always pick the last + 
(size) -> size - 1); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org