Author: jbellis
Date: Thu Sep  8 16:04:20 2011
New Revision: 1166774

URL: http://svn.apache.org/viewvc?rev=1166774&view=rev
Log:
Improve caching of same-version Messages on digest and repair paths
patch by jbellis; reviewed by slebresne for CASSANDRA-3158

Modified:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1166774&r1=1166773&r2=1166774&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Thu Sep  8 16:04:20 2011
@@ -516,50 +516,45 @@ public class StorageProxy implements Sto
             ReadCallback<Row> handler = getReadCallback(resolver, command, 
consistency_level, endpoints);
             handler.assureSufficientLiveNodes();
             assert !handler.endpoints.isEmpty();
+            readCallbacks.add(handler);
 
-            // The data-request message is sent to dataPoint, the node that 
will actually get
-            // the data for us. The other replicas are only sent a digest 
query.
-            ReadCommand digestCommand = null;
-            if (handler.endpoints.size() > 1)
-            {
-                digestCommand = command.copy();
-                digestCommand.setDigestQuery(true);
-            }
-
+            // The data-request message is sent to dataPoint, the node that 
will actually get the data for us
             InetAddress dataPoint = handler.endpoints.get(0);
             if (dataPoint.equals(FBUtilities.getLocalAddress()))
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("reading data locally");
+                logger.debug("reading data locally");
                 StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(command, handler));
             }
             else
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("reading data from " + dataPoint);
+                logger.debug("reading data from {}", dataPoint);
                 MessagingService.instance().sendRR(command, dataPoint, 
handler);
             }
 
-            // We lazy-construct the digest Message object since it may not be 
necessary if we
-            // are doing a local digest read, or no digest reads at all.
-            MessageProducer producer = new 
CachingMessageProducer(digestCommand);
+            if (handler.endpoints.size() == 1)
+                continue;
+
+            // send the other endpoints a digest request
+            ReadCommand digestCommand = command.copy();
+            digestCommand.setDigestQuery(true);
+            MessageProducer producer = null;
             for (InetAddress digestPoint : handler.endpoints.subList(1, 
handler.endpoints.size()))
             {
                 if (digestPoint.equals(FBUtilities.getLocalAddress()))
                 {
-                    if (logger.isDebugEnabled())
-                        logger.debug("reading digest locally");
+                    logger.debug("reading digest locally");
                     StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(digestCommand, handler));
                 }
                 else
                 {
-                    if (logger.isDebugEnabled())
-                        logger.debug("reading digest for from " + digestPoint);
+                    logger.debug("reading digest from {}", digestPoint);
+                    // (We lazy-construct the digest Message object since it 
may not be necessary if we
+                    // are doing a local digest read, or no digest reads at 
all.)
+                    if (producer == null)
+                        producer = new CachingMessageProducer(digestCommand);
                     MessagingService.instance().sendRR(producer, digestPoint, 
handler);
                 }
             }
-
-            readCallbacks.add(handler);
         }
 
         // read results and make a second pass for any digest mismatches
@@ -591,8 +586,9 @@ public class StorageProxy implements Sto
                     logger.debug("Digest mismatch: {}", ex.toString());
                 RowRepairResolver resolver = new 
RowRepairResolver(command.table, command.key);
                 RepairCallback<Row> repairHandler = new 
RepairCallback<Row>(resolver, handler.endpoints);
+                MessageProducer producer = new CachingMessageProducer(command);
                 for (InetAddress endpoint : handler.endpoints)
-                    MessagingService.instance().sendRR(command, endpoint, 
repairHandler);
+                    MessagingService.instance().sendRR(producer, endpoint, 
repairHandler);
 
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new 
ArrayList<RepairCallback<Row>>();


Reply via email to