Author: jbellis
Date: Thu Sep 8 16:04:20 2011
New Revision: 1166774
URL: http://svn.apache.org/viewvc?rev=1166774&view=rev
Log:
Improve caching of same-version Messages on digest and repair paths
patch by jbellis; reviewed by slebresne for CASSANDRA-3158
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1166774&r1=1166773&r2=1166774&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Thu Sep 8 16:04:20 2011
@@ -516,50 +516,45 @@ public class StorageProxy implements Sto
ReadCallback<Row> handler = getReadCallback(resolver, command,
consistency_level, endpoints);
handler.assureSufficientLiveNodes();
assert !handler.endpoints.isEmpty();
+ readCallbacks.add(handler);
- // The data-request message is sent to dataPoint, the node that
will actually get
- // the data for us. The other replicas are only sent a digest
query.
- ReadCommand digestCommand = null;
- if (handler.endpoints.size() > 1)
- {
- digestCommand = command.copy();
- digestCommand.setDigestQuery(true);
- }
-
+ // The data-request message is sent to dataPoint, the node that
will actually get the data for us
InetAddress dataPoint = handler.endpoints.get(0);
if (dataPoint.equals(FBUtilities.getLocalAddress()))
{
- if (logger.isDebugEnabled())
- logger.debug("reading data locally");
+ logger.debug("reading data locally");
StageManager.getStage(Stage.READ).execute(new
LocalReadRunnable(command, handler));
}
else
{
- if (logger.isDebugEnabled())
- logger.debug("reading data from " + dataPoint);
+ logger.debug("reading data from {}", dataPoint);
MessagingService.instance().sendRR(command, dataPoint,
handler);
}
- // We lazy-construct the digest Message object since it may not be
necessary if we
- // are doing a local digest read, or no digest reads at all.
- MessageProducer producer = new
CachingMessageProducer(digestCommand);
+ if (handler.endpoints.size() == 1)
+ continue;
+
+ // send the other endpoints a digest request
+ ReadCommand digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ MessageProducer producer = null;
for (InetAddress digestPoint : handler.endpoints.subList(1,
handler.endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getLocalAddress()))
{
- if (logger.isDebugEnabled())
- logger.debug("reading digest locally");
+ logger.debug("reading digest locally");
StageManager.getStage(Stage.READ).execute(new
LocalReadRunnable(digestCommand, handler));
}
else
{
- if (logger.isDebugEnabled())
- logger.debug("reading digest for from " + digestPoint);
+ logger.debug("reading digest from {}", digestPoint);
+ // (We lazy-construct the digest Message object since it
may not be necessary if we
+ // are doing a local digest read, or no digest reads at
all.)
+ if (producer == null)
+ producer = new CachingMessageProducer(digestCommand);
MessagingService.instance().sendRR(producer, digestPoint,
handler);
}
}
-
- readCallbacks.add(handler);
}
// read results and make a second pass for any digest mismatches
@@ -591,8 +586,9 @@ public class StorageProxy implements Sto
logger.debug("Digest mismatch: {}", ex.toString());
RowRepairResolver resolver = new
RowRepairResolver(command.table, command.key);
RepairCallback<Row> repairHandler = new
RepairCallback<Row>(resolver, handler.endpoints);
+ MessageProducer producer = new CachingMessageProducer(command);
for (InetAddress endpoint : handler.endpoints)
- MessagingService.instance().sendRR(command, endpoint,
repairHandler);
+ MessagingService.instance().sendRR(producer, endpoint,
repairHandler);
if (repairResponseHandlers == null)
repairResponseHandlers = new
ArrayList<RepairCallback<Row>>();