Author: jbellis
Date: Wed Jan 5 07:15:06 2011
New Revision: 1055325
URL: http://svn.apache.org/viewvc?rev=1055325&view=rev
Log:
commit RepairCallback.java
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1055325&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
(added)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
Wed Jan 5 07:15:06 2011
@@ -0,0 +1,55 @@
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.SimpleCondition;
+
+public class RepairCallback<T> implements IAsyncCallback
+{
+ public final IResponseResolver<T> resolver;
+ private final List<InetAddress> endpoints;
+ protected final SimpleCondition condition = new SimpleCondition();
+ private final long startTime;
+
+ public RepairCallback(IResponseResolver<T> resolver, List<InetAddress>
endpoints)
+ {
+ this.resolver = resolver;
+ this.endpoints = endpoints;
+ this.startTime = System.currentTimeMillis();
+ }
+
+ /**
+ * The main difference between this and ReadCallback is, ReadCallback has
a ConsistencyLevel
+ * it needs to achieve. Repair on the other hand is happy to repair
whoever replies within the timeout.
+ */
+ public T get() throws TimeoutException, DigestMismatchException,
IOException
+ {
+ long timeout = DatabaseDescriptor.getRpcTimeout() -
(System.currentTimeMillis() - startTime);
+ try
+ {
+ condition.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError(ex);
+ }
+
+ return resolver.resolve();
+ }
+
+
+ public void response(Message message)
+ {
+ resolver.preprocess(message);
+ if (resolver.getMessageCount() == endpoints.size())
+ condition.signal();
+ }
+
+}