Author: jbellis
Date: Sun May  3 03:26:07 2009
New Revision: 771019

URL: http://svn.apache.org/viewvc?rev=771019&view=rev
Log:
make sendMessage only return true after ack by recipient.
patch by Jun Rao; reviewed by jbellis for CASSANDRA-34

Modified:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771019&r1=771018&r2=771019&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 Sun May  3 03:26:07 2009
@@ -21,8 +21,10 @@
 import java.util.Collection;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.io.IOException;
 
 import org.apache.log4j.Logger;
 
@@ -33,8 +35,7 @@
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IComponentShutdown;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
 
 
 /**
@@ -83,26 +84,22 @@
                endPoint_ = endPoint;
         }
 
-        private boolean sendMessage(String endpointAddress, String key) throws 
Exception
+        private boolean sendMessage(String endpointAddress, String key) throws 
DigestMismatchException, TimeoutException, IOException
         {
-               boolean success = false; // TODO : fix the hack we need to make 
sure the data is written on the other end.
-               if(FailureDetector.instance().isAlive(new 
EndPoint(endpointAddress, DatabaseDescriptor.getControlPort())))
-               {
-                       success = true;
-               }
-               else
-               {
-                       return success;
-               }
-               Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-               Row row = null;
-               row = table.get(key);
-               RowMutation rm = new 
RowMutation(DatabaseDescriptor.getTables().get(0), row);
-                       RowMutationMessage rmMsg = new RowMutationMessage(rm);
-                       Message message = 
RowMutationMessage.makeRowMutationMessage( rmMsg );
-                       EndPoint endPoint = new EndPoint(endpointAddress, 
DatabaseDescriptor.getStoragePort());
-                       
MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
-                       return success;
+            EndPoint endPoint = new EndPoint(endpointAddress, 
DatabaseDescriptor.getStoragePort());
+            if (!FailureDetector.instance().isAlive(endPoint))
+            {
+                return false;
+            }
+
+            Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+            Row row = table.get(key);
+            RowMutation rm = new 
RowMutation(DatabaseDescriptor.getTables().get(0), row);
+            Message message = rm.makeRowMutationMessage();
+
+            QuorumResponseHandler<Boolean> quorumResponseHandler = new 
QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
+            MessagingService.getMessagingInstance().sendRR(message, new 
EndPoint[]{ endPoint }, quorumResponseHandler);
+            return quorumResponseHandler.get();
         }
 
         private void deleteEndPoint(String endpointAddress, String key) throws 
Exception


Reply via email to