use correct list of replicas for LOCAL_QUORUM reads when read repair is disabled; patch by jbellis; reviewed by Vijay for CASSANDRA-3696
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70a350e6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70a350e6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70a350e6 Branch: refs/heads/cassandra-0.8 Commit: 70a350e6e975c8c1aaaa5ed732f090974621355c Parents: 8d6b6f6 Author: Jonathan Ellis <[email protected]> Authored: Wed Jan 4 21:10:10 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Thu Jan 5 13:06:21 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../cassandra/service/DatacenterReadCallback.java | 27 +++++++++----- .../org/apache/cassandra/service/ReadCallback.java | 14 +++++-- 3 files changed, 30 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/70a350e6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 01159aa..42218a7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,8 @@ 0.8.10 * prevent new nodes from thinking down nodes are up forever (CASSANDRA-3626) + * use correct list of replicas for LOCAL_QUORUM reads when read repair + is disabled (CASSANDRA-3696) + 0.8.9 * avoid logging (harmless) exception when GC takes < 1ms (CASSANDRA-3656) http://git-wip-us.apache.org/repos/asf/cassandra/blob/70a350e6/src/java/org/apache/cassandra/service/DatacenterReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java index 1f2e43f..916c996 100644 --- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java +++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java @@ -23,6 +23,8 @@ package 
org.apache.cassandra.service; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import org.apache.cassandra.config.DatabaseDescriptor; @@ -42,6 +44,19 @@ public class DatacenterReadCallback<T> extends ReadCallback<T> { private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress()); + private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>() + { + public int compare(InetAddress endpoint1, InetAddress endpoint2) + { + boolean local1 = localdc.equals(snitch.getDatacenter(endpoint1)); + boolean local2 = localdc.equals(snitch.getDatacenter(endpoint2)); + if (local1 && !local2) + return -1; + if (local2 && !local1) + return 1; + return 0; + } + }; public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints) { @@ -49,17 +64,9 @@ public class DatacenterReadCallback<T> extends ReadCallback<T> } @Override - protected List<InetAddress> preferredEndpoints(List<InetAddress> endpoints) + protected void sortForConsistencyLevel(List<InetAddress> endpoints) { - ArrayList<InetAddress> preferred = new ArrayList<InetAddress>(blockfor); - for (InetAddress endpoint : endpoints) - { - if (localdc.equals(snitch.getDatacenter(endpoint))) - preferred.add(endpoint); - if (preferred.size() == blockfor) - break; - } - return preferred; + Collections.sort(endpoints, localComparator); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/70a350e6/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 9416a01..3773c19 100644 --- 
a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -21,7 +21,6 @@ package org.apache.cassandra.service; import java.io.IOException; import java.net.InetAddress; import java.util.List; -import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -68,18 +67,25 @@ public class ReadCallback<T> implements IAsyncCallback this.resolver = resolver; this.startTime = System.currentTimeMillis(); boolean repair = randomlyReadRepair(); + sortForConsistencyLevel(endpoints); this.endpoints = repair || resolver instanceof RowRepairResolver ? endpoints - : preferredEndpoints(endpoints); + : endpoints.subList(0, Math.min(endpoints.size(), blockfor)); if (logger.isDebugEnabled()) logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s", blockfor, repair, StringUtils.join(this.endpoints, ","))); } - protected List<InetAddress> preferredEndpoints(List<InetAddress> endpoints) + /** + * Endpoints is already restricted to live replicas, sorted by snitch preference. This is a hook for + * DatacenterReadCallback to move local-DC replicas to the front of the list. We need this both + * when doing read repair (because the first replica gets the data read) and otherwise (because + * only the first 1..blockfor replicas will get digest reads). + */ + protected void sortForConsistencyLevel(List<InetAddress> endpoints) { - return endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called + // no-op except in DRC } private boolean randomlyReadRepair()
