Author: jbellis
Date: Tue Aug  9 18:37:20 2011
New Revision: 1155460

URL: http://svn.apache.org/viewvc?rev=1155460&view=rev
Log:
avoid doing read for no-op replicate-on-write at CL=1
patch by slebresne and jbellis for CASSANDRA-2892

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1155460&r1=1155459&r2=1155460&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Aug  9 18:37:20 2011
@@ -1,6 +1,7 @@
 0.8.4
  * include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)
  * use JAVA env var in cassandra-env.sh (CASSANDRA-2785, 2992)
+ * avoid doing read for no-op replicate-on-write at CL=1 (CASSANDRA-2892)
 
 
 0.8.3

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1155460&r1=1155459&r2=1155460&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
 Tue Aug  9 18:37:20 2011
@@ -96,7 +96,7 @@ public class StorageProxy implements Sto
             public void apply(IMutation mutation, Multimap<InetAddress, 
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String 
localDataCenter, ConsistencyLevel consistency_level) throws IOException
             {
                 assert mutation instanceof RowMutation;
-                sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints, 
responseHandler, localDataCenter, true, consistency_level);
+                sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints, 
responseHandler, localDataCenter, consistency_level);
             }
         };
 
@@ -110,7 +110,11 @@ public class StorageProxy implements Sto
         {
             public void apply(IMutation mutation, Multimap<InetAddress, 
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String 
localDataCenter, ConsistencyLevel consistency_level) throws IOException
             {
-                applyCounterMutation(mutation, hintedEndpoints, 
responseHandler, localDataCenter, consistency_level, false);
+                if (logger.isDebugEnabled())
+                    logger.debug("insert writing local & replicate " + 
mutation.toString(true));
+
+                Runnable runnable = counterWriteTask(mutation, 
hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+                runnable.run();
             }
         };
 
@@ -118,7 +122,11 @@ public class StorageProxy implements Sto
         {
             public void apply(IMutation mutation, Multimap<InetAddress, 
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String 
localDataCenter, ConsistencyLevel consistency_level) throws IOException
             {
-                applyCounterMutation(mutation, hintedEndpoints, 
responseHandler, localDataCenter, consistency_level, true);
+                if (logger.isDebugEnabled())
+                    logger.debug("insert writing local & replicate " + 
mutation.toString(true));
+
+                Runnable runnable = counterWriteTask(mutation, 
hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+                StageManager.getStage(Stage.MUTATION).execute(runnable);
             }
         };
     }
@@ -218,7 +226,7 @@ public class StorageProxy implements Sto
         return 
ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key),
 table, naturalEndpoints);
     }
 
-    private static void sendToHintedEndpoints(final RowMutation rm, 
Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler 
responseHandler, String localDataCenter, boolean insertLocalMessages, 
ConsistencyLevel consistency_level)
+    private static void sendToHintedEndpoints(final RowMutation rm, 
Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler 
responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
     throws IOException
     {
         // Multimap that holds onto all the messages and addresses meant for a 
specific datacenter
@@ -237,8 +245,7 @@ public class StorageProxy implements Sto
                 // unhinted writes
                 if (destination.equals(FBUtilities.getLocalAddress()))
                 {
-                    if (insertLocalMessages)
-                        insertLocal(rm, responseHandler);
+                    insertLocal(rm, responseHandler);
                 }
                 else
                 {
@@ -425,13 +432,9 @@ public class StorageProxy implements Sto
         return performWrite(cm, cm.consistency(), localDataCenter, 
counterWriteOnCoordinatorPerformer);
     }
 
-    private static void applyCounterMutation(final IMutation mutation, final 
Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler 
responseHandler, final String localDataCenter, final ConsistencyLevel 
consistency_level, boolean executeOnMutationStage)
+    private static Runnable counterWriteTask(final IMutation mutation, final 
Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler 
responseHandler, final String localDataCenter, final ConsistencyLevel 
consistency_level)
     {
-        // we apply locally first, then send it to other replica
-        if (logger.isDebugEnabled())
-            logger.debug("insert writing local & replicate " + 
mutation.toString(true));
-
-        Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
+        return new DroppableRunnable(StorageService.Verb.MUTATION)
         {
             public void runMayThrow() throws IOException
             {
@@ -440,10 +443,11 @@ public class StorageProxy implements Sto
 
                 // apply mutation
                 cm.apply();
-
                 responseHandler.response(null);
 
-                if (cm.shouldReplicateOnWrite())
+                // then send to replicas, if any
+                hintedEndpoints.removeAll(FBUtilities.getLocalAddress());
+                if (cm.shouldReplicateOnWrite() && !hintedEndpoints.isEmpty())
                 {
                     // We do the replication on another stage because it 
involves a read (see CM.makeReplicationMutation)
                     // and we want to avoid blocking too much the MUTATION 
stage
@@ -452,16 +456,12 @@ public class StorageProxy implements Sto
                         public void runMayThrow() throws IOException
                         {
                             // send mutation to other replica
-                            
sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints, 
responseHandler, localDataCenter, false, consistency_level);
+                            
sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints, 
responseHandler, localDataCenter, consistency_level);
                         }
                     });
                 }
             }
         };
-        if (executeOnMutationStage)
-            StageManager.getStage(Stage.MUTATION).execute(runnable);
-        else
-            runnable.run();
     }
 
     /**


Reply via email to