Author: jbellis
Date: Mon May 24 18:10:07 2010
New Revision: 947737

URL: http://svn.apache.org/viewvc?rev=947737&view=rev
Log:
Add repair and move requests into AntiEntropyService.  patch by Stu Hood; 
reviewed by jbellis for CASSANDRA-1090

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=947737&r1=947736&r2=947737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon May 24 18:10:07 2010
@@ -111,12 +111,41 @@ public class AntiEntropyService
     private final Map<CFPair, ExpiringMap<InetAddress, TreePair>> trees;
 
     /**
+     * A map of repair request ids to a Queue of TreeRequests that have been performed since the session was started.
+     */
+    private final ConcurrentMap<String, BlockingQueue<TreeRequest>> sessions;
+
+    /**
      * Protected constructor. Use AntiEntropyService.instance.
      */
     protected AntiEntropyService()
     {
         naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
         trees = new HashMap<CFPair, ExpiringMap<InetAddress, TreePair>>();
+        sessions = new ConcurrentHashMap<String, BlockingQueue<TreeRequest>>();
+    }
+
+    /**
+     * Requests repairs for the given table and column families, and blocks until all repairs have been completed.
+     * TODO: Should add retries: if nodes go offline before they respond to the requests, this could block forever.
+     */
+    public RepairSession getRepairSession(String tablename, String... cfnames)
+    {
+        return new RepairSession(tablename, cfnames);
+    }
+
+    /**
+     * Called by Differencer when a full repair round trip has been completed between the given CF and endpoints.
+     */
+    void completedRequest(CFPair cf, InetAddress... endpoints)
+    {
+        for (InetAddress endpoint : endpoints)
+        {
+            // indicate to each waiting session that this request completed
+            TreeRequest completed = new TreeRequest(cf, endpoint);
+            for (BlockingQueue<TreeRequest> session : sessions.values())
+                session.offer(completed);
+        }
     }
 
     /**
@@ -140,7 +169,7 @@ public class AntiEntropyService
     /**
      * Return all of the neighbors with whom we share data.
      */
