Repository: cassandra Updated Branches: refs/heads/trunk a792a7bae -> 9c6cf3cdf
Clean up ARE sendXRequests() methods and ReadCommand#setDigestQuery() patch by Aleksey Yeschenko; reveiwed by Jonathan Ellis for CASSANDRA-8647 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2d140ff Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2d140ff Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2d140ff Branch: refs/heads/trunk Commit: e2d140fff752e757f40c812dcd1b7bed3ea5fed2 Parents: 2445d4d Author: Aleksey Yeschenko <[email protected]> Authored: Wed Jan 21 00:52:53 2015 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Jan 21 00:54:34 2015 +0300 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ReadCommand.java | 3 +- .../db/RetriedSliceFromReadCommand.java | 4 +- .../cassandra/db/SliceByNamesReadCommand.java | 8 +-- .../cassandra/db/SliceFromReadCommand.java | 8 +-- .../cassandra/service/AbstractReadExecutor.java | 56 +++++++++----------- 5 files changed, 31 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 299693e..dedff6f 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -89,9 +89,10 @@ public abstract class ReadCommand implements IReadCommand, Pageable return isDigestQuery; } - public void setDigestQuery(boolean isDigestQuery) + public ReadCommand setIsDigestQuery(boolean isDigestQuery) { this.isDigestQuery = isDigestQuery; + return this; } public String getColumnFamilyName() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java index fe54917..41f5a50 100644 --- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java +++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java @@ -38,9 +38,7 @@ public class RetriedSliceFromReadCommand extends SliceFromReadCommand @Override public ReadCommand copy() { - ReadCommand readCommand = new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount); - readCommand.setDigestQuery(isDigestQuery()); - return readCommand; + return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount).setIsDigestQuery(isDigestQuery()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java index b1829f3..22f795e 100644 --- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java +++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java @@ -44,9 +44,7 @@ public class SliceByNamesReadCommand extends ReadCommand public ReadCommand copy() { - ReadCommand readCommand= new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter); - readCommand.setDigestQuery(isDigestQuery()); - return readCommand; + return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery()); } public Row getRow(Keyspace keyspace) @@ -97,9 +95,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm long timestamp = in.readLong(); CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); NamesQueryFilter filter = metadata.comparator.namesQueryFilterSerializer().deserialize(in, version); - ReadCommand command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter); - command.setDigestQuery(isDigest); - return command; + return new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest); } public long serializedSize(ReadCommand cmd, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/SliceFromReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java index f06b9dc..2259f22 100644 --- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java +++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java @@ -48,9 +48,7 @@ public class SliceFromReadCommand extends ReadCommand public ReadCommand copy() { - ReadCommand readCommand = new SliceFromReadCommand(ksName, key, cfName, timestamp, filter); - readCommand.setDigestQuery(isDigestQuery()); - return readCommand; + return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery()); } public Row getRow(Keyspace keyspace) @@ -151,9 +149,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand long timestamp = in.readLong(); CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version); - ReadCommand command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter); - command.setDigestQuery(isDigest); - return command; + return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest); } public long serializedSize(ReadCommand cmd, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 2c3261f..0546e27 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -77,43 +77,38 @@ public abstract class AbstractReadExecutor protected void makeDataRequests(Iterable<InetAddress> endpoints) { - boolean readLocal = false; - for (InetAddress endpoint : endpoints) - { - if (isLocalRequest(endpoint)) - { - readLocal = true; - } - else - { - logger.trace("reading data from {}", endpoint); - MessagingService.instance().sendRR(command.createMessage(), endpoint, handler); - } - } - if (readLocal) - { - logger.trace("reading data locally"); - StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler)); - } + makeRequests(command, endpoints); } protected void makeDigestRequests(Iterable<InetAddress> endpoints) { - ReadCommand digestCommand = command.copy(); - digestCommand.setDigestQuery(true); - MessageOut<?> message = digestCommand.createMessage(); + makeRequests(command.copy().setIsDigestQuery(true), endpoints); + } + + private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints) + { + MessageOut<ReadCommand> message = null; + boolean hasLocalEndpoint = false; + for (InetAddress endpoint : endpoints) { if (isLocalRequest(endpoint)) { - logger.trace("reading digest locally"); - StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler)); - } - else - { - logger.trace("reading digest from {}", endpoint); - MessagingService.instance().sendRR(message, endpoint, handler); + hasLocalEndpoint = true; + continue; } + + logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); + if (message == null) + message = readCommand.createMessage(); + MessagingService.instance().sendRR(message, endpoint, handler); + } + + // We delay the local (potentially blocking) read till the end to avoid stalling remote requests. + if (hasLocalEndpoint) + { + logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data"); + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler)); } } @@ -278,10 +273,7 @@ public abstract class AbstractReadExecutor // Could be waiting on the data, or on enough digests. ReadCommand retryCommand = command; if (resolver.getData() != null) - { - retryCommand = command.copy(); - retryCommand.setDigestQuery(true); - } + retryCommand = command.copy().setIsDigestQuery(true); InetAddress extraReplica = Iterables.getLast(targetReplicas); logger.trace("speculating read retry on {}", extraReplica);
