Author: jbellis
Date: Tue Aug 9 18:37:20 2011
New Revision: 1155460
URL: http://svn.apache.org/viewvc?rev=1155460&view=rev
Log:
avoid doing read forno-op replicate-on-write at CL=1
patch by slebresne and jbellis for CASSANDRA-2892
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1155460&r1=1155459&r2=1155460&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Aug 9 18:37:20 2011
@@ -1,6 +1,7 @@
0.8.4
* include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)
* use JAVA env var in cassandra-env.sh (CASSANDRA-2785, 2992)
+ * avoid doing read for no-op replicate-on-write at CL=1 (CASSANDRA-2892)
0.8.3
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1155460&r1=1155459&r2=1155460&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Aug 9 18:37:20 2011
@@ -96,7 +96,7 @@ public class StorageProxy implements Sto
public void apply(IMutation mutation, Multimap<InetAddress,
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String
localDataCenter, ConsistencyLevel consistency_level) throws IOException
{
assert mutation instanceof RowMutation;
- sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints,
responseHandler, localDataCenter, true, consistency_level);
+ sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints,
responseHandler, localDataCenter, consistency_level);
}
};
@@ -110,7 +110,11 @@ public class StorageProxy implements Sto
{
public void apply(IMutation mutation, Multimap<InetAddress,
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String
localDataCenter, ConsistencyLevel consistency_level) throws IOException
{
- applyCounterMutation(mutation, hintedEndpoints,
responseHandler, localDataCenter, consistency_level, false);
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing local & replicate " +
mutation.toString(true));
+
+ Runnable runnable = counterWriteTask(mutation,
hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+ runnable.run();
}
};
@@ -118,7 +122,11 @@ public class StorageProxy implements Sto
{
public void apply(IMutation mutation, Multimap<InetAddress,
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String
localDataCenter, ConsistencyLevel consistency_level) throws IOException
{
- applyCounterMutation(mutation, hintedEndpoints,
responseHandler, localDataCenter, consistency_level, true);
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing local & replicate " +
mutation.toString(true));
+
+ Runnable runnable = counterWriteTask(mutation,
hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+ StageManager.getStage(Stage.MUTATION).execute(runnable);
}
};
}
@@ -218,7 +226,7 @@ public class StorageProxy implements Sto
return
ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key),
table, naturalEndpoints);
}
- private static void sendToHintedEndpoints(final RowMutation rm,
Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler
responseHandler, String localDataCenter, boolean insertLocalMessages,
ConsistencyLevel consistency_level)
+ private static void sendToHintedEndpoints(final RowMutation rm,
Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler
responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
throws IOException
{
// Multimap that holds onto all the messages and addresses meant for a
specific datacenter
@@ -237,8 +245,7 @@ public class StorageProxy implements Sto
// unhinted writes
if (destination.equals(FBUtilities.getLocalAddress()))
{
- if (insertLocalMessages)
- insertLocal(rm, responseHandler);
+ insertLocal(rm, responseHandler);
}
else
{
@@ -425,13 +432,9 @@ public class StorageProxy implements Sto
return performWrite(cm, cm.consistency(), localDataCenter,
counterWriteOnCoordinatorPerformer);
}
- private static void applyCounterMutation(final IMutation mutation, final
Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler
responseHandler, final String localDataCenter, final ConsistencyLevel
consistency_level, boolean executeOnMutationStage)
+ private static Runnable counterWriteTask(final IMutation mutation, final
Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler
responseHandler, final String localDataCenter, final ConsistencyLevel
consistency_level)
{
- // we apply locally first, then send it to other replica
- if (logger.isDebugEnabled())
- logger.debug("insert writing local & replicate " +
mutation.toString(true));
-
- Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
+ return new DroppableRunnable(StorageService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
{
@@ -440,10 +443,11 @@ public class StorageProxy implements Sto
// apply mutation
cm.apply();
-
responseHandler.response(null);
- if (cm.shouldReplicateOnWrite())
+ // then send to replicas, if any
+ hintedEndpoints.removeAll(FBUtilities.getLocalAddress());
+ if (cm.shouldReplicateOnWrite() && !hintedEndpoints.isEmpty())
{
// We do the replication on another stage because it
involves a read (see CM.makeReplicationMutation)
// and we want to avoid blocking too much the MUTATION
stage
@@ -452,16 +456,12 @@ public class StorageProxy implements Sto
public void runMayThrow() throws IOException
{
// send mutation to other replica
-
sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints,
responseHandler, localDataCenter, false, consistency_level);
+
sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints,
responseHandler, localDataCenter, consistency_level);
}
});
}
}
};
- if (executeOnMutationStage)
- StageManager.getStage(Stage.MUTATION).execute(runnable);
- else
- runnable.run();
}
/**