Repository: cassandra Updated Branches: refs/heads/trunk 994da255c -> 644676b08
Improve read repair blocking behavior Patch by Blake Eggleston; reviewed by Marcus Eriksson and Alex Petrov for CASSANDRA-10726 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/644676b0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/644676b0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/644676b0 Branch: refs/heads/trunk Commit: 644676b088be5177ef1d0cdaf450306ea28d8a12 Parents: 994da25 Author: Blake Eggleston <[email protected]> Authored: Mon May 14 14:24:03 2018 -0700 Committer: Blake Eggleston <[email protected]> Committed: Tue Aug 21 11:01:10 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ConsistencyLevel.java | 10 +- .../cassandra/db/PartitionRangeReadCommand.java | 1 + .../org/apache/cassandra/db/ReadCommand.java | 1 + .../db/SinglePartitionReadCommand.java | 1 + .../cassandra/metrics/ReadRepairMetrics.java | 5 + .../apache/cassandra/net/MessagingService.java | 1 + .../apache/cassandra/service/StorageProxy.java | 65 +++- .../service/reads/AbstractReadExecutor.java | 40 +- .../reads/repair/BlockingPartitionRepair.java | 243 +++++++++++++ .../reads/repair/BlockingReadRepair.java | 229 ++++++------ .../reads/repair/BlockingReadRepairs.java | 114 ++++++ .../service/reads/repair/NoopReadRepair.java | 29 +- .../repair/PartitionIteratorMergeListener.java | 13 +- .../service/reads/repair/ReadRepair.java | 44 ++- .../service/reads/repair/RepairListener.java | 34 -- .../reads/repair/RowIteratorMergeListener.java | 31 +- .../service/reads/DataResolverTest.java | 18 +- .../service/reads/repair/ReadRepairTest.java | 361 +++++++++++++++++++ .../reads/repair/TestableReadRepair.java | 41 ++- 20 files changed, 1046 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9fbaf25..b34979a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Improve read repair blocking behavior (CASSANDRA-10726) * Add a virtual table to expose settings (CASSANDRA-14573) * Fix up chunk cache handling of metrics (CASSANDRA-14628) * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652) http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 8f3a51c..d37da0a 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -138,12 +138,20 @@ public enum ConsistencyLevel } } + /** + * Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace + */ + public boolean satisfies(ConsistencyLevel other, Keyspace keyspace) + { + return blockFor(keyspace) >= other.blockFor(keyspace); + } + public boolean isDatacenterLocal() { return isDCLocal; } - public boolean isLocal(InetAddressAndPort endpoint) + public static boolean isLocal(InetAddressAndPort endpoint) { return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index a6641d4..c312acc 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -24,6 +24,7 @@ import java.util.List; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.*; http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 82ca054..0262140 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.transform.RTBoundCloser; import org.apache.cassandra.db.transform.RTBoundValidator; import org.apache.cassandra.db.transform.StoppingTransformation; import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.UnknownIndexException; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.IndexNotAvailableException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index c091bf1..1fdb11f 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import 
org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReadsListener; http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java index c79fe89..e639be9 100644 --- a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java @@ -34,4 +34,9 @@ public class ReadRepairMetrics public static final Meter repairedBackground = Metrics.meter(factory.createMetricName("RepairedBackground")); @Deprecated public static final Meter attempted = Metrics.meter(factory.createMetricName("Attempted")); + + public static final Meter speculatedRead = Metrics.meter(factory.createMetricName("SpeculatedRead")); + public static final Meter speculatedWrite = Metrics.meter(factory.createMetricName("SpeculatedWrite")); + + public static void init() {} } http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index b2e72f4..7732673 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -125,6 +125,7 @@ public final class MessagingService implements MessagingServiceMBean public static final int VERSION_30 = 10; public static final int VERSION_3014 = 11; public static final int VERSION_40 = 12; + public static final 
int minimum_version = VERSION_30; public static final int current_version = VERSION_40; public static final byte[] ONE_BYTE = new byte[1]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 81e6dae..7fdf591 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -183,6 +183,8 @@ public class StorageProxy implements StorageProxyMBean readMetricsMap.put(level, new ClientRequestMetrics("Read-" + level.name())); writeMetricsMap.put(level, new ClientWriteRequestMetrics("Write-" + level.name())); } + + ReadRepairMetrics.init(); } /** @@ -1766,6 +1768,34 @@ public class StorageProxy implements StorageProxyMBean } } + private static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair> repairs) + { + PartitionIterator concatenated = PartitionIterators.concat(iterators); + + if (repairs.isEmpty()) + return concatenated; + + return new PartitionIterator() + { + public void close() + { + concatenated.close(); + repairs.forEach(ReadRepair::maybeSendAdditionalWrites); + repairs.forEach(ReadRepair::awaitWrites); + } + + public boolean hasNext() + { + return concatenated.hasNext(); + } + + public RowIterator next() + { + return concatenated.next(); + } + }; + } + /** * This function executes local and remote reads, and blocks for the results: * @@ -1784,43 +1814,59 @@ public class StorageProxy implements StorageProxyMBean AbstractReadExecutor[] reads = new AbstractReadExecutor[cmdCount]; + // Get the replica locations, sorted by response time according to the snitch, and create a read executor + // for type of speculation we'll use in this read for (int i=0; i<cmdCount; i++) { reads[i] = 
AbstractReadExecutor.getReadExecutor(commands.get(i), consistencyLevel, queryStartNanoTime); } + // sends a data request to the closest replica, and a digest request to the others. If we have a speculating + // read executor, we'll only send read requests to enough replicas to satisfy the consistency level for (int i=0; i<cmdCount; i++) { reads[i].executeAsync(); } + // if we have a speculating read executor and it looks like we may not receive a response from the initial + // set of replicas we sent messages to, speculatively send an additional message to an un-contacted replica for (int i=0; i<cmdCount; i++) { reads[i].maybeTryAdditionalReplicas(); } + // wait for enough responses to meet the consistency level. If there's a digest mismatch, begin the read + // repair process by sending full data reads to all replicas we received responses from. for (int i=0; i<cmdCount; i++) { reads[i].awaitResponses(); } + // read repair - if it looks like we may not receive enough full data responses to meet CL, send + // an additional request to any remaining replicas we haven't contacted (if there are any) for (int i=0; i<cmdCount; i++) { - reads[i].maybeRepairAdditionalReplicas(); + reads[i].maybeSendAdditionalDataRequests(); } + // read repair - block on full data responses for (int i=0; i<cmdCount; i++) { reads[i].awaitReadRepair(); } + // if we didn't do a read repair, return the contents of the data response, if we did do a read + // repair, merge the full data reads List<PartitionIterator> results = new ArrayList<>(cmdCount); + List<ReadRepair> repairs = new ArrayList<>(cmdCount); for (int i=0; i<cmdCount; i++) { results.add(reads[i].getResult()); + repairs.add(reads[i].getReadRepair()); } - return PartitionIterators.concat(results); + // if we did a read repair, assemble repair mutations and block on them + return concatAndBlockOnRepair(results, repairs); } public static class LocalReadRunnable extends DroppableRunnable @@ -2029,12 +2075,14 @@ public class StorageProxy 
implements StorageProxyMBean { private final DataResolver resolver; private final ReadCallback handler; + private final ReadRepair readRepair; private PartitionIterator result; - private SingleRangeResponse(DataResolver resolver, ReadCallback handler) + private SingleRangeResponse(DataResolver resolver, ReadCallback handler, ReadRepair readRepair) { this.resolver = resolver; this.handler = handler; + this.readRepair = readRepair; } private void waitForResponse() throws ReadTimeoutException @@ -2165,7 +2213,7 @@ public class StorageProxy implements StorageProxyMBean { PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst); - ReadRepair readRepair = ReadRepair.create(command, toQuery.filteredEndpoints, queryStartNanoTime, consistency); + ReadRepair readRepair = ReadRepair.create(command, queryStartNanoTime, consistency); DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime, readRepair); int blockFor = consistency.blockFor(keyspace); @@ -2188,15 +2236,18 @@ public class StorageProxy implements StorageProxyMBean } } - return new SingleRangeResponse(resolver, handler); + return new SingleRangeResponse(resolver, handler, readRepair); } private PartitionIterator sendNextRequests() { List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor); + List<ReadRepair> readRepairs = new ArrayList<>(concurrencyFactor); for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++) { - concurrentQueries.add(query(ranges.next(), i == 0)); + SingleRangeResponse response = query(ranges.next(), i == 0); + concurrentQueries.add(response); + readRepairs.add(response.readRepair); ++rangesQueried; } @@ -2204,7 +2255,7 @@ public class StorageProxy implements StorageProxyMBean // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to // enforce any particular limit at this point (this could 
break code than rely on postReconciliationProcessing), hence the DataLimits.NONE. counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); - return counter.applyTo(PartitionIterators.concat(concurrentQueries)); + return counter.applyTo(concatAndBlockOnRepair(concurrentQueries, readRepairs)); } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index e531e0c..61b9948 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -69,17 +69,23 @@ public abstract class AbstractReadExecutor protected final long queryStartNanoTime; protected volatile PartitionIterator result = null; + protected final Keyspace keyspace; + protected final int blockFor; + AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime) { this.command = command; this.consistency = consistency; this.targetReplicas = targetReplicas; - this.readRepair = ReadRepair.create(command, targetReplicas, queryStartNanoTime, consistency); + this.readRepair = ReadRepair.create(command, queryStartNanoTime, consistency); this.digestResolver = new DigestResolver(keyspace, command, consistency, readRepair, targetReplicas.size()); this.handler = new ReadCallback(digestResolver, consistency, command, targetReplicas, queryStartNanoTime); this.cfs = cfs; this.traceState = Tracing.instance.get(); this.queryStartNanoTime = queryStartNanoTime; + this.keyspace = keyspace; + this.blockFor = consistency.blockFor(keyspace); + 
// Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes // knows how to produce older digest but the reverse is not true. @@ -91,16 +97,16 @@ public abstract class AbstractReadExecutor command.setDigestVersion(digestVersion); } - private DecoratedKey getKey() + public DecoratedKey getKey() { - if (command instanceof SinglePartitionReadCommand) - { - return ((SinglePartitionReadCommand) command).partitionKey(); - } - else - { - return null; - } + Preconditions.checkState(command instanceof SinglePartitionReadCommand, + "Can only get keys for SinglePartitionReadCommand"); + return ((SinglePartitionReadCommand) command).partitionKey(); + } + + public ReadRepair getReadRepair() + { + return readRepair; } protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints) @@ -409,7 +415,7 @@ public abstract class AbstractReadExecutor { try { - readRepair.awaitRepair(); + readRepair.awaitReads(); } catch (ReadTimeoutException e) { @@ -424,9 +430,17 @@ public abstract class AbstractReadExecutor } } - public void maybeRepairAdditionalReplicas() + boolean isDone() { - // TODO: this + return result != null; + } + + public void maybeSendAdditionalDataRequests() + { + if (isDone()) + return; + + readRepair.maybeSendAdditionalReads(); } public PartitionIterator getResult() throws ReadFailureException, ReadTimeoutException http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java new file mode 100644 index 0000000..eb402ba --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractFuture; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.ReadRepairMetrics; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableId; +import 
org.apache.cassandra.tracing.Tracing; + +public class BlockingPartitionRepair extends AbstractFuture<Object> implements IAsyncCallback<Object> +{ + private final Keyspace keyspace; + private final DecoratedKey key; + private final ConsistencyLevel consistency; + private final InetAddressAndPort[] participants; + private final ConcurrentMap<InetAddressAndPort, Mutation> pendingRepairs; + private final CountDownLatch latch; + + private volatile long mutationsSentTime; + + public BlockingPartitionRepair(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, InetAddressAndPort[] participants) + { + this.keyspace = keyspace; + this.key = key; + this.consistency = consistency; + this.pendingRepairs = new ConcurrentHashMap<>(repairs); + this.participants = participants; + + // here we remove empty repair mutations from the block for total, since + // we're not sending them mutations + int blockFor = maxBlockFor; + for (InetAddressAndPort participant: participants) + { + // remote dcs can sometimes get involved in dc-local reads. We want to repair + // them if they do, but they shouldn't interfere with blocking the client read. + if (!repairs.containsKey(participant) && shouldBlockOn(participant)) + blockFor--; + } + + // there are some cases where logically identical data can return different digests + // For read repair, this would result in ReadRepairHandler being called with a map of + // empty mutations. If we'd also speculated on either of the read stages, the number + // of empty mutations would be greater than blockFor, causing the latch ctor to throw + // an illegal argument exception due to a negative start value. 
So here we clamp it to 0 + latch = new CountDownLatch(Math.max(blockFor, 0)); + } + + @VisibleForTesting + long waitingOn() + { + return latch.getCount(); + } + + @VisibleForTesting + boolean isLocal(InetAddressAndPort endpoint) + { + return ConsistencyLevel.isLocal(endpoint); + } + + private boolean shouldBlockOn(InetAddressAndPort endpoint) + { + return !consistency.isDatacenterLocal() || isLocal(endpoint); + } + + @VisibleForTesting + void ack(InetAddressAndPort from) + { + if (shouldBlockOn(from)) + { + pendingRepairs.remove(from); + latch.countDown(); + } + } + + @Override + public void response(MessageIn<Object> msg) + { + ack(msg.from); + } + + @Override + public boolean isLatencyForSnitch() + { + return false; + } + + private static PartitionUpdate extractUpdate(Mutation mutation) + { + return Iterables.getOnlyElement(mutation.getPartitionUpdates()); + } + + /** + * Combine the contents of any unacked repair into a single update + */ + private PartitionUpdate mergeUnackedUpdates() + { + // recombine the updates + List<PartitionUpdate> updates = Lists.newArrayList(Iterables.transform(pendingRepairs.values(), BlockingPartitionRepair::extractUpdate)); + return updates.isEmpty() ? 
null : PartitionUpdate.merge(updates); + } + + @VisibleForTesting + protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort endpoint) + { + MessagingService.instance().sendRR(message, endpoint, this); + } + + public void sendInitialRepairs() + { + mutationsSentTime = System.nanoTime(); + for (Map.Entry<InetAddressAndPort, Mutation> entry: pendingRepairs.entrySet()) + { + InetAddressAndPort destination = entry.getKey(); + Mutation mutation = entry.getValue(); + TableId tableId = extractUpdate(mutation).metadata().id; + + Tracing.trace("Sending read-repair-mutation to {}", destination); + // use a separate verb here to avoid writing hints on timeouts + sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), destination); + ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark(); + + if (!shouldBlockOn(destination)) + pendingRepairs.remove(destination); + } + } + + public boolean awaitRepairs(long timeout, TimeUnit timeoutUnit) + { + long elapsed = System.nanoTime() - mutationsSentTime; + long remaining = timeoutUnit.toNanos(timeout) - elapsed; + + try + { + return latch.await(remaining, TimeUnit.NANOSECONDS); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + } + + private static int msgVersionIdx(int version) + { + return version - MessagingService.minimum_version; + } + + /** + * If it looks like we might not receive acks for all the repair mutations we sent out, combine all + * the unacked mutations and send them to the minority of nodes not involved in the read repair data + * read / write cycle. We will accept acks from them in lieu of acks from the initial mutations sent + * out, so long as we receive the same number of acks as repair mutations transmitted. 
This prevents + * misbehaving nodes from killing a quorum read, while continuing to guarantee monotonic quorum reads + */ + public void maybeSendAdditionalWrites(long timeout, TimeUnit timeoutUnit) + { + if (awaitRepairs(timeout, timeoutUnit)) + return; + + Set<InetAddressAndPort> exclude = Sets.newHashSet(participants); + Iterable<InetAddressAndPort> candidates = Iterables.filter(getCandidateEndpoints(), e -> !exclude.contains(e)); + if (Iterables.isEmpty(candidates)) + return; + + PartitionUpdate update = mergeUnackedUpdates(); + if (update == null) + // final response was received between speculate + // timeout and call to get unacked mutation. + return; + + ReadRepairMetrics.speculatedWrite.mark(); + + Mutation[] versionedMutations = new Mutation[msgVersionIdx(MessagingService.current_version) + 1]; + + for (InetAddressAndPort endpoint: candidates) + { + int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(endpoint)); + + Mutation mutation = versionedMutations[versionIdx]; + + if (mutation == null) + { + mutation = BlockingReadRepairs.createRepairMutation(update, consistency, endpoint, true); + versionedMutations[versionIdx] = mutation; + } + + if (mutation == null) + { + // the mutation is too large to send. 
+ continue; + } + + Tracing.trace("Sending speculative read-repair-mutation to {}", endpoint); + sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), endpoint); + } + } + + @VisibleForTesting + protected Iterable<InetAddressAndPort> getCandidateEndpoints() + { + return BlockingReadRepairs.getCandidateEndpoints(keyspace, key.getToken(), consistency); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java index cd4d2a7..c5f1bea 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java @@ -18,20 +18,17 @@ package org.apache.cassandra.service.reads.repair; -import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Consumer; -import javax.annotation.Nullable; - -import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,13 +40,15 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; 
import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.metrics.ReadRepairMetrics; -import org.apache.cassandra.net.AsyncOneResponse; -import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.reads.DataResolver; import org.apache.cassandra.service.reads.DigestResolver; @@ -60,17 +59,14 @@ import org.apache.cassandra.tracing.Tracing; * 'Classic' read repair. Doesn't allow the client read to return until * updates have been written to nodes needing correction. */ -public class BlockingReadRepair implements ReadRepair, RepairListener +public class BlockingReadRepair implements ReadRepair { private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class); - private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS = - Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); - private final ReadCommand command; - private final List<InetAddressAndPort> endpoints; private final long queryStartNanoTime; private final ConsistencyLevel consistency; + private final ColumnFamilyStore cfs; private final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>(); @@ -81,170 +77,149 @@ public class BlockingReadRepair implements ReadRepair, RepairListener private final DataResolver dataResolver; private final ReadCallback readCallback; private final Consumer<PartitionIterator> resultConsumer; + private final List<InetAddressAndPort> initialContacts; - public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer) + 
public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer, List<InetAddressAndPort> initialContacts) { this.dataResolver = dataResolver; this.readCallback = readCallback; this.resultConsumer = resultConsumer; + this.initialContacts = initialContacts; } } public BlockingReadRepair(ReadCommand command, - List<InetAddressAndPort> endpoints, long queryStartNanoTime, ConsistencyLevel consistency) { this.command = command; - this.endpoints = endpoints; this.queryStartNanoTime = queryStartNanoTime; this.consistency = consistency; + this.cfs = Keyspace.openAndGetStore(command.metadata()); } public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints) { - return new PartitionIteratorMergeListener(endpoints, command, this); + return new PartitionIteratorMergeListener(endpoints, command, consistency, this); + } + + private int getMaxResponses() + { + AbstractReplicationStrategy strategy = cfs.keyspace.getReplicationStrategy(); + if (consistency.isDatacenterLocal() && strategy instanceof NetworkTopologyStrategy) + { + NetworkTopologyStrategy nts = (NetworkTopologyStrategy) strategy; + return nts.getReplicationFactor(DatabaseDescriptor.getLocalDataCenter()); + } + else + { + return strategy.getReplicationFactor(); + } } - public static class BlockingPartitionRepair extends AbstractFuture<Object> implements RepairListener.PartitionRepair + // digestResolver isn't used here because we resend read requests to all participants + public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) { + ReadRepairMetrics.repairedBlocking.mark(); + + // Do a full data read to resolve the correct response (and repair node that need be) + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); + DataResolver resolver = new DataResolver(keyspace, command, 
ConsistencyLevel.ALL, getMaxResponses(), queryStartNanoTime, this); + ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, consistency.blockFor(cfs.keyspace), command, + keyspace, allEndpoints, queryStartNanoTime); - final List<AsyncOneResponse<?>> responses; - final ReadCommand command; - final ConsistencyLevel consistency; + digestRepair = new DigestRepair(resolver, readCallback, resultConsumer, contactedEndpoints); - public BlockingPartitionRepair(int expectedResponses, ReadCommand command, ConsistencyLevel consistency) + for (InetAddressAndPort endpoint : contactedEndpoints) { - this.responses = new ArrayList<>(expectedResponses); - this.command = command; - this.consistency = consistency; + Tracing.trace("Enqueuing full data read to {}", endpoint); + MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, readCallback); } + } - private AsyncOneResponse sendRepairMutation(Mutation mutation, InetAddressAndPort destination) - { - DecoratedKey key = mutation.key(); - Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); - int messagingVersion = MessagingService.instance().getVersion(destination); + public void awaitReads() throws ReadTimeoutException + { + DigestRepair repair = digestRepair; + if (repair == null) + return; - int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion); - int maxMutationSize = DatabaseDescriptor.getMaxMutationSize(); + repair.readCallback.awaitResults(); + repair.resultConsumer.accept(digestRepair.dataResolver.resolve()); + } - AsyncOneResponse callback = null; + private boolean shouldSpeculate() + { + ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; + return consistency != ConsistencyLevel.EACH_QUORUM + && consistency.satisfies(speculativeCL, cfs.keyspace) + && cfs.sampleLatencyNanos <= TimeUnit.MILLISECONDS.toNanos(command.getTimeout()); + } - if (mutationSize <= maxMutationSize) - { - Tracing.trace("Sending read-repair-mutation to {}", destination); - // use a separate verb here to avoid writing hints on timeouts - MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR); - callback = MessagingService.instance().sendRR(message, destination); - ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark(); - } - else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) - { - logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", - mutationSize, - maxMutationSize, - command.metadata(), - command.metadata().partitionKeyType.getString(key.getKey()), - destination); - } - else - { - logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", - mutationSize, - maxMutationSize, - command.metadata(), - command.metadata().partitionKeyType.getString(key.getKey()), - destination); - - int blockFor = consistency.blockFor(keyspace); - Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); - } - return callback; - } + public void maybeSendAdditionalReads() + { + Preconditions.checkState(command instanceof SinglePartitionReadCommand, + "maybeSendAdditionalReads can only be called for SinglePartitionReadCommand"); + DigestRepair repair = digestRepair; + if (repair == null) + return; - public void reportMutation(InetAddressAndPort endpoint, Mutation mutation) + if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS)) { - AsyncOneResponse<?> response = sendRepairMutation(mutation, 
endpoint); + Set<InetAddressAndPort> contacted = Sets.newHashSet(repair.initialContacts); + Token replicaToken = ((SinglePartitionReadCommand) command).partitionKey().getToken(); + Iterable<InetAddressAndPort> candidates = BlockingReadRepairs.getCandidateEndpoints(cfs.keyspace, replicaToken, consistency); + boolean speculated = false; + for (InetAddressAndPort endpoint: Iterables.filter(candidates, e -> !contacted.contains(e))) + { + speculated = true; + Tracing.trace("Enqueuing speculative full data read to {}", endpoint); + MessagingService.instance().sendRR(command.createMessage(), endpoint, repair.readCallback); + break; + } - if (response != null) - responses.add(response); + if (speculated) + ReadRepairMetrics.speculatedRead.mark(); } + } - public void finish() + @Override + public void maybeSendAdditionalWrites() + { + for (BlockingPartitionRepair repair: repairs) { - Futures.addCallback(Futures.allAsList(responses), new FutureCallback<List<Object>>() - { - public void onSuccess(@Nullable List<Object> result) - { - set(result); - } - - public void onFailure(Throwable t) - { - setException(t); - } - }); + repair.maybeSendAdditionalWrites(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS); } } - public void awaitRepairs(long timeout) + @Override + public void awaitWrites() { - try + boolean timedOut = false; + for (BlockingPartitionRepair repair: repairs) { - Futures.allAsList(repairs).get(timeout, TimeUnit.MILLISECONDS); + if (!repair.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout(), TimeUnit.MILLISECONDS)) + { + timedOut = true; + } } - catch (TimeoutException ex) + if (timedOut) { // We got all responses, but timed out while repairing - Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - int blockFor = consistency.blockFor(keyspace); + int blockFor = consistency.blockFor(cfs.keyspace); if (Tracing.isTracing()) Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); else logger.debug("Timeout 
while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); - } - catch (InterruptedException | ExecutionException e) - { - throw new RuntimeException(e); - } - } - - public PartitionRepair startPartitionRepair() - { - BlockingPartitionRepair repair = new BlockingPartitionRepair(endpoints.size(), command, consistency); - repairs.add(repair); - return repair; - } - - public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) - { - ReadRepairMetrics.repairedBlocking.mark(); - - // Do a full data read to resolve the correct response (and repair node that need be) - Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, allEndpoints.size(), queryStartNanoTime, this); - ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, contactedEndpoints.size(), command, - keyspace, allEndpoints, queryStartNanoTime); - - digestRepair = new DigestRepair(resolver, readCallback, resultConsumer); - - for (InetAddressAndPort endpoint : contactedEndpoints) - { - Tracing.trace("Enqueuing full data read to {}", endpoint); - MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, readCallback); + throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true); } } - public void awaitRepair() throws ReadTimeoutException + @Override + public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations) { - if (digestRepair != null) - { - digestRepair.readCallback.awaitResults(); - digestRepair.resultConsumer.accept(digestRepair.dataResolver.resolve()); - } + BlockingPartitionRepair blockingRepair = new BlockingPartitionRepair(cfs.keyspace, key, consistency, mutations, 
consistency.blockFor(cfs.keyspace), destinations); + blockingRepair.sendInitialRepairs(); + repairs.add(blockingRepair); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java new file mode 100644 index 0000000..e5f7179 --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.List; + +import com.google.common.collect.Iterables; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.tracing.Tracing; + +public class BlockingReadRepairs +{ + private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepairs.class); + + private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS = + Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); + + /** + * Returns all of the endpoints that are replicas for the given key. If the consistency level is datacenter + * local, only the endpoints in the local dc will be returned. + */ + static Iterable<InetAddressAndPort> getCandidateEndpoints(Keyspace keyspace, Token token, ConsistencyLevel consistency) + { + List<InetAddressAndPort> endpoints = StorageProxy.getLiveSortedEndpoints(keyspace, token); + return consistency.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy + ? Iterables.filter(endpoints, ConsistencyLevel::isLocal) + : endpoints; + } + + /** + * Create a read repair mutation from the given update, if the mutation is not larger than the maximum + * mutation size, otherwise return null. Or, if we're configured to be strict, throw an exception. 
+ */ + public static Mutation createRepairMutation(PartitionUpdate update, ConsistencyLevel consistency, InetAddressAndPort destination, boolean suppressException) + { + if (update == null) + return null; + + DecoratedKey key = update.partitionKey(); + Mutation mutation = new Mutation(update); + Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + TableMetadata metadata = update.metadata(); + + int messagingVersion = MessagingService.instance().getVersion(destination); + + int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion); + int maxMutationSize = DatabaseDescriptor.getMaxMutationSize(); + + + if (mutationSize <= maxMutationSize) + { + return mutation; + } + else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) + { + logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", + mutationSize, + maxMutationSize, + metadata, + metadata.partitionKeyType.getString(key.getKey()), + destination); + return null; + } + else + { + logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", + mutationSize, + maxMutationSize, + metadata, + metadata.partitionKeyType.getString(key.getKey()), + destination); + + if (!suppressException) + { + int blockFor = consistency.blockFor(keyspace); + Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java index 4436f3a..6e161a8 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java 
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java @@ -19,8 +19,11 @@ package org.apache.cassandra.service.reads.repair; import java.util.List; +import java.util.Map; import java.util.function.Consumer; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.exceptions.ReadTimeoutException; @@ -43,7 +46,31 @@ public class NoopReadRepair implements ReadRepair resultConsumer.accept(digestResolver.getData()); } - public void awaitRepair() throws ReadTimeoutException + public void awaitReads() throws ReadTimeoutException { } + + @Override + public void maybeSendAdditionalReads() + { + + } + + @Override + public void maybeSendAdditionalWrites() + { + + } + + @Override + public void awaitWrites() + { + + } + + @Override + public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations) + { + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java index 4ccdcbf..6cf761a 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java @@ -20,8 +20,8 @@ package org.apache.cassandra.service.reads.repair; import java.util.List; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Columns; +import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import 
org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.RegularAndStaticColumns; @@ -34,18 +34,20 @@ public class PartitionIteratorMergeListener implements UnfilteredPartitionIterat { private final InetAddressAndPort[] sources; private final ReadCommand command; - private final RepairListener repairListener; + private final ConsistencyLevel consistency; + private final ReadRepair readRepair; - public PartitionIteratorMergeListener(InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener) + public PartitionIteratorMergeListener(InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair) { this.sources = sources; this.command = command; - this.repairListener = repairListener; + this.consistency = consistency; + this.readRepair = readRepair; } public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) { - return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), sources, command, repairListener); + return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), sources, command, consistency, readRepair); } private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions) @@ -81,7 +83,6 @@ public class PartitionIteratorMergeListener implements UnfilteredPartitionIterat public void close() { - repairListener.awaitRepairs(DatabaseDescriptor.getWriteRpcTimeout()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java index 289875d..a1a9546 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java +++ 
b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java @@ -18,9 +18,12 @@ package org.apache.cassandra.service.reads.repair; import java.util.List; +import java.util.Map; import java.util.function.Consumer; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; @@ -38,6 +41,10 @@ public interface ReadRepair /** * Called when the digests from the initial read don't match. Reads may block on the * repair started by this method. + * @param digestResolver supplied so we can get the original data response + * @param allEndpoints all available replicas for this read + * @param contactedEndpoints the replicas we actually sent requests to + * @param resultConsumer hook for the repair to set its result on completion */ public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, @@ -45,12 +52,41 @@ public interface ReadRepair Consumer<PartitionIterator> resultConsumer); /** - * Wait for any operations started by {@link ReadRepair#startRepair} to complete + * Block on the reads (or timeout) sent out in {@link ReadRepair#startRepair} */ - public void awaitRepair() throws ReadTimeoutException; + public void awaitReads() throws ReadTimeoutException; - static ReadRepair create(ReadCommand command, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ConsistencyLevel consistency) + /** + * If it looks like we might not receive data requests from everyone in time, send additional requests + * to additional replicas not contacted in the initial full data read.
If the collection of nodes that + * end up responding in time end up agreeing on the data, and we don't consider the response from the + * disagreeing replica that triggered the read repair, that's ok, since the disagreeing data would not + * have been successfully written and won't be included in the response to the client, preserving the + * expectation of monotonic quorum reads + */ + public void maybeSendAdditionalReads(); + + /** + * If it looks like we might not receive acks for all the repair mutations we sent out, combine all + * the unacked mutations and send them to the minority of nodes not involved in the read repair data + * read / write cycle. We will accept acks from them in lieu of acks from the initial mutations sent + * out, so long as we receive the same number of acks as repair mutations transmitted. This prevents + * misbehaving nodes from killing a quorum read, while continuing to guarantee monotonic quorum reads + */ + public void maybeSendAdditionalWrites(); + + /** + * Hook for the merge listener to start repairs on individual partitions.
+ */ + void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations); + + /** + * Block on any mutations (or timeout) we sent out to repair replicas in {@link ReadRepair#repairPartition} + */ + public void awaitWrites(); + + static ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) { - return new BlockingReadRepair(command, endpoints, queryStartNanoTime, consistency); + return new BlockingReadRepair(command, queryStartNanoTime, consistency); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java b/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java deleted file mode 100644 index 174c0e7..0000000 --- a/src/java/org/apache/cassandra/service/reads/repair/RepairListener.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.service.reads.repair; - -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.locator.InetAddressAndPort; - -public interface RepairListener -{ - interface PartitionRepair - { - void reportMutation(InetAddressAndPort endpoint, Mutation mutation); - void finish(); - } - - PartitionRepair startPartitionRepair(); - void awaitRepairs(long timeoutMillis); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java index f11d264..cb6707d 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java @@ -19,9 +19,13 @@ package org.apache.cassandra.service.reads.repair; import java.util.Arrays; +import java.util.Map; + +import com.google.common.collect.Maps; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.ClusteringBound; +import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.LivenessInfo; @@ -49,6 +53,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis private final boolean isReversed; private final InetAddressAndPort[] sources; private final ReadCommand command; + private final ConsistencyLevel consistency; private final PartitionUpdate.Builder[] repairs; @@ -64,9 +69,9 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis // For each source, record if there is an open range to send as repair, and from where. 
private final ClusteringBound[] markerToRepair; - private final RepairListener repairListener; + private final ReadRepair readRepair; - public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener) + public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair) { this.partitionKey = partitionKey; this.columns = columns; @@ -77,7 +82,8 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis sourceDeletionTime = new DeletionTime[sources.length]; markerToRepair = new ClusteringBound[sources.length]; this.command = command; - this.repairListener = repairListener; + this.consistency = consistency; + this.readRepair = readRepair; this.diffListener = new RowDiffListener() { @@ -300,22 +306,25 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis public void close() { - RepairListener.PartitionRepair repair = null; + Map<InetAddressAndPort, Mutation> mutations = null; for (int i = 0; i < repairs.length; i++) { if (repairs[i] == null) continue; - if (repair == null) - { - repair = repairListener.startPartitionRepair(); - } - repair.reportMutation(sources[i], new Mutation(repairs[i].build())); + Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i], false); + if (mutation == null) + continue; + + if (mutations == null) + mutations = Maps.newHashMapWithExpectedSize(sources.length); + + mutations.put(sources[i], mutation); } - if (repair != null) + if (mutations != null) { - repair.finish(); + readRepair.repairPartition(partitionKey, mutations, sources); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java index 4d4d398..1a5aa7a 100644 --- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java @@ -22,16 +22,12 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.*; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.service.reads.DataResolver; -import org.apache.cassandra.service.reads.repair.BlockingReadRepair; -import org.apache.cassandra.service.reads.repair.NoopReadRepair; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.config.DatabaseDescriptor; @@ -47,8 +43,6 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.net.*; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.service.reads.repair.ReadRepair; -import org.apache.cassandra.service.reads.repair.RepairListener; import org.apache.cassandra.service.reads.repair.TestableReadRepair; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -118,7 +112,7 @@ public class DataResolverTest nowInSec = FBUtilities.nowInSeconds(); command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build(); - readRepair = new TestableReadRepair(command); + readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); } @Test @@ -346,7 +340,7 @@ public class DataResolverTest @Test public void testResolveWithBothEmpty() { - TestableReadRepair 
readRepair = new TestableReadRepair(command); + TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm))); resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm))); @@ -715,7 +709,7 @@ public class DataResolverTest public void testResolveComplexDelete() { ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - TestableReadRepair readRepair = new TestableReadRepair(cmd); + TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); long[] ts = {100, 200}; @@ -767,7 +761,7 @@ public class DataResolverTest public void testResolveDeletedCollection() { ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - TestableReadRepair readRepair = new TestableReadRepair(cmd); + TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); long[] ts = {100, 200}; @@ -810,7 +804,7 @@ public class DataResolverTest public void testResolveNewCollection() { ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - TestableReadRepair readRepair = new TestableReadRepair(cmd); + TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); long[] ts = {100, 200}; @@ -859,7 +853,7 @@ public class DataResolverTest public void testResolveNewCollectionOverwritingDeleted() { ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build(); - TestableReadRepair readRepair = new 
TestableReadRepair(cmd); + TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM); DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair); long[] ts = {100, 200}; http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java new file mode 100644 index 0000000..75d6e83 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.MigrationManager; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Tables; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class ReadRepairTest +{ + static Keyspace ks; + static ColumnFamilyStore cfs; + static TableMetadata cfm; + static InetAddressAndPort target1; + static InetAddressAndPort target2; + static InetAddressAndPort target3; + static List<InetAddressAndPort> targets; + + private static class InstrumentedReadRepairHandler extends BlockingPartitionRepair + { + public InstrumentedReadRepairHandler(Keyspace keyspace, 
DecoratedKey key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, InetAddressAndPort[] participants) + { + super(keyspace, key, consistency, repairs, maxBlockFor, participants); + } + + Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>(); + + protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort endpoint) + { + mutationsSent.put(endpoint, message.payload); + } + + List<InetAddressAndPort> candidates = targets; + + protected List<InetAddressAndPort> getCandidateEndpoints() + { + return candidates; + } + + @Override + protected boolean isLocal(InetAddressAndPort endpoint) + { + return targets.contains(endpoint); + } + } + + static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime()); + static DecoratedKey key; + static Cell cell1; + static Cell cell2; + static Cell cell3; + static Mutation resolved; + + /** + * Asserts that both rows are null, or that they have equal clustering, deletion and cells. + * Any failure is rethrown as an AssertionError whose message prints both rows. + */ + private static void assertRowsEqual(Row expected, Row actual) + { + try + { + Assert.assertEquals(expected == null, actual == null); + if (expected == null) + return; + Assert.assertEquals(expected.clustering(), actual.clustering()); + Assert.assertEquals(expected.deletion(), actual.deletion()); + Assert.assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(actual.cells(), Cell.class)); + } catch (Throwable t) + { + throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t); + } + } + + @BeforeClass + public static void setUpClass() throws Throwable + { + SchemaLoader.loadSchema(); + String ksName = "ks"; + + cfm = CreateTableStatement.parse("CREATE TABLE tbl (k int primary key, v text)", ksName).build(); + KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.simple(3), Tables.of(cfm)); + MigrationManager.announceNewKeyspace(ksm, false); + + ks = Keyspace.open(ksName); + cfs = ks.getColumnFamilyStore("tbl"); + + cfs.sampleLatencyNanos = 0; + + target1 = 
InetAddressAndPort.getByName("127.0.0.255"); + target2 = InetAddressAndPort.getByName("127.0.0.254"); + target3 = InetAddressAndPort.getByName("127.0.0.253"); + + targets = ImmutableList.of(target1, target2, target3); + + // default test values + key = dk(5); + cell1 = cell("v", "val1", now); + cell2 = cell("v", "val2", now); + cell3 = cell("v", "val3", now); + resolved = mutation(cell1, cell2); + } + + private static DecoratedKey dk(int v) + { + return DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(v)); + } + + private static Cell cell(String name, String value, long timestamp) + { + return BufferCell.live(cfm.getColumn(ColumnIdentifier.getInterned(name, false)), timestamp, ByteBufferUtil.bytes(value)); + } + + private static Mutation mutation(Cell... cells) + { + Row.Builder builder = BTreeRow.unsortedBuilder(0); + builder.newRow(Clustering.EMPTY); + for (Cell cell: cells) + { + builder.addCell(cell); + } + return new Mutation(PartitionUpdate.singleRowUpdate(cfm, key, builder.build())); + } + + private static InstrumentedReadRepairHandler createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, Collection<InetAddressAndPort> participants) + { + InetAddressAndPort[] participantArray = new InetAddressAndPort[participants.size()]; + participants.toArray(participantArray); + return new InstrumentedReadRepairHandler(ks, key, ConsistencyLevel.LOCAL_QUORUM, repairs, maxBlockFor, participantArray); + } + + private static InstrumentedReadRepairHandler createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor) + { + return createRepairHandler(repairs, maxBlockFor, repairs.keySet()); + } + + @Test + public void consistencyLevelTest() throws Exception + { + Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, ks)); + Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, ks)); + Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, ks)); + 
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks)); + Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, ks)); + } + + /** + * Asserts that two mutations have the same keyspace name and key, and that the only row + * of their only partition update is equal according to assertRowsEqual. + */ + private static void assertMutationEqual(Mutation expected, Mutation actual) + { + Assert.assertEquals(expected.getKeyspaceName(), actual.getKeyspaceName()); + Assert.assertEquals(expected.key(), actual.key()); + PartitionUpdate expectedUpdate = Iterables.getOnlyElement(expected.getPartitionUpdates()); + PartitionUpdate actualUpdate = Iterables.getOnlyElement(actual.getPartitionUpdates()); + assertRowsEqual(Iterables.getOnlyElement(expectedUpdate), Iterables.getOnlyElement(actualUpdate)); + } + + @Test + public void additionalMutationRequired() throws Exception + { + + Mutation repair1 = mutation(cell2); + Mutation repair2 = mutation(cell1); + + // check that the correct repairs are calculated + Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); + repairs.put(target1, repair1); + repairs.put(target2, repair2); + + + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); + + Assert.assertTrue(handler.mutationsSent.isEmpty()); + + // check that the correct mutations are sent + handler.sendInitialRepairs(); + Assert.assertEquals(2, handler.mutationsSent.size()); + assertMutationEqual(repair1, handler.mutationsSent.get(target1)); + assertMutationEqual(repair2, handler.mutationsSent.get(target2)); + + // check that a combined mutation is speculatively sent to the 3rd target + handler.mutationsSent.clear(); + handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS); + Assert.assertEquals(1, handler.mutationsSent.size()); + assertMutationEqual(resolved, handler.mutationsSent.get(target3)); + + // check repairs stop blocking after receiving 2 acks + Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + handler.ack(target1); + Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + 
handler.ack(target3); + Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + + } + + /** + * If we've received enough acks, we shouldn't send any additional mutations + */ + @Test + public void noAdditionalMutationRequired() throws Exception + { + Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); + repairs.put(target1, mutation(cell2)); + repairs.put(target2, mutation(cell1)); + + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); + handler.sendInitialRepairs(); + handler.ack(target1); + handler.ack(target2); + + // both replicas have acked, we shouldn't send anything else out + handler.mutationsSent.clear(); + handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS); + Assert.assertTrue(handler.mutationsSent.isEmpty()); + } + + /** + * If there are no additional nodes we can send mutations to, we... shouldn't + */ + @Test + public void noAdditionalMutationPossible() throws Exception + { + Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); + repairs.put(target1, mutation(cell2)); + repairs.put(target2, mutation(cell1)); + + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); + handler.sendInitialRepairs(); + + // we've already sent mutations to all candidates, so we shouldn't send any more + handler.candidates = Lists.newArrayList(target1, target2); + handler.mutationsSent.clear(); + handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS); + Assert.assertTrue(handler.mutationsSent.isEmpty()); + } + + /** + * If we didn't send a repair to a replica because there wasn't a diff with the + * resolved column family, we shouldn't send it a speculative mutation + */ + @Test + public void mutationsArentSentToInSyncNodes() throws Exception + { + Mutation repair1 = mutation(cell2); + + Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); + repairs.put(target1, repair1); + Collection<InetAddressAndPort> participants = Lists.newArrayList(target1, target2); + + // check that the correct 
initial mutations are sent out + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants); + handler.sendInitialRepairs(); + Assert.assertEquals(1, handler.mutationsSent.size()); + Assert.assertTrue(handler.mutationsSent.containsKey(target1)); + + // check that speculative mutations aren't sent to target2 + handler.mutationsSent.clear(); + handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS); + Assert.assertEquals(1, handler.mutationsSent.size()); + Assert.assertTrue(handler.mutationsSent.containsKey(target3)); + } + + @Test + public void onlyBlockOnQuorum() + { + Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); + repairs.put(target1, mutation(cell1)); + repairs.put(target2, mutation(cell2)); + repairs.put(target3, mutation(cell3)); + Assert.assertEquals(3, repairs.size()); + + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); + handler.sendInitialRepairs(); + + Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + handler.ack(target1); + Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + + // here we should stop blocking, even though we've sent 3 repairs + handler.ack(target2); + Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + + } + + /** + * For dc local consistency levels, noop mutations and responses from remote dcs should not affect effective blockFor + */ + @Test + public void remoteDCTest() throws Exception + { + Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); + repairs.put(target1, mutation(cell1)); + + + InetAddressAndPort remote1 = InetAddressAndPort.getByName("10.0.0.1"); + InetAddressAndPort remote2 = InetAddressAndPort.getByName("10.0.0.2"); + repairs.put(remote1, mutation(cell1)); + + Collection<InetAddressAndPort> participants = Lists.newArrayList(target1, target2, remote1, remote2); + + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants); + handler.sendInitialRepairs(); + 
Assert.assertEquals(2, handler.mutationsSent.size()); + Assert.assertTrue(handler.mutationsSent.containsKey(target1)); + Assert.assertTrue(handler.mutationsSent.containsKey(remote1)); + + Assert.assertEquals(1, handler.waitingOn()); + Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + + handler.ack(remote1); + Assert.assertEquals(1, handler.waitingOn()); + Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + + handler.ack(target1); + Assert.assertEquals(0, handler.waitingOn()); + Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/644676b0/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java index 5664c9b..f97980b 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; @@ -31,60 +33,59 @@ import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.reads.DigestResolver; -public class TestableReadRepair implements ReadRepair, RepairListener +public class TestableReadRepair implements ReadRepair { public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>(); private final ReadCommand command; + private final ConsistencyLevel consistency; - public 
TestableReadRepair(ReadCommand command) + public TestableReadRepair(ReadCommand command, ConsistencyLevel consistency) { this.command = command; + this.consistency = consistency; } - private class TestablePartitionRepair implements RepairListener.PartitionRepair + @Override + public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints) { - @Override - public void reportMutation(InetAddressAndPort endpoint, Mutation mutation) - { - sent.put(endpoint, mutation); - } + return new PartitionIteratorMergeListener(endpoints, command, consistency, this); + } - @Override - public void finish() - { + @Override + public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) + { - } } @Override - public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints) + public void awaitReads() throws ReadTimeoutException { - return new PartitionIteratorMergeListener(endpoints, command, this); + } @Override - public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) + public void maybeSendAdditionalReads() { } @Override - public void awaitRepair() throws ReadTimeoutException + public void maybeSendAdditionalWrites() { } @Override - public PartitionRepair startPartitionRepair() + public void awaitWrites() { - return new TestablePartitionRepair(); + } @Override - public void awaitRepairs(long timeoutMillis) + public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations) { - + sent.putAll(mutations); } public Mutation getForEndpoint(InetAddressAndPort endpoint) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email 
protected]
