Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 a5297f16e -> 0b99d33ee


Write hints for paxos commits

patch by Sankalp Kohli and Aleksey Yeschenko for CASSANDRA-7342


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/07a7e80c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/07a7e80c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/07a7e80c

Branch: refs/heads/cassandra-2.2
Commit: 07a7e80c321b6667cc44a236f999c7837fe48e20
Parents: 3b7934f
Author: sankalp kohli <kohlisank...@gmail.com>
Authored: Thu Aug 6 18:56:37 2015 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Sun Aug 9 22:52:03 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 src/java/org/apache/cassandra/net/MessagingService.java | 11 +++++------
 .../org/apache/cassandra/net/WriteCallbackInfo.java     | 11 ++++++++++-
 src/java/org/apache/cassandra/service/StorageProxy.java | 12 ++++++++----
 4 files changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/07a7e80c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b0241c..7151883 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.9
+ * Write hints for paxos commits (CASSANDRA-7342)
  * (cqlsh) Fix timestamps before 1970 on Windows, always
    use UTC for timestamp display (CASSANDRA-10000)
  * (cqlsh) Avoid overwriting new config file with old config

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07a7e80c/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 1ad0e7c..d7825d4 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -379,8 +379,7 @@ public final class MessagingService implements MessagingServiceMBean
 
                 if (expiredCallbackInfo.shouldHint())
                 {
-                    Mutation mutation = (Mutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
-
+                    Mutation mutation = ((WriteCallbackInfo) expiredCallbackInfo).mutation();
                     return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
                 }
 
@@ -594,13 +593,13 @@ public final class MessagingService implements MessagingServiceMBean
     }
 
     public int addCallback(IAsyncCallback cb,
-                           MessageOut<? extends IMutation> message,
+                           MessageOut<?> message,
                            InetAddress to,
                            long timeout,
                            ConsistencyLevel consistencyLevel,
                            boolean allowHints)
     {
-        assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
+        assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION || message.verb == Verb.PAXOS_COMMIT;
         int messageId = nextId();
 
         CallbackInfo previous = callbacks.put(messageId,
@@ -651,7 +650,7 @@ public final class MessagingService implements MessagingServiceMBean
     }
 
     /**
-     * Send a mutation message to a given endpoint. This method specifies a callback
+     * Send a mutation message or a Paxos Commit to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
      * Also holds the message (only mutation messages) to determine if it
      * needs to trigger a hint (uses StorageProxy for that).
@@ -662,7 +661,7 @@ public final class MessagingService implements MessagingServiceMBean
      *                suggest that a timeout occurred to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public int sendRR(MessageOut<? extends IMutation> message,
+    public int sendRR(MessageOut<?> message,
                       InetAddress to,
                       AbstractWriteResponseHandler handler,
                       boolean allowHints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07a7e80c/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 987ec15..0cf126f 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -21,12 +21,14 @@ package org.apache.cassandra.net;
 import java.net.InetAddress;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.paxos.Commit;
 
 public class WriteCallbackInfo extends CallbackInfo
 {
-    public final MessageOut sentMessage;
+    private final MessageOut sentMessage;
     private final ConsistencyLevel consistencyLevel;
     private final boolean allowHints;
 
@@ -44,6 +46,13 @@ public class WriteCallbackInfo extends CallbackInfo
         this.allowHints = allowHints;
     }
 
+    Mutation mutation()
+    {
+        return sentMessage.verb == MessagingService.Verb.PAXOS_COMMIT
+             ? ((Commit) sentMessage.payload).makeMutation()
+             : (Mutation) sentMessage.payload;
+    }
+
     public boolean shouldHint()
     {
         return allowHints

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07a7e80c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0045006..1536427 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -258,7 +258,7 @@ public class StorageProxy implements StorageProxyMBean
                 Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
                 if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
                 {
-                    commitPaxos(proposal, consistencyForCommit);
+                    commitPaxos(proposal, consistencyForCommit, true);
                     Tracing.trace("CAS successful");
                     return null;
                 }
@@ -394,7 +394,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     try
                     {
-                        commitPaxos(refreshedInProgress, consistencyForCommit);
+                        commitPaxos(refreshedInProgress, consistencyForCommit, false);
                     }
                     catch (WriteTimeoutException e)
                     {
@@ -474,7 +474,7 @@ public class StorageProxy implements StorageProxyMBean
         return false;
     }
 
-    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException
+    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint) throws WriteTimeoutException
     {
         boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
@@ -496,10 +496,14 @@ public class StorageProxy implements StorageProxyMBean
             if (FailureDetector.instance.isAlive(destination))
             {
                 if (shouldBlock)
-                    MessagingService.instance().sendRR(message, destination, responseHandler);
+                    MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
                 else
                     MessagingService.instance().sendOneWay(message, destination);
             }
+            else if (shouldHint)
+            {
+                submitHint(proposal.makeMutation(), destination, null);
+            }
         }
 
         if (shouldBlock)

Reply via email to