Repository: cassandra Updated Branches: refs/heads/trunk 0379201c7 -> 8554d6b35
LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-14735 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8554d6b3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8554d6b3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8554d6b3 Branch: refs/heads/trunk Commit: 8554d6b35dcc5eec46ed7edc809a36c1f7fa588f Parents: 0379201 Author: Benedict Elliott Smith <[email protected]> Authored: Thu Sep 20 08:54:55 2018 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Wed Sep 26 10:55:11 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ConsistencyLevel.java | 233 ++----------------- .../apache/cassandra/locator/InOurDcTester.java | 93 ++++++++ .../apache/cassandra/locator/ReplicaPlan.java | 3 - .../apache/cassandra/locator/ReplicaPlans.java | 193 +++++++++++++-- .../org/apache/cassandra/locator/Replicas.java | 65 +++++- .../service/DatacenterWriteResponseHandler.java | 7 +- .../apache/cassandra/service/StorageProxy.java | 6 +- .../reads/repair/BlockingPartitionRepair.java | 27 ++- .../reads/repair/BlockingReadRepairTest.java | 10 +- .../DiagEventsBlockingReadRepairTest.java | 23 +- .../service/reads/repair/ReadRepairTest.java | 9 +- 12 files changed, 373 insertions(+), 297 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9f7958c..9139822 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735) * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780) * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763) * Add a check for receiving digest response from transient node (CASSANDRA-14750) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 5a4baf7..9e884a7 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -17,26 +17,18 @@ */ package org.apache.cassandra.db; -import java.util.HashMap; -import java.util.Map; -import com.google.common.collect.Iterables; +import com.carrotsearch.hppc.ObjectIntOpenHashMap; import org.apache.cassandra.locator.Endpoints; -import org.apache.cassandra.locator.ReplicaCollection; -import org.apache.cassandra.locator.Replicas; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.transport.ProtocolException; +import static org.apache.cassandra.locator.Replicas.countInOurDc; + public enum ConsistencyLevel { ANY (0), @@ -52,8 +44,6 @@ public enum ConsistencyLevel LOCAL_ONE (10, true), NODE_LOCAL (11, true); - private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class); - // Used by the binary protocol public final int code; private final boolean isDCLocal; @@ -90,18 +80,27 @@ public enum ConsistencyLevel return codeIdx[code]; } - private int quorumFor(Keyspace keyspace) + public static int quorumFor(Keyspace keyspace) { return (keyspace.getReplicationStrategy().getReplicationFactor().allReplicas / 2) + 1; } - private int localQuorumFor(Keyspace keyspace, String dc) + public static int localQuorumFor(Keyspace keyspace, String dc) { return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc).allReplicas / 2) + 1 : quorumFor(keyspace); } + public static ObjectIntOpenHashMap<String> eachQuorumFor(Keyspace keyspace) + { + NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); + ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size()); + for (String dc : strategy.getDatacenters()) + perDc.put(dc, ConsistencyLevel.localQuorumFor(keyspace, dc)); + return perDc; + } + public int blockFor(Keyspace keyspace) { switch (this) @@ -152,7 +151,7 @@ public enum ConsistencyLevel break; case LOCAL_ONE: case LOCAL_QUORUM: case LOCAL_SERIAL: // we will only count local replicas towards our response count, as these queries only care about local guarantees - blockFor += countDCLocalReplicas(pending).allReplicas(); + blockFor += countInOurDc(pending).allReplicas(); break; case ONE: case TWO: case THREE: case QUORUM: case EACH_QUORUM: @@ -176,208 +175,6 @@ public enum ConsistencyLevel return isDCLocal; } - public static boolean isLocal(InetAddressAndPort endpoint) - { - return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint)); - } - - public static boolean isLocal(Replica replica) - { - return isLocal(replica.endpoint()); - } - - private static ReplicaCount countDCLocalReplicas(ReplicaCollection<?> liveReplicas) - { - ReplicaCount count = new ReplicaCount(); - for (Replica replica : liveReplicas) - if (isLocal(replica)) - count.increment(replica); - return count; - } - - private static class ReplicaCount - { - int fullReplicas; - int transientReplicas; - - int allReplicas() - { - return fullReplicas + transientReplicas; - } - - void increment(Replica replica) - { - if (replica.isFull()) ++fullReplicas; - else ++transientReplicas; - } - - boolean isSufficient(int allReplicas, int fullReplicas) - { - return this.fullReplicas >= fullReplicas - && this.allReplicas() >= allReplicas; - } - } - - private static Map<String, ReplicaCount> countPerDCEndpoints(Keyspace keyspace, Iterable<Replica> liveReplicas) - { - NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); - - Map<String, ReplicaCount> dcEndpoints = new HashMap<>(); - for (String dc: strategy.getDatacenters()) - dcEndpoints.put(dc, new ReplicaCount()); - - for (Replica replica : liveReplicas) - { - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); - dcEndpoints.get(dc).increment(replica); - } - return dcEndpoints; - } - - public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E liveReplicas) - { - return filterForQuery(keyspace, liveReplicas, false); - } - - public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E liveReplicas, boolean alwaysSpeculate) - { - /* - * If we are doing an each quorum query, we have to make sure that the endpoints we select - * provide a quorum for each data center. If we are not using a NetworkTopologyStrategy, - * we should fall through and grab a quorum in the replication strategy. - * - * We do not speculate for EACH_QUORUM. - */ - if (this == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) - return filterForEachQuorum(keyspace, liveReplicas); - - int count = blockFor(keyspace) + (alwaysSpeculate ? 1 : 0); - return isDCLocal - ? liveReplicas.filter(ConsistencyLevel::isLocal, count) - : liveReplicas.subList(0, Math.min(liveReplicas.size(), count)); - } - - private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, E liveReplicas) - { - NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); - Map<String, Integer> dcsReplicas = new HashMap<>(); - for (String dc : strategy.getDatacenters()) - { - // we put _up to_ dc replicas only - dcsReplicas.put(dc, localQuorumFor(keyspace, dc)); - } - - return liveReplicas.filter((replica) -> { - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); - int replicas = dcsReplicas.get(dc); - if (replicas > 0) - { - dcsReplicas.put(dc, --replicas); - return true; - } - return false; - }); - } - - public boolean isSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas) - { - switch (this) - { - case ANY: - // local hint is acceptable, and local node is always live - return true; - case LOCAL_ONE: - return countDCLocalReplicas(liveReplicas).isSufficient(1, 1); - case LOCAL_QUORUM: - return countDCLocalReplicas(liveReplicas).isSufficient(blockFor(keyspace), 1); - case EACH_QUORUM: - if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) - { - int fullCount = 0; - for (Map.Entry<String, ReplicaCount> entry : countPerDCEndpoints(keyspace, liveReplicas).entrySet()) - { - ReplicaCount count = entry.getValue(); - if (!count.isSufficient(localQuorumFor(keyspace, entry.getKey()), 0)) - return false; - fullCount += count.fullReplicas; - } - return fullCount > 0; - } - // Fallthough on purpose for SimpleStrategy - default: - return liveReplicas.size() >= blockFor(keyspace) - && Replicas.countFull(liveReplicas) > 0; - } - } - - public void assureSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException - { - assureSufficientLiveReplicas(keyspace, liveReplicas, blockFor(keyspace), 1); - } - public void assureSufficientLiveReplicasForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException - { - assureSufficientLiveReplicas(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0); - } - void assureSufficientLiveReplicas(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException - { - switch (this) - { - case ANY: - // local hint is acceptable, and local node is always live - break; - case LOCAL_ONE: - { - ReplicaCount localLive = countDCLocalReplicas(allLive); - if (!localLive.isSufficient(blockFor, blockForFullReplicas)) - throw UnavailableException.create(this, 1, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas); - break; - } - case LOCAL_QUORUM: - { - ReplicaCount localLive = countDCLocalReplicas(allLive); - if (!localLive.isSufficient(blockFor, blockForFullReplicas)) - { - if (logger.isTraceEnabled()) - { - logger.trace(String.format("Local replicas %s are insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d full replicas in '%s'", - allLive.filter(ConsistencyLevel::isLocal), blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter())); - } - throw UnavailableException.create(this, blockFor, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas); - } - break; - } - case EACH_QUORUM: - if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) - { - int total = 0; - int totalFull = 0; - for (Map.Entry<String, ReplicaCount> entry : countPerDCEndpoints(keyspace, allLive).entrySet()) - { - int dcBlockFor = localQuorumFor(keyspace, entry.getKey()); - ReplicaCount dcCount = entry.getValue(); - if (!dcCount.isSufficient(dcBlockFor, 0)) - throw UnavailableException.create(this, entry.getKey(), dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas); - totalFull += dcCount.fullReplicas; - total += dcCount.allReplicas(); - } - if (totalFull < blockForFullReplicas) - throw UnavailableException.create(this, blockFor, total, blockForFullReplicas, totalFull); - break; - } - // Fallthough on purpose for SimpleStrategy - default: - int live = allLive.size(); - int full = Replicas.countFull(allLive); - if (live < blockFor || full < blockForFullReplicas) - { - if (logger.isTraceEnabled()) - logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor); - throw UnavailableException.create(this, blockFor, blockForFullReplicas, live, full); - } - break; - } - } - public void validateForRead(String keyspaceName) throws InvalidRequestException { switch (this) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/locator/InOurDcTester.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/InOurDcTester.java b/src/java/org/apache/cassandra/locator/InOurDcTester.java new file mode 100644 index 0000000..23a8c13 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/InOurDcTester.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.FBUtilities; +import java.util.function.Predicate; + +public class InOurDcTester +{ + private static ReplicaTester replicas; + private static EndpointTester endpoints; + + final String dc; + final IEndpointSnitch snitch; + + private InOurDcTester(String dc, IEndpointSnitch snitch) + { + this.dc = dc; + this.snitch = snitch; + } + + boolean stale() + { + return dc != DatabaseDescriptor.getLocalDataCenter() + || snitch != DatabaseDescriptor.getEndpointSnitch() + // this final clause checks if somehow the snitch/localDc have got out of whack; + // presently, this is possible but very unlikely, but this check will also help + // resolve races on these global fields as well + || !dc.equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())); + } + + private static final class ReplicaTester extends InOurDcTester implements Predicate<Replica> + { + private ReplicaTester(String dc, IEndpointSnitch snitch) + { + super(dc, snitch); + } + + @Override + public boolean test(Replica replica) + { + return dc.equals(snitch.getDatacenter(replica.endpoint())); + } + } + + private static final class EndpointTester extends InOurDcTester implements Predicate<InetAddressAndPort> + { + private EndpointTester(String dc, IEndpointSnitch snitch) + { + super(dc, snitch); + } + + @Override + public boolean test(InetAddressAndPort endpoint) + { + return dc.equals(snitch.getDatacenter(endpoint)); + } + } + + public static Predicate<Replica> replicas() + { + ReplicaTester cur = replicas; + if (cur == null || cur.stale()) + replicas = cur = new ReplicaTester(DatabaseDescriptor.getLocalDataCenter(), DatabaseDescriptor.getEndpointSnitch()); + return cur; + } + + public static Predicate<InetAddressAndPort> endpoints() + { + EndpointTester cur = endpoints; + if (cur == null || cur.stale()) + endpoints = cur = new EndpointTester(DatabaseDescriptor.getLocalDataCenter(), DatabaseDescriptor.getEndpointSnitch()); + return cur; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/locator/ReplicaPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java index 4d6127b..861c912 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java @@ -50,7 +50,6 @@ public abstract class ReplicaPlan<E extends Endpoints<E>> } public abstract int blockFor(); - public abstract void assureSufficientReplicas(); public E contacts() { return contacts; } public boolean contacts(Replica replica) { return contacts.contains(replica); } @@ -70,7 +69,6 @@ public abstract class ReplicaPlan<E extends Endpoints<E>> } public int blockFor() { return consistencyLevel.blockFor(keyspace); } - public void assureSufficientReplicas() { consistencyLevel.assureSufficientLiveReplicasForRead(keyspace, candidates()); } public E candidates() { return candidates; } @@ -142,7 +140,6 @@ public abstract class ReplicaPlan<E extends Endpoints<E>> } public int blockFor() { return consistencyLevel.blockForWrite(keyspace, pending()); } - public void assureSufficientReplicas() { consistencyLevel.assureSufficientLiveReplicasForWrite(keyspace, live(), pending()); } /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */ public E pending() { return pending; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/locator/ReplicaPlans.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 25f42c3..3d56a73 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -18,7 +18,10 @@ package org.apache.cassandra.locator; +import com.carrotsearch.hppc.ObjectIntOpenHashMap; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; @@ -30,7 +33,8 @@ import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; -import org.apache.cassandra.utils.FBUtilities; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.function.Predicate; @@ -38,9 +42,117 @@ import java.util.function.Predicate; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.limit; +import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM; +import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumFor; +import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor; +import static org.apache.cassandra.locator.Replicas.countInOurDc; +import static org.apache.cassandra.locator.Replicas.countPerDc; public class ReplicaPlans { + private static final Logger logger = LoggerFactory.getLogger(ReplicaPlans.class); + + public static boolean isSufficientLiveReplicasForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) + { + switch (consistencyLevel) + { + case ANY: + // local hint is acceptable, and local node is always live + return true; + case LOCAL_ONE: + return countInOurDc(liveReplicas).hasAtleast(1, 1); + case LOCAL_QUORUM: + return countInOurDc(liveReplicas).hasAtleast(consistencyLevel.blockFor(keyspace), 1); + case EACH_QUORUM: + if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) + { + int fullCount = 0; + Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters(); + for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, liveReplicas)) + { + Replicas.ReplicaCount count = entry.value; + if (!count.hasAtleast(localQuorumFor(keyspace, entry.key), 0)) + return false; + fullCount += count.fullReplicas(); + } + return fullCount > 0; + } + // Fallthough on purpose for SimpleStrategy + default: + return liveReplicas.size() >= consistencyLevel.blockFor(keyspace) + && Replicas.countFull(liveReplicas) > 0; + } + } + + static void assureSufficientLiveReplicasForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException + { + assureSufficientLiveReplicas(keyspace, consistencyLevel, liveReplicas, consistencyLevel.blockFor(keyspace), 1); + } + static void assureSufficientLiveReplicasForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException + { + assureSufficientLiveReplicas(keyspace, consistencyLevel, allLive, consistencyLevel.blockForWrite(keyspace, pendingWithDown), 0); + } + static void assureSufficientLiveReplicas(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException + { + switch (consistencyLevel) + { + case ANY: + // local hint is acceptable, and local node is always live + break; + case LOCAL_ONE: + { + Replicas.ReplicaCount localLive = countInOurDc(allLive); + if (!localLive.hasAtleast(blockFor, blockForFullReplicas)) + throw UnavailableException.create(consistencyLevel, 1, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas()); + break; + } + case LOCAL_QUORUM: + { + Replicas.ReplicaCount localLive = countInOurDc(allLive); + if (!localLive.hasAtleast(blockFor, blockForFullReplicas)) + { + if (logger.isTraceEnabled()) + { + logger.trace(String.format("Local replicas %s are insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d full replicas in '%s'", + allLive.filter(InOurDcTester.replicas()), blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter())); + } + throw UnavailableException.create(consistencyLevel, blockFor, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas()); + } + break; + } + case EACH_QUORUM: + if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) + { + int total = 0; + int totalFull = 0; + Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters(); + for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, allLive)) + { + int dcBlockFor = ConsistencyLevel.localQuorumFor(keyspace, entry.key); + Replicas.ReplicaCount dcCount = entry.value; + if (!dcCount.hasAtleast(dcBlockFor, 0)) + throw UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas()); + totalFull += dcCount.fullReplicas(); + total += dcCount.allReplicas(); + } + if (totalFull < blockForFullReplicas) + throw UnavailableException.create(consistencyLevel, blockFor, total, blockForFullReplicas, totalFull); + break; + } + // Fallthough on purpose for SimpleStrategy + default: + int live = allLive.size(); + int full = Replicas.countFull(allLive); + if (live < blockFor || full < blockForFullReplicas) + { + if (logger.isTraceEnabled()) + logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor); + throw UnavailableException.create(consistencyLevel, blockFor, blockForFullReplicas, live, full); + } + break; + } + } + /** * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive. @@ -109,9 +221,8 @@ public class ReplicaPlans public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException { EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live); - ReplicaPlan.ForTokenWrite result = new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts); - result.assureSufficientReplicas(); - return result; + assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending()); + return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts); } public interface Selector @@ -154,7 +265,7 @@ public class ReplicaPlans if (!any(liveAndDown.all(), Replica::isTransient)) return liveAndDown.all(); - assert consistencyLevel != ConsistencyLevel.EACH_QUORUM; + assert consistencyLevel != EACH_QUORUM; ReplicaCollection.Mutable<E> contacts = liveAndDown.all().newMutable(liveAndDown.all().size()); contacts.addAll(filter(liveAndDown.natural(), Replica::isFull)); @@ -186,11 +297,7 @@ public class ReplicaPlans { // TODO: we should cleanup our semantics here, as we're filtering ALL nodes to localDC which is unexpected for ReplicaPlan // Restrict natural and pending to node in the local DC only - String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); - IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica)); - - liveAndDown = liveAndDown.filter(isLocalDc); + liveAndDown = liveAndDown.filter(InOurDcTester.replicas()); } ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive); @@ -215,6 +322,45 @@ public class ReplicaPlans return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants); } + + private static <E extends Endpoints<E>> E candidatesForRead(ConsistencyLevel consistencyLevel, E liveNaturalReplicas) + { + return consistencyLevel.isDatacenterLocal() + ? liveNaturalReplicas.filter(InOurDcTester.replicas()) + : liveNaturalReplicas; + } + + private static <E extends Endpoints<E>> E contactForEachQuorumRead(Keyspace keyspace, E candidates) + { + assert keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy; + ObjectIntOpenHashMap<String> perDc = eachQuorumFor(keyspace); + + final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + return candidates.filter(replica -> { + String dc = snitch.getDatacenter(replica); + return perDc.addTo(dc, -1) >= 0; + }); + } + + private static <E extends Endpoints<E>> E contactForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates) + { + /* + * If we are doing an each quorum query, we have to make sure that the endpoints we select + * provide a quorum for each data center. If we are not using a NetworkTopologyStrategy, + * we should fall through and grab a quorum in the replication strategy. + * + * We do not speculate for EACH_QUORUM. + * + * TODO: this is still very inconistently managed between {LOCAL,EACH}_QUORUM and other consistency levels - should address this in a follow-up + */ + if (consistencyLevel == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) + return contactForEachQuorumRead(keyspace, candidates); + + int count = consistencyLevel.blockFor(keyspace) + (alwaysSpeculate ? 1 : 0); + return candidates.subList(0, Math.min(count, candidates.size())); + } + + /** * Construct a plan for reading from a single node - this permits no speculation or read-repair */ @@ -239,18 +385,16 @@ public class ReplicaPlans * - candidates who are: alive, replicate the token, and are sorted by their snitch scores * - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) candidates * - * The candidate collection can be used for speculation, although at present it would break - * LOCAL_QUORUM and EACH_QUORUM to do so without further filtering + * The candidate collection can be used for speculation, although at present + * it would break EACH_QUORUM to do so without further filtering */ public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry) { - ReplicaLayout.ForTokenRead candidates = ReplicaLayout.forTokenReadLiveSorted(keyspace, token); - EndpointsForToken contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural(), - retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE)); + EndpointsForToken candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forTokenReadLiveSorted(keyspace, token).natural()); + EndpointsForToken contacts = contactForRead(keyspace, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates); - ReplicaPlan.ForTokenRead result = new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates.natural(), contacts); - result.assureSufficientReplicas(); // Throw UAE early if we don't have enough replicas. - return result; + assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, contacts); + return new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates, contacts); } /** @@ -262,12 +406,11 @@ public class ReplicaPlans */ public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range) { - ReplicaLayout.ForRangeRead candidates = ReplicaLayout.forRangeReadLiveSorted(keyspace, range); - EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural()); + EndpointsForRange candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forRangeReadLiveSorted(keyspace, range).natural()); + EndpointsForRange contacts = contactForRead(keyspace, consistencyLevel, false, candidates); - ReplicaPlan.ForRangeRead result = new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates.natural(), contacts); - result.assureSufficientReplicas(); - return result; + assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, contacts); + return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates, contacts); } /** @@ -280,10 +423,10 @@ public class ReplicaPlans EndpointsForRange mergedCandidates = left.candidates().keep(right.candidates().endpoints()); // Check if there are enough shared endpoints for the merge to be possible. - if (!consistencyLevel.isSufficientLiveReplicasForRead(keyspace, mergedCandidates)) + if (!isSufficientLiveReplicasForRead(keyspace, consistencyLevel, mergedCandidates)) return null; - EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, mergedCandidates); + EndpointsForRange contacts = contactForRead(keyspace, consistencyLevel, false, mergedCandidates); // Estimate whether merging will be a win or not if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts())) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/locator/Replicas.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Replicas.java b/src/java/org/apache/cassandra/locator/Replicas.java index 299e6ec..6c80134 100644 --- a/src/java/org/apache/cassandra/locator/Replicas.java +++ b/src/java/org/apache/cassandra/locator/Replicas.java @@ -19,24 +19,85 @@ package org.apache.cassandra.locator; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.function.Predicate; +import com.carrotsearch.hppc.ObjectObjectOpenHashMap; import com.google.common.collect.Iterables; +import org.apache.cassandra.config.DatabaseDescriptor; import static com.google.common.collect.Iterables.all; public class Replicas { - public static int countFull(ReplicaCollection<?> liveReplicas) + public static int countFull(ReplicaCollection<?> replicas) { int count = 0; - for (Replica replica : liveReplicas) + for (Replica replica : replicas) if (replica.isFull()) ++count; return count; } + public static class ReplicaCount + { + int fullReplicas; + int transientReplicas; + + public int allReplicas() + { + return fullReplicas + transientReplicas; + } + + public int fullReplicas() + { + return fullReplicas; + } + + public int transientReplicas() + { + return transientReplicas; + } + + public void increment(Replica replica) + { + if (replica.isFull()) ++fullReplicas; + else ++transientReplicas; + } + + public boolean hasAtleast(int allReplicas, int fullReplicas) + { + return this.fullReplicas >= fullReplicas + && this.allReplicas() >= allReplicas; + } + } + + public static ReplicaCount countInOurDc(ReplicaCollection<?> replicas) + { + ReplicaCount count = new ReplicaCount(); + Predicate<Replica> inOurDc = InOurDcTester.replicas(); + for (Replica replica : replicas) + if (inOurDc.test(replica)) + count.increment(replica); + return count; + } + + public static ObjectObjectOpenHashMap<String, ReplicaCount> countPerDc(Collection<String> dataCenters, Iterable<Replica> liveReplicas) + { + ObjectObjectOpenHashMap<String, ReplicaCount> perDc = new ObjectObjectOpenHashMap<>(dataCenters.size()); + for (String dc: dataCenters) + perDc.put(dc, new ReplicaCount()); + + for (Replica replica : liveReplicas) + { + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); + perDc.get(dc).increment(replica); + } + return perDc; + } + /** * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index a3ef76f..f30b452 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -18,15 +18,20 @@ package org.apache.cassandra.service; import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.locator.InOurDcTester; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.net.MessageIn; +import java.util.function.Predicate; + /** * This class blocks for a quorum of responses _in the local datacenter only_ (CL.LOCAL_QUORUM). */ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> { + private final Predicate<InetAddressAndPort> waitingFor = InOurDcTester.endpoints(); + public DatacenterWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, Runnable callback, WriteType writeType, @@ -54,6 +59,6 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> @Override protected boolean waitingFor(InetAddressAndPort from) { - return replicaPlan.consistencyLevel().isLocal(from); + return waitingFor.test(from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 0d52afa..c6315ff 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1435,8 +1435,8 @@ public class StorageProxy implements StorageProxyMBean Keyspace keyspace = Keyspace.open(keyspaceName); Token tk = cm.key().getToken(); - ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll) - .assureSufficientReplicas(); + // we build this ONLY to perform the sufficiency check that happens on construction + ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll); // Forward the actual update to the chosen leader replica AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica), @@ -2088,8 +2088,6 @@ public class StorageProxy implements StorageProxyMBean ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime); - replicaPlan.assureSufficientReplicas(); - // If enabled, request repaired data tracking info from full replicas but // only if there are multiple full replicas to compare results from if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled() http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java index f536ea8..624c78f 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java @@ -23,9 +23,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractFuture; @@ -41,6 +43,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.Replicas; +import org.apache.cassandra.locator.InOurDcTester; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.MessageIn; @@ -56,14 +59,21 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl private final P replicaPlan; private final Map<Replica, Mutation> pendingRepairs; private final CountDownLatch latch; + private final Predicate<InetAddressAndPort> shouldBlockOn; private volatile long mutationsSentTime; public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { + this(key, repairs, maxBlockFor, replicaPlan, + replicaPlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue()); + } + public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan, Predicate<InetAddressAndPort> shouldBlockOn) + { this.key = key; this.pendingRepairs = new ConcurrentHashMap<>(repairs); this.replicaPlan = replicaPlan; + this.shouldBlockOn = shouldBlockOn; // here we remove empty repair mutations from the block for total, since // we're not sending them mutations @@ -72,7 +82,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl { // remote dcs can sometimes get involved in dc-local reads. We want to repair // them if they do, but they shouldn't interfere with blocking the client read. - if (!repairs.containsKey(participant) && shouldBlockOn(participant.endpoint())) + if (!repairs.containsKey(participant) && shouldBlockOn.test(participant.endpoint())) blockFor--; } @@ -91,20 +101,9 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl } @VisibleForTesting - boolean isLocal(InetAddressAndPort endpoint) - { - return ConsistencyLevel.isLocal(endpoint); - } - - private boolean shouldBlockOn(InetAddressAndPort endpoint) - { - return !replicaPlan.consistencyLevel().isDatacenterLocal() || isLocal(endpoint); - } - - @VisibleForTesting void ack(InetAddressAndPort from) { - if (shouldBlockOn(from)) + if (shouldBlockOn.test(from)) { pendingRepairs.remove(replicaPlan.getReplicaFor(from)); latch.countDown(); @@ -161,7 +160,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), destination.endpoint()); ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark(); - if (!shouldBlockOn(destination.endpoint())) + if (!shouldBlockOn.test(destination.endpoint())) pendingRepairs.remove(destination); ReadRepairDiagnostics.sendInitialRepair(this, destination.endpoint(), mutation); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java index 6bb1b7a..34bbf32 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java @@ -38,7 +38,6 @@ import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.service.reads.ReadCallback; @@ -50,7 +49,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest { public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { - super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan); + super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan, + e -> targets.contains(e)); } Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>(); @@ -59,12 +59,6 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest { mutationsSent.put(endpoint, message.payload); } - - @Override - protected boolean isLocal(InetAddressAndPort endpoint) - { - return targets.contains(endpoint); - } } @BeforeClass http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java index c64a73b..2471ffd 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import com.google.common.collect.Lists; import org.apache.cassandra.locator.ReplicaPlan; @@ -43,7 +44,6 @@ import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.service.reads.ReadCallback; import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType; @@ -169,9 +169,15 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest { private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>(); + private static Predicate<InetAddressAndPort> isLocal() + { + List<InetAddressAndPort> candidates = targets; + return e -> candidates.contains(e); + } + DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { - super(key, repairs, maxBlockFor, replicaPlan); + super(key, repairs, maxBlockFor, replicaPlan, isLocal()); DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent); } @@ -184,18 +190,5 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort endpoint) { } - - List<InetAddressAndPort> candidates = targets; - - protected List<InetAddressAndPort> getCandidateEndpoints() - { - return candidates; - } - - @Override - protected boolean isLocal(InetAddressAndPort endpoint) - { - return targets.contains(endpoint); - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java index b678b4d..c3f05c0 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java @@ -76,7 +76,8 @@ public class ReadRepairTest { public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan) { - super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan); + super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan, + e -> replicaPlan.consistencyLevel().isDatacenterLocal() && targets.endpoints().contains(e)); } Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>(); @@ -85,12 +86,6 @@ public class ReadRepairTest { mutationsSent.put(endpoint, message.payload); } - - @Override - protected boolean isLocal(InetAddressAndPort endpoint) - { - return targets.endpoints().contains(endpoint); - } } static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
