Author: jbellis
Date: Mon Jul 25 18:41:30 2011
New Revision: 1150837
URL: http://svn.apache.org/viewvc?rev=1150837&view=rev
Log:
add ability to drop local reads/writes that are going to timeout
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2943
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1150837&r1=1150836&r2=1150837&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jul 25 18:41:30 2011
@@ -1,3 +1,8 @@
+0.8.3
+ * add ability to drop local reads/writes that are going to timeout
+ (CASSANDRA-2943)
+
+
0.8.2
* CQL:
- include only one row per unique key for IN queries (CASSANDRA-2717)
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1150837&r1=1150836&r2=1150837&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jul 25 18:41:30 2011
@@ -349,7 +349,7 @@ public class StorageProxy implements Sto
{
if (logger.isDebugEnabled())
logger.debug("insert writing local " + rm.toString(true));
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
{
@@ -431,7 +431,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("insert writing local & replicate " +
mutation.toString(true));
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
{
@@ -447,7 +447,7 @@ public class StorageProxy implements Sto
{
// We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
// and we want to avoid blocking too much the MUTATION stage
-
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable()
+
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new
DroppableRunnable(StorageService.Verb.READ)
{
public void runMayThrow() throws IOException
{
@@ -620,7 +620,7 @@ public class StorageProxy implements Sto
return rows;
}
- static class LocalReadRunnable extends WrappedRunnable
+ static class LocalReadRunnable extends DroppableRunnable
{
private final ReadCommand command;
private final ReadCallback<Row> handler;
@@ -628,6 +628,7 @@ public class StorageProxy implements Sto
LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler)
{
+ super(StorageService.Verb.READ);
this.command = command;
this.handler = handler;
}
@@ -1082,4 +1083,35 @@ public class StorageProxy implements Sto
{
public void apply(IMutation mutation, Multimap<InetAddress,
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String
localDataCenter, ConsistencyLevel consistency_level) throws IOException;
}
+
+ private static abstract class DroppableRunnable implements Runnable // Runnable that skips its work when run() starts later than the RPC timeout after construction
+ {
+ private final long constructionTime = System.currentTimeMillis(); // wall-clock timestamp taken when the instance is created
+ private final StorageService.Verb verb; // verb reported to MessagingService when this task is dropped
+
+ public DroppableRunnable(StorageService.Verb verb)
+ {
+ this.verb = verb;
+ }
+
+ public final void run() // final: subclasses supply runMayThrow() instead of overriding run()
+ {
+ if (System.currentTimeMillis() > constructionTime +
DatabaseDescriptor.getRpcTimeout()) // getRpcTimeout() is presumably in milliseconds, as it is added to a millisecond timestamp - TODO confirm; NOTE(review): wall-clock based, so a system clock adjustment could skew this check
+ {
+ MessagingService.instance().incrementDroppedMessages(verb); // record the drop under this task's verb
+ return; // dropped: runMayThrow() is never invoked
+ }
+
+ try
+ {
+ runMayThrow();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e); // rethrow unchecked, keeping the original exception as the cause
+ }
+ }
+
+ abstract protected void runMayThrow() throws Exception; // the actual work; any Exception thrown here is wrapped by run()
+ }
}