Author: jbellis
Date: Mon May 24 18:10:07 2010
New Revision: 947737
URL: http://svn.apache.org/viewvc?rev=947737&view=rev
Log:
Add repair and move requests into AntiEntropyService. patch by Stu Hood;
reviewed by jbellis for CASSANDRA-1090
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=947737&r1=947736&r2=947737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon May 24 18:10:07 2010
@@ -111,12 +111,41 @@ public class AntiEntropyService
private final Map<CFPair, ExpiringMap<InetAddress, TreePair>> trees;
/**
+ * Map from the name of each running RepairSession (its thread name) to a queue of every TreeRequest completed since that session registered:
+ * completedRequest() offers each completion to all queues, and each session picks out the requests it made itself.
+ */
+ private final ConcurrentMap<String, BlockingQueue<TreeRequest>> sessions;
+
+ /**
* Protected constructor. Use AntiEntropyService.instance.
*/
protected AntiEntropyService()
{
naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
trees = new HashMap<CFPair, ExpiringMap<InetAddress, TreePair>>();
+ sessions = new ConcurrentHashMap<String, BlockingQueue<TreeRequest>>();
+ }
+
+ /**
+ * Creates, without starting, a RepairSession for the given table and column families: call start() on it, then join() to block until all repairs have completed.
+ * TODO: Should add retries: if nodes go offline before they respond to the requests, join() could block forever.
+ */
+ public RepairSession getRepairSession(String tablename, String... cfnames)
+ {
+ return new RepairSession(tablename, cfnames);
+ }
+
+ /**
+ * Called by Differencer when a full repair round trip has been completed
between the given CF and endpoints.
+ */
+ void completedRequest(CFPair cf, InetAddress... endpoints)
+ {
+ for (InetAddress endpoint : endpoints)
+ {
+ // indicate to each waiting session that this request completed
+ TreeRequest completed = new TreeRequest(cf, endpoint);
+ for (BlockingQueue<TreeRequest> session : sessions.values())
+ session.offer(completed);
+ }
}
/**
@@ -140,7 +169,7 @@ public class AntiEntropyService
/**
* Return all of the neighbors with whom we share data.
*/
- public static Set<InetAddress> getNeighbors(String table)
+ static Set<InetAddress> getNeighbors(String table)
{
StorageService ss = StorageService.instance;
Set<InetAddress> neighbors = new HashSet<InetAddress>();
@@ -579,20 +608,24 @@ public class AntiEntropyService
// choose a repair method based on the significance of the
difference
float difference = differenceFraction();
- try
+ if (difference == 0.0)
{
- if (difference == 0.0)
- {
- logger.debug("Endpoints " + local + " and " + remote + "
are consistent for " + cf);
- return;
- }
-
- performStreamingRepair();
+ logger.info("Endpoints " + local + " and " + remote + " are
consistent for " + cf);
}
- catch(IOException e)
+ else
{
- throw new RuntimeException(e);
+ try
+ {
+ performStreamingRepair();
+ }
+ catch(IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
+
+ // repair was completed successfully: notify any waiting sessions
+ AntiEntropyService.instance.completedRequest(cf, local, remote);
}
/**
@@ -778,4 +811,83 @@ public class AntiEntropyService
assert local != null ^ remote != null;
}
}
+
+ /**
+ * Identifies one outstanding tree request: the (table, cf) pair plus the endpoint that was asked to build the tree.
+ * RepairSession tracks these in a HashSet, so matching relies on Pair's equality (presumably value-based -- confirm in Pair).
+ */
+ static class TreeRequest extends Pair<CFPair,InetAddress>
+ {
+ public TreeRequest(CFPair cf, InetAddress target)
+ {
+ super(cf, target);
+ // both halves are needed to identify a request
+ assert cf != null;
+ assert target != null;
+ }
+ }
+
+ /**
+ * Triggers repairs with all neighbors for the given table and cfs. Typical lifecycle is: start(), optionally blockUntilRunning(), then join().
+ * While running, the session is registered in AntiEntropyService.sessions under its thread name; it normally ends once every tree request
+ * it made has been reported through completedRequest().
+ */
+ class RepairSession extends Thread
+ {
+ private final String tablename;
+ private final String[] cfnames;
+ private final SimpleCondition requestsMade;
+
+ public RepairSession(String tablename, String... cfnames)
+ {
+ super("manual-repair-" + UUID.randomUUID());
+ this.tablename = tablename;
+ this.cfnames = cfnames;
+ this.requestsMade = new SimpleCondition();
+ }
+
+ /**
+ * Waits until the session has finished sending out its requests (or has failed while trying to): to wait for the session to end, call join().
+ */
+ public void blockUntilRunning() throws InterruptedException
+ {
+ requestsMade.await();
+ }
+
+ @Override
+ public void run()
+ {
+ // register before sending anything, so that no completion notification can be missed
+ BlockingQueue<TreeRequest> completed = new LinkedBlockingQueue<TreeRequest>();
+ AntiEntropyService.this.sessions.put(getName(), completed);
+ try
+ {
+ Set<TreeRequest> requests = sendTreeRequests();
+
+ // block until all requests have been returned by completedRequest calls
+ logger.info("Waiting for repair requests to: " + requests);
+ while (!requests.isEmpty())
+ {
+ TreeRequest request = completed.take();
+ logger.info("Repair request to " + request + " completed successfully.");
+ // a no-op for notifications that belong to other sessions
+ requests.remove(request);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // restore the interrupt status and keep the cause attached to the failure
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background.", e);
+ }
+ finally
+ {
+ AntiEntropyService.this.sessions.remove(getName());
+ }
+ }
+
+ /**
+ * Asks every neighbor, plus the local node, to generate a tree for each cf of this session.
+ * requestsMade is signalled even if sending fails, so callers of blockUntilRunning() cannot hang forever.
+ * @return the requests that must be reported complete before the session ends
+ */
+ private Set<TreeRequest> sendTreeRequests()
+ {
+ try
+ {
+ Set<TreeRequest> requests = new HashSet<TreeRequest>();
+ Set<InetAddress> endpoints = AntiEntropyService.getNeighbors(tablename);
+ endpoints.add(FBUtilities.getLocalAddress());
+ for (String cfname : cfnames)
+ {
+ Message request = TreeRequestVerbHandler.makeVerb(tablename, cfname);
+ for (InetAddress endpoint : endpoints)
+ {
+ requests.add(new TreeRequest(new CFPair(tablename, cfname), endpoint));
+ MessagingService.instance.sendOneWay(request, endpoint);
+ }
+ }
+ return requests;
+ }
+ finally
+ {
+ requestsMade.signalAll();
+ }
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=947737&r1=947736&r2=947737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon May 24 18:10:07 2010
@@ -1112,15 +1112,17 @@ public class StorageService implements I
*/
public void forceTableRepair(final String tableName, final String... columnFamilies) throws IOException
{
- // request that all relevant endpoints generate trees
- final MessagingService ms = MessagingService.instance;
- final Set<InetAddress> endpoints = AntiEntropyService.getNeighbors(tableName);
- endpoints.add(FBUtilities.getLocalAddress());
- for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
- {
- Message request = TreeRequestVerbHandler.makeVerb(tableName, cfStore.getColumnFamilyName());
- for (InetAddress endpoint : endpoints)
- ms.sendOneWay(request, endpoint);
+ // resolve the requested CFs through getValidColumnFamilies, as the previous implementation did, so that its
+ // validation (and whatever it does for an empty list) is not bypassed by handing raw names to the session
+ java.util.List<String> cfnames = new java.util.ArrayList<String>();
+ for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
+ cfnames.add(cfStore.getColumnFamilyName());
+ AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(tableName, cfnames.toArray(new String[cfnames.size()]));
+
+ try
+ {
+ sess.start();
+ // block until the repair has completed
+ sess.join();
+ }
+ catch (InterruptedException e)
+ {
+ // keep this thread's interrupt status visible to callers before translating to IOException
+ Thread.currentThread().interrupt();
+ throw new IOException("Repair session " + sess + " failed.", e);
}
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=947737&r1=947736&r2=947737&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Mon May 24 18:10:07 2010
@@ -193,6 +193,25 @@ public class AntiEntropyServiceTest exte
}
@Test
+ public void testManualRepair() throws Throwable
+ {
+ AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(tablename, cfname);
+ sess.start();
+ sess.blockUntilRunning();
+
+ // ensure that the session doesn't end without a response from REMOTE
+ sess.join(100);
+ assert sess.isAlive();
+
+ // deliver fake responses from LOCAL and REMOTE
+ CFPair cf = new CFPair(tablename, cfname);
+ AntiEntropyService.instance.completedRequest(cf, LOCAL);
+ AntiEntropyService.instance.completedRequest(cf, REMOTE);
+
+ // wait for the repair to complete, but fail instead of hanging the whole suite if it never does
+ sess.join(10000);
+ assert !sess.isAlive() : "repair session did not complete after all requests were answered";
+ }
+
+ @Test
public void testNotifyNeighbors() throws Throwable
{
// generate empty tree