-    public static Set<InetAddress> getNeighbors(String table)
+    static Set<InetAddress> getNeighbors(String table)
     {
         StorageService ss = StorageService.instance;
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
@@ -579,20 +608,24 @@ public class AntiEntropyService
             
             // choose a repair method based on the significance of the difference
             float difference = differenceFraction();
-            try
+            if (difference == 0.0)
             {
-                if (difference == 0.0)
-                {
-                    logger.debug("Endpoints " + local + " and " + remote + " 
are consistent for " + cf);
-                    return;
-                }
-                
-                performStreamingRepair();
+                logger.info("Endpoints " + local + " and " + remote + " are 
consistent for " + cf);
             }
-            catch(IOException e)
+            else
             {
-                throw new RuntimeException(e);
+                try
+                {
+                    performStreamingRepair();
+                }
+                catch(IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
+
+            // repair was completed successfully: notify any waiting sessions
+            AntiEntropyService.instance.completedRequest(cf, local, remote);
         }
         
         /**
@@ -778,4 +811,83 @@ public class AntiEntropyService
             assert local != null ^ remote != null;
         }
     }
+
+    /**
+     * A triple of table, cf and address that represents a location we have an outstanding TreeRequest for.
+     */
+    static class TreeRequest extends Pair<CFPair,InetAddress>
+    {
+        public TreeRequest(CFPair cf, InetAddress target)
+        {
+            super(cf, target);
+            assert cf != null && target != null;
+        }
+    }
+
+    /**
+     * Triggers repairs with all neighbors for the given table and cfs. Typical lifecycle is: start() then join().
+     */
+    class RepairSession extends Thread
+    {
+        private final String tablename;
+        private final String[] cfnames;
+        private final SimpleCondition requestsMade;
+        public RepairSession(String tablename, String... cfnames)
+        {
+            super("manual-repair-" + UUID.randomUUID());
+            this.tablename = tablename;
+            this.cfnames = cfnames;
+            this.requestsMade = new SimpleCondition();
+        }
+
+        /**
+         * Waits until all requests for the session have been sent out: to wait for the session to end, call join().
+         */
+        public void blockUntilRunning() throws InterruptedException
+        {
+            requestsMade.await();
+        }
+
+        @Override
+        public void run()
+        {
+            // begin a repair session
+            BlockingQueue<TreeRequest> completed = new 
LinkedBlockingQueue<TreeRequest>();
+            AntiEntropyService.this.sessions.put(getName(), completed);
+            try
+            {
+                // request that all relevant endpoints generate trees
+                Set<TreeRequest> requests = new HashSet<TreeRequest>();
+                Set<InetAddress> endpoints = 
AntiEntropyService.getNeighbors(tablename);
+                endpoints.add(FBUtilities.getLocalAddress());
+                for (String cfname : cfnames)
+                {
+                    Message request = 
TreeRequestVerbHandler.makeVerb(tablename, cfname);
+                    for (InetAddress endpoint : endpoints)
+                    {
+                        requests.add(new TreeRequest(new CFPair(tablename, 
cfname), endpoint));
+                        MessagingService.instance.sendOneWay(request, 
endpoint);
+                    }
+                }
+                requestsMade.signalAll();
+
+                // block until all requests have been returned by completedRequest calls
+                logger.info("Waiting for repair requests to: " + requests);
+                while (!requests.isEmpty())
+                {
+                    TreeRequest request = completed.take();
+                    logger.info("Repair request to " + request + " completed 
successfully.");
+                    requests.remove(request);
+                }
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException("Interrupted while waiting for 
repair: repair will continue in the background.");
+            }
+            finally
+            {
+                AntiEntropyService.this.sessions.remove(getName());
+            }
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=947737&r1=947736&r2=947737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon May 24 18:10:07 2010
@@ -1112,15 +1112,17 @@ public class StorageService implements I
      */
     public void forceTableRepair(final String tableName, final String... 
columnFamilies) throws IOException
     {
-        // request that all relevant endpoints generate trees
-        final MessagingService ms = MessagingService.instance;
-        final Set<InetAddress> endpoints = 
AntiEntropyService.getNeighbors(tableName);
-        endpoints.add(FBUtilities.getLocalAddress());
-        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
-        {
-            Message request = TreeRequestVerbHandler.makeVerb(tableName, 
cfStore.getColumnFamilyName());
-            for (InetAddress endpoint : endpoints)
-                ms.sendOneWay(request, endpoint);
+        AntiEntropyService.RepairSession sess = 
AntiEntropyService.instance.getRepairSession(tableName, columnFamilies);
+        
+        try
+        {
+            sess.start();
+            // block until the repair has completed
+            sess.join();
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException("Repair session " + sess + " failed.", e);
         }
     }
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=947737&r1=947736&r2=947737&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Mon May 24 18:10:07 2010
@@ -193,6 +193,25 @@ public class AntiEntropyServiceTest exte
     }
 
     @Test
+    public void testManualRepair() throws Throwable
+    {
+        AntiEntropyService.RepairSession sess = 
AntiEntropyService.instance.getRepairSession(tablename, cfname);
+        sess.start();
+        sess.blockUntilRunning();
+
+        // ensure that the session doesn't end without a response from REMOTE
+        sess.join(100);
+        assert sess.isAlive();
+
+        // deliver fake responses from LOCAL and REMOTE
+        AntiEntropyService.instance.completedRequest(new CFPair(tablename, 
cfname), LOCAL);
+        AntiEntropyService.instance.completedRequest(new CFPair(tablename, 
cfname), REMOTE);
+
+        // block until the repair has completed
+        sess.join();
+    }
+
+    @Test
     public void testNotifyNeighbors() throws Throwable
     {
         // generate empty tree


Reply via email to