avoid including non-queried nodes in rangeslice read repair patch by jbellis; reviewed by Vijay for CASSANDRA-3843
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3dc7894 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3dc7894 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3dc7894 Branch: refs/heads/cassandra-1.1 Commit: c3dc7894159ad413f9c8fa0cc0024c6ed0984831 Parents: 22b8a97 Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed Feb 8 22:28:47 2012 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Feb 9 15:33:31 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 7 +++---- .../service/RangeSliceResponseResolver.java | 10 +++++++--- .../org/apache/cassandra/service/StorageProxy.java | 6 ++++-- 3 files changed, 14 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3dc7894/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cca24a9..0875da5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,9 +1,8 @@ -1.0.9 +1.0.8 + * avoid including non-queried nodes in rangeslice read repair + (CASSANDRA-3843) * Only snapshot CF being compacted for snapshot_before_compaction (CASSANDRA-3803) - - -1.0.8 * Log active compactions in StatusLogger (CASSANDRA-3703) * Compute more accurate compaction score per level (CASSANDRA-3790) * Return InvalidRequest when using a keyspace that doesn't exist http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3dc7894/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java index 3be61d1..a870d5c 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java +++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java @@ -56,16 +56,20 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro }; private final String table; - private final List<InetAddress> sources; + private List<InetAddress> sources; protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();; public final List<IAsyncResult> repairResults = new ArrayList<IAsyncResult>(); - public RangeSliceResponseResolver(String table, List<InetAddress> sources) + public RangeSliceResponseResolver(String table) { - this.sources = sources; this.table = table; } + public void setSources(List<InetAddress> endpoints) + { + this.sources = endpoints; + } + public List<Row> getData() throws IOException { Message response = responses.iterator().next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3dc7894/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 0672b3f..27db551 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -814,9 +814,10 @@ public class StorageProxy implements StorageProxyMBean RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys); // collect replies and resolve according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints); + RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace); ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints); handler.assureSufficientLiveNodes(); + resolver.setSources(handler.endpoints); for (InetAddress endpoint : handler.endpoints) { MessagingService.instance().sendRR(c2, endpoint, handler); @@ -1071,7 +1072,7 @@ public class StorageProxy implements StorageProxyMBean DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints); // collect replies and resolve according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints); + RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace); IReadCommand iCommand = new IReadCommand() { public String getKeyspace() @@ -1081,6 +1082,7 @@ public class StorageProxy implements StorageProxyMBean }; ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints); handler.assureSufficientLiveNodes(); + resolver.setSources(handler.endpoints); IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range); MessageProducer producer = new CachingMessageProducer(command);