Author: jbellis
Date: Sun May 3 03:26:07 2009
New Revision: 771019
URL: http://svn.apache.org/viewvc?rev=771019&view=rev
Log:
make sendMessage only return true after ack by recipient.
patch by Jun Rao; reviewed by jbellis for CASSANDRA-34
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771019&r1=771018&r2=771019&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Sun May 3 03:26:07 2009
@@ -21,8 +21,10 @@
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.io.IOException;
import org.apache.log4j.Logger;
@@ -33,8 +35,7 @@
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IComponentShutdown;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
/**
@@ -83,26 +84,22 @@
endPoint_ = endPoint;
}
- private boolean sendMessage(String endpointAddress, String key) throws Exception
+ private boolean sendMessage(String endpointAddress, String key) throws DigestMismatchException, TimeoutException, IOException
{
- boolean success = false; // TODO : fix the hack we need to make sure the data is written on the other end.
- if(FailureDetector.instance().isAlive(new EndPoint(endpointAddress, DatabaseDescriptor.getControlPort())))
- {
- success = true;
- }
- else
- {
- return success;
- }
- Table table = Table.open(DatabaseDescriptor.getTables().get(0));
- Row row = null;
- row = table.get(key);
- RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
- RowMutationMessage rmMsg = new RowMutationMessage(rm);
- Message message = RowMutationMessage.makeRowMutationMessage( rmMsg );
- EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
- MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
- return success;
+ EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
+ if (!FailureDetector.instance().isAlive(endPoint))
+ {
+ return false;
+ }
+
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ Row row = table.get(key);
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
+ Message message = rm.makeRowMutationMessage();
+
+ QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
+ MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+ return quorumResponseHandler.get();
}
private void deleteEndPoint(String endpointAddress, String key) throws Exception