Author: jbellis Date: Tue Aug 9 18:37:20 2011 New Revision: 1155460 URL: http://svn.apache.org/viewvc?rev=1155460&view=rev Log: avoid doing read forno-op replicate-on-write at CL=1 patch by slebresne and jbellis for CASSANDRA-2892
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1155460&r1=1155459&r2=1155460&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Aug 9 18:37:20 2011 @@ -1,6 +1,7 @@ 0.8.4 * include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972) * use JAVA env var in cassandra-env.sh (CASSANDRA-2785, 2992) + * avoid doing read for no-op replicate-on-write at CL=1 (CASSANDRA-2892) 0.8.3 Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1155460&r1=1155459&r2=1155460&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Tue Aug 9 18:37:20 2011 @@ -96,7 +96,7 @@ public class StorageProxy implements Sto public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException { assert mutation instanceof RowMutation; - sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints, responseHandler, localDataCenter, true, consistency_level); + sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level); } }; @@ -110,7 +110,11 @@ public class StorageProxy implements Sto { public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException { - applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level, false); + if (logger.isDebugEnabled()) + logger.debug("insert writing local & replicate " + mutation.toString(true)); + + Runnable runnable = counterWriteTask(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level); + runnable.run(); } }; @@ -118,7 +122,11 @@ public class StorageProxy implements Sto { public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException { - applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level, true); + if (logger.isDebugEnabled()) + logger.debug("insert writing local & replicate " + mutation.toString(true)); + + Runnable runnable = counterWriteTask(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level); + StageManager.getStage(Stage.MUTATION).execute(runnable); } }; } @@ -218,7 +226,7 @@ public class StorageProxy implements Sto return ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key), table, naturalEndpoints); } - private static void sendToHintedEndpoints(final RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, boolean insertLocalMessages, ConsistencyLevel consistency_level) + private static void sendToHintedEndpoints(final RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException { // Multimap that holds onto all the messages and addresses meant for a specific datacenter @@ -237,8 +245,7 @@ public class StorageProxy implements Sto // unhinted writes if (destination.equals(FBUtilities.getLocalAddress())) { - if (insertLocalMessages) - insertLocal(rm, responseHandler); + insertLocal(rm, responseHandler); } else { @@ -425,13 +432,9 @@ public class StorageProxy implements Sto return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer); } - private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level, boolean executeOnMutationStage) + private static Runnable counterWriteTask(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level) { - // we apply locally first, then send it to other replica - if (logger.isDebugEnabled()) - logger.debug("insert writing local & replicate " + mutation.toString(true)); - - Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION) + return new DroppableRunnable(StorageService.Verb.MUTATION) { public void runMayThrow() throws IOException { @@ -440,10 +443,11 @@ public class StorageProxy implements Sto // apply mutation cm.apply(); - responseHandler.response(null); - if (cm.shouldReplicateOnWrite()) + // then send to replicas, if any + hintedEndpoints.removeAll(FBUtilities.getLocalAddress()); + if (cm.shouldReplicateOnWrite() && !hintedEndpoints.isEmpty()) { // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation) // and we want to avoid blocking too much the MUTATION stage @@ -452,16 +456,12 @@ public class StorageProxy implements Sto public void runMayThrow() throws IOException { // send mutation to other replica - sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints, responseHandler, localDataCenter, false, consistency_level); + sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints, responseHandler, localDataCenter, consistency_level); } }); } } }; - if (executeOnMutationStage) - StageManager.getStage(Stage.MUTATION).execute(runnable); - else - runnable.run(); } /**