Updated Branches:
  refs/heads/trunk a57981650 -> 53f7c328a

fix merge from #6132
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-6154


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53f7c328
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53f7c328
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53f7c328

Branch: refs/heads/trunk
Commit: 53f7c328ae4aca0affaed4f0c73678011a4e152a
Parents: a579816
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Oct 7 16:47:41 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Oct 7 16:47:47 2013 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/net/MessagingService.java   | 15 ++++++++++-----
 .../org/apache/cassandra/service/StorageProxy.java   | 10 ++++------
 2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f7c328/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 1776361..ff8a2c7 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -563,14 +563,16 @@ public final class MessagingService implements 
MessagingServiceMBean
         return idGen.incrementAndGet();
     }
 
-    /*
-     * @see #sendRR(Message message, InetAddress to, IAsyncCallback cb, long 
timeout)
-     */
     public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
     {
         return sendRR(message, to, cb, message.getTimeout());
     }
 
+    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, 
long timeout)
+    {
+        return sendRR(message, to, cb, timeout, null);
+    }
+
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
@@ -584,11 +586,14 @@ public final class MessagingService implements 
MessagingServiceMBean
      *                suggest that a timeout occurred to the invoker of the 
send().
      *                suggest that a timeout occurred to the invoker of the 
send().
      * @param timeout the timeout used for expiration
+     * @param consistencyLevel the consistency level, for mutations; must be 
null otherwise
      * @return an reference to message id used to match with the result
      */
-    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, 
long timeout)
+    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, 
long timeout, ConsistencyLevel consistencyLevel)
     {
-        int id = addCallback(cb, message, to, timeout);
+        int id = consistencyLevel == null
+               ? addCallback(cb, message, to, timeout)
+               : addCallback(cb, message, to, timeout, consistencyLevel);
         sendOneWay(message, id, to);
         return id;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f7c328/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 157079f..102d8b5 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -681,7 +681,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             MessageOut<RowMutation> message = rm.createMessage();
             for (InetAddress target : endpoints)
-                MessagingService.instance().sendRR(message, target, handler);
+                MessagingService.instance().sendRR(message, target, handler, 
message.getTimeout(), ConsistencyLevel.ONE);
         }
     }
 
@@ -866,7 +866,7 @@ public class StorageProxy implements StorageProxyMBean
                     // (1.1 knows how to forward old-style String message IDs; 
updated to int in 2.0)
                     if (localDataCenter.equals(dc))
                     {
-                        MessagingService.instance().sendRR(message, 
destination, responseHandler);
+                        MessagingService.instance().sendRR(message, 
destination, responseHandler, message.getTimeout(), consistency_level);
                     }
                     else
                     {
@@ -982,8 +982,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             message = message.withParameter(RowMutation.FORWARD_TO, 
out.getData());
             // send the combined message + forward headers
-            int id = MessagingService.instance().addCallback(handler, message, 
target, message.getTimeout(), handler.consistencyLevel);
-            MessagingService.instance().sendOneWay(message, id, target);
+            int id = MessagingService.instance().sendRR(message, target, 
handler, message.getTimeout(), handler.consistencyLevel);
             logger.trace("Sending message to {}@{}", id, target);
         }
         catch (IOException e)
@@ -1044,8 +1043,7 @@ public class StorageProxy implements StorageProxyMBean
 
             Tracing.trace("Enqueuing counter update to {}", endpoint);
             MessageOut<CounterMutation> message = cm.makeMutationMessage();
-            int id = MessagingService.instance().addCallback(responseHandler, 
message, endpoint, message.getTimeout(), cm.consistency());
-            MessagingService.instance().sendOneWay(message, id, endpoint);
+            MessagingService.instance().sendRR(message, endpoint, 
responseHandler, message.getTimeout(), cm.consistency());
             return responseHandler;
         }
     }

Reply via email to