Author: slebresne
Date: Tue Jul 26 08:30:06 2011
New Revision: 1151018

URL: http://svn.apache.org/viewvc?rev=1151018&view=rev
Log:
merge from 0.8

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/contrib/   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 08:30:06 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7:1026516-1149015,1149716
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1149016,1149217,1149235,1149332,1149725,1150103
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1149016,1149121,1149217,1149235,1149332,1149725,1150103
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1151018&r1=1151017&r2=1151018&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jul 26 08:30:06 2011
@@ -60,6 +60,7 @@
  * fix re-using index CF sstable names after drop/recreate (CASSANDRA-2872)
  * prepend CF to default index names (CASSANDRA-2903)
  * fix hint replay (CASSANDRA-2928)
+ * Properly synchronize merkle tree computation (CASSANDRA-2816)
 
 
 0.8.1

Modified: cassandra/trunk/conf/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1151018&r1=1151017&r2=1151018&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Tue Jul 26 08:30:06 2011
@@ -260,13 +260,15 @@ column_index_size_in_kb: 64
 # will be logged specifying the row key.
 in_memory_compaction_limit_in_mb: 64
 
-# Number of compaction threads. This default to the number of processors,
+# Number of compaction threads (NOT including validation "compactions"
+# for anti-entropy repair). This defaults to the number of processors,
 # enabling multiple compactions to execute at once. Using more than one
 # thread is highly recommended to preserve read performance in a mixed
 # read/write workload as this avoids sstables from accumulating during long
 # running compactions. The default is usually fine and if you experience
 # problems with compaction running too slowly or too fast, you should look at
 # compaction_throughput_mb_per_sec first.
+#
 # Uncomment to make compaction mono-threaded.
 #concurrent_compactors: 1
 
@@ -274,7 +276,8 @@ in_memory_compaction_limit_in_mb: 64
 # system. The faster you insert data, the faster you need to compact in
 # order to keep the sstable count down, but in general, setting this to
 # 16 to 32 times the rate you are inserting data is more than sufficient.
-# Setting this to 0 disables throttling.
+# Setting this to 0 disables throttling. Note that this accounts for all types
+# of compaction, including validation compaction.
 compaction_throughput_mb_per_sec: 16
 
 # Track cached row keys during compaction, and re-cache their new

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 08:30:06 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
 /cassandra/branches/cassandra-0.7/contrib:1026516-1149015,1149716
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1149016,1149217,1149235,1149332,1149725,1150103
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1149016,1149121,1149217,1149235,1149332,1149725,1150103
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 08:30:06 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1149015,1149716
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1149016,1149217,1149235,1149332,1149725,1150103
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1149016,1149121,1149217,1149235,1149332,1149725,1150103
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 08:30:06 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1149015,1149716
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1149016,1149217,1149235,1149332,1149725,1150103
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1149016,1149121,1149217,1149235,1149332,1149725,1150103
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 08:30:06 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1149015,1149716
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1149016,1149217,1149235,1149332,1149725,1150103
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1149016,1149121,1149217,1149235,1149332,1149725,1150103
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 08:30:06 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1149015,1149716
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1149016,1149217,1149235,1149332,1149725,1150103
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1149016,1149121,1149217,1149235,1149332,1149725,1150103
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jul 26 08:30:06 2011
@@ -1,7 +1,7 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1149015,1149716
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1149016,1149217,1149235,1149332,1149725,1150103
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1149016,1149121,1149217,1149235,1149332,1149725,1150103
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1151018&r1=1151017&r2=1151018&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 Tue Jul 26 08:30:06 2011
@@ -52,9 +52,14 @@ public class DebuggableThreadPoolExecuto
         this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, 
priority));
     }
 
-    public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+    public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> queue, ThreadFactory factory)
     {
-        super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+        this(corePoolSize, corePoolSize, keepAliveTime, unit, queue, factory);
+    }
+
+    protected DebuggableThreadPoolExecutor(int corePoolSize, int maxPoolSize, 
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
ThreadFactory threadFactory)
+    {
+        super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
         allowCoreThreadTimeOut(true);
 
         // block task submissions until queue has room.

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1151018&r1=1151017&r2=1151018&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 Tue Jul 26 08:30:06 2011
@@ -81,6 +81,7 @@ public class CompactionManager implement
     }
 
     private CompactionExecutor executor = new CompactionExecutor();
+    private CompactionExecutor validationExecutor = new ValidationExecutor();
 
     /**
      * @return A lock, for which acquisition means no compactions can run.
@@ -426,7 +427,7 @@ public class CompactionManager implement
                 }
             }
         };
-        return executor.submit(callable);
+        return validationExecutor.submit(callable);
     }
 
     /* Used in tests. */
@@ -794,7 +795,7 @@ public class CompactionManager implement
 
         Collection<SSTableReader> sstables = 
cfs.markCurrentSSTablesReferenced();
         CompactionIterator ci = new ValidationCompactionIterator(cfs, 
sstables, validator.request.range);
-        executor.beginCompaction(ci);
+        validationExecutor.beginCompaction(ci);
         try
         {
             Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, 
Predicates.notNull());
@@ -812,7 +813,7 @@ public class CompactionManager implement
         {
             SSTableReader.releaseReferences(sstables);
             ci.close();
-            executor.finishCompaction(ci);
+            validationExecutor.finishCompaction(ci);
         }
     }
 
@@ -940,28 +941,32 @@ public class CompactionManager implement
 
     public int getActiveCompactions()
     {
-        return executor.getActiveCount();
+        return executor.getActiveCount() + validationExecutor.getActiveCount();
     }
 
     private static class CompactionExecutor extends 
DebuggableThreadPoolExecutor implements CompactionExecutorStatsCollector
     {
         // a synchronized identity set of running tasks to their compaction 
info
-        private final Set<CompactionInfo.Holder> compactions;
+        private static final Set<CompactionInfo.Holder> compactions = 
Collections.synchronizedSet(Collections.newSetFromMap(new 
IdentityHashMap<CompactionInfo.Holder, Boolean>()));
 
-        public CompactionExecutor()
+        protected CompactionExecutor(int minThreads, int maxThreads, String 
name, BlockingQueue<Runnable> queue)
         {
-            super(getThreadCount(),
+            super(minThreads,
+                  maxThreads,
                   60,
                   TimeUnit.SECONDS,
-                  new LinkedBlockingQueue<Runnable>(),
-                  new NamedThreadFactory("CompactionExecutor", 
DatabaseDescriptor.getCompactionThreadPriority()));
-            Map<CompactionInfo.Holder, Boolean> cmap = new 
IdentityHashMap<CompactionInfo.Holder, Boolean>();
-            compactions = 
Collections.synchronizedSet(Collections.newSetFromMap(cmap));
+                  queue,
+                  new NamedThreadFactory(name, 
DatabaseDescriptor.getCompactionThreadPriority()));
+        }
+
+        private CompactionExecutor(int threadCount, String name)
+        {
+            this(threadCount, threadCount, name, new 
LinkedBlockingQueue<Runnable>());
         }
 
-        private static int getThreadCount()
+        public CompactionExecutor()
         {
-            return Math.max(1, DatabaseDescriptor.getConcurrentCompactors());
+            this(Math.max(1, DatabaseDescriptor.getConcurrentCompactors()), 
"CompactionExecutor");
         }
 
         public void beginCompaction(CompactionInfo.Holder ci)
@@ -974,12 +979,20 @@ public class CompactionManager implement
             compactions.remove(ci);
         }
 
-        public List<CompactionInfo.Holder> getCompactions()
+        public static List<CompactionInfo.Holder> getCompactions()
         {
             return new ArrayList<CompactionInfo.Holder>(compactions);
         }
     }
 
+    private static class ValidationExecutor extends CompactionExecutor
+    {
+        public ValidationExecutor()
+        {
+            super(1, Integer.MAX_VALUE, "ValidationExecutor", new 
SynchronousQueue<Runnable>());
+        }
+    }
+
     public interface CompactionExecutorStatsCollector
     {
         void beginCompaction(CompactionInfo.Holder ci);
@@ -989,7 +1002,7 @@ public class CompactionManager implement
     public List<CompactionInfo> getCompactions()
     {
         List<CompactionInfo> out = new ArrayList<CompactionInfo>();
-        for (CompactionInfo.Holder ci : executor.getCompactions())
+        for (CompactionInfo.Holder ci : CompactionExecutor.getCompactions())
             out.add(ci.getCompactionInfo());
         return out;
     }
@@ -997,7 +1010,7 @@ public class CompactionManager implement
     public List<String> getCompactionSummary()
     {
         List<String> out = new ArrayList<String>();
-        for (CompactionInfo.Holder ci : executor.getCompactions())
+        for (CompactionInfo.Holder ci : CompactionExecutor.getCompactions())
             out.add(ci.getCompactionInfo().toString());
         return out;
     }
@@ -1012,12 +1025,12 @@ public class CompactionManager implement
                 n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
             }
         }
-        return (int) (executor.getTaskCount() - 
executor.getCompletedTaskCount()) + n;
+        return (int) (executor.getTaskCount() + 
validationExecutor.getTaskCount() - executor.getCompletedTaskCount() - 
validationExecutor.getCompletedTaskCount()) + n;
     }
 
     public long getCompletedTasks()
     {
-        return executor.getCompletedTaskCount();
+        return executor.getCompletedTaskCount() + 
validationExecutor.getCompletedTaskCount();
     }
     
     private static class SimpleFuture implements Future

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1151018&r1=1151017&r2=1151018&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
Tue Jul 26 08:30:06 2011
@@ -22,9 +22,9 @@ import java.io.*;
 import java.net.InetAddress;
 import java.security.MessageDigest;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 
 import com.google.common.base.Objects;
 
@@ -109,7 +109,7 @@ public class AntiEntropyService
     /**
      * A map of repair session ids to a Queue of TreeRequests that have been 
performed since the session was started.
      */
-    private final ConcurrentMap<String, RepairSession.Callback> sessions;
+    private final ConcurrentMap<String, RepairSession> sessions;
 
     /**
      * Protected constructor. Use AntiEntropyService.instance.
@@ -117,7 +117,7 @@ public class AntiEntropyService
     protected AntiEntropyService()
     {
         requests = new ExpiringMap<String, Map<TreeRequest, 
TreePair>>(REQUEST_TIMEOUT);
-        sessions = new ConcurrentHashMap<String, RepairSession.Callback>();
+        sessions = new ConcurrentHashMap<String, RepairSession>();
     }
 
     /**
@@ -128,37 +128,13 @@ public class AntiEntropyService
     {
         return new RepairSession(range, tablename, cfnames);
     }
-    
+
     RepairSession getArtificialRepairSession(TreeRequest req, String 
tablename, String... cfnames)
     {
         return new RepairSession(req, tablename, cfnames);
     }
 
     /**
-     * Called by Differencer when a full repair round trip has been completed 
between the given CF and endpoints.
-     */
-    void completedRequest(TreeRequest request)
-    {
-        // indicate to the waiting session that this request completed
-        sessions.get(request.sessionid).completed(request);
-    }
-
-    /**
-     * Returns the map of waiting rendezvous endpoints to trees for the given 
session.
-     * Should only be called within Stage.ANTIENTROPY.
-     */
-    private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid)
-    {
-        Map<TreeRequest, TreePair> ctrees = requests.get(sessionid);
-        if (ctrees == null)
-        {
-            ctrees = new HashMap<TreeRequest, TreePair>();
-            requests.put(sessionid, ctrees);
-        }
-        return ctrees;
-    }
-
-    /**
      * Return all of the neighbors with whom we share data.
      */
     static Set<InetAddress> getNeighbors(String table, Range range)
@@ -189,53 +165,27 @@ public class AntiEntropyService
      */
     private void rendezvous(TreeRequest request, MerkleTree tree)
     {
-        InetAddress LOCAL = FBUtilities.getBroadcastAddress();
+        RepairSession session = sessions.get(request.sessionid);
+        assert session != null;
 
-        // the rendezvous pairs for this session
-        Map<TreeRequest, TreePair> ctrees = rendezvousPairs(request.sessionid);
-
-        List<Differencer> differencers = new ArrayList<Differencer>();
-        if (LOCAL.equals(request.endpoint))
-        {
-            // we're registering a local tree: rendezvous with remote requests 
for the session
-            for (InetAddress neighbor : getNeighbors(request.cf.left, 
request.range))
-            {
-                TreeRequest remotereq = new TreeRequest(request.sessionid, 
neighbor, request.range, request.cf);
-                TreePair waiting = ctrees.remove(remotereq);
-                if (waiting != null && waiting.right != null)
-                {
-                    // the neighbor beat us to the rendezvous: queue 
differencing
-                    // FIXME: Differencer should take a TreeRequest
-                    differencers.add(new Differencer(remotereq, tree, 
waiting.right));
-                    continue;
-                }
+        RepairSession.RepairJob job = session.jobs.peek();
+        assert job != null : "A repair should have at least some jobs 
scheduled";
 
-                // else, the local tree is first to the rendezvous: store and 
wait
-                ctrees.put(remotereq, new TreePair(tree, null));
-                logger.debug("Stored local tree for " + request + " to wait 
for " + remotereq);
-            }
-        }
-        else
+        if (job.addTree(request, tree) == 0)
         {
-            // we're registering a remote tree: rendezvous with the local tree
-            TreePair waiting = ctrees.remove(request);
-            if (waiting != null && waiting.left != null)
-            {
-                // the local tree beat us to the rendezvous: queue differencing
-                differencers.add(new Differencer(request, waiting.left, tree));
-            }
+            logger.debug("All trees received for " + session.getName() + "/" + 
request.cf.right);
+            job.submitDifferencers();
+
+            // This job is complete, switching to next in line (note that only
+            // one thread can ever do this)
+            session.jobs.poll();
+            RepairSession.RepairJob nextJob = session.jobs.peek();
+            if (nextJob == null)
+                // We are done with this repair session as far as differencing
+                // is concerned. Just inform the session
+                session.differencingDone.signalAll();
             else
-            {
-                // else, the remote tree is first to the rendezvous: store and 
wait
-                ctrees.put(request, new TreePair(null, tree));
-                logger.debug("Stored remote tree for " + request + " to wait 
for local tree.");
-            }
-        }
-
-        for (Differencer differencer : differencers)
-        {
-            logger.info("Queueing comparison " + differencer);
-            StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+                nextJob.sendTreeRequests();
         }
     }
 
@@ -403,6 +353,14 @@ public class AntiEntropyService
          */
         public void complete()
         {
+            completeTree();
+
+            StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
+            logger.debug("Validated " + validated + " rows into AEService tree 
for " + request);
+        }
+
+        void completeTree()
+        {
             assert ranges != null : "Validator was not prepared()";
 
             if (range != null)
@@ -412,11 +370,8 @@ public class AntiEntropyService
                 range = ranges.next();
                 range.addHash(EMPTY_ROW);
             }
-
-            StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
-            logger.debug("Validated " + validated + " rows into AEService tree 
for " + request);
         }
-        
+
         /**
          * Called after the validation lifecycle to respond with the now valid 
tree. Runs in Stage.ANTIENTROPY.
          *
@@ -430,113 +385,6 @@ public class AntiEntropyService
     }
 
     /**
-     * Runs on the node that initiated a request to compare two trees, and 
launch repairs for disagreeing ranges.
-     */
-    public static class Differencer implements Runnable
-    {
-        public final TreeRequest request;
-        public final MerkleTree ltree;
-        public final MerkleTree rtree;
-        public List<Range> differences;
-
-        public Differencer(TreeRequest request, MerkleTree ltree, MerkleTree 
rtree)
-        {
-            this.request = request;
-            this.ltree = ltree;
-            this.rtree = rtree;
-            this.differences = new ArrayList<Range>();
-        }
-
-        /**
-         * Compares our trees, and triggers repairs for any ranges that 
mismatch.
-         */
-        public void run()
-        {
-            InetAddress local = FBUtilities.getBroadcastAddress();
-
-            // restore partitioners (in case we were serialized)
-            if (ltree.partitioner() == null)
-                ltree.partitioner(StorageService.getPartitioner());
-            if (rtree.partitioner() == null)
-                rtree.partitioner(StorageService.getPartitioner());
-
-            // compare trees, and collect differences
-            differences.addAll(MerkleTree.difference(ltree, rtree));
-
-            // choose a repair method based on the significance of the 
difference
-            String format = "Endpoints " + local + " and " + request.endpoint 
+ " %s for " + request.cf + " on " + request.range;
-            if (differences.isEmpty())
-            {
-                logger.info(String.format(format, "are consistent"));
-                AntiEntropyService.instance.completedRequest(request);
-                return;
-            }
-
-            // non-0 difference: perform streaming repair
-            logger.info(String.format(format, "have " + differences.size() + " 
range(s) out of sync"));
-            try
-            {
-                performStreamingRepair();
-            }
-            catch(IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        
-        /**
-         * Starts sending/receiving our list of differences to/from the remote 
endpoint: creates a callback
-         * that will be called out of band once the streams complete.
-         */
-        void performStreamingRepair() throws IOException
-        {
-            logger.info("Performing streaming repair of " + differences.size() 
+ " ranges for " + request);
-            ColumnFamilyStore cfstore = 
Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
-            try
-            {
-                // We acquire references for transferSSTables
-                Collection<SSTableReader> sstables = 
cfstore.markCurrentSSTablesReferenced();
-                Callback callback = new Callback();
-                // send ranges to the remote node
-                StreamOutSession outsession = 
StreamOutSession.create(request.cf.left, request.endpoint, callback);
-                StreamOut.transferSSTables(outsession, sstables, differences, 
OperationType.AES);
-                // request ranges from the remote node
-                StreamIn.requestRanges(request.endpoint, request.cf.left, 
Collections.singletonList(cfstore), differences, callback, OperationType.AES);
-            }
-            catch(Exception e)
-            {
-                throw new IOException("Streaming repair failed.", e);
-            }
-        }
-
-        public String toString()
-        {
-            return "#<Differencer " + request + ">";
-        }
-
-        /**
-         * When a repair is necessary, this callback is created to wait for 
the inbound
-         * and outbound streams to complete.
-         */
-        class Callback extends WrappedRunnable
-        {
-            // we expect one callback for the receive, and one for the send
-            private final AtomicInteger outstanding = new AtomicInteger(2);
-
-            protected void runMayThrow() throws Exception
-            {
-                if (outstanding.decrementAndGet() > 0)
-                    // waiting on more calls
-                    return;
-
-                // all calls finished successfully
-                logger.info("Finished streaming repair for " + request);
-                AntiEntropyService.instance.completedRequest(request);
-            }
-        }
-    }
-
-    /**
      * Handler for requests from remote nodes to generate a valid tree.
      * The payload is a CFPair representing the columnfamily to validate.
      */
@@ -744,48 +592,44 @@ public class AntiEntropyService
     {
         private final String tablename;
         private final String[] cfnames;
-        private final SimpleCondition requestsMade;
-        private final ConcurrentHashMap<TreeRequest,Object> requests;
+        private final ConcurrentHashMap<TreeRequest,Object> requests = new 
ConcurrentHashMap<TreeRequest,Object>();
         private final Range range;
-        
+        private final Set<InetAddress> endpoints;
+
+        private CountDownLatch completedLatch;
+        final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
+
+        public final Condition differencingDone = new SimpleCondition();
+
         public RepairSession(TreeRequest req, String tablename, String... 
cfnames)
         {
-            super(req.sessionid);
-            this.range = req.range;
-            this.tablename = tablename;
-            this.cfnames = cfnames;
-            requestsMade = new SimpleCondition();
-            this.requests = new ConcurrentHashMap<TreeRequest,Object>();
+            this(req.sessionid, req.range, tablename, cfnames);
             requests.put(req, this);
-            Callback callback = new Callback();
-            AntiEntropyService.instance.sessions.put(getName(), callback);
+            completedLatch = new CountDownLatch(cfnames.length);
+            AntiEntropyService.instance.sessions.put(getName(), this);
         }
-        
+
         public RepairSession(Range range, String tablename, String... cfnames)
         {
-            super("manual-repair-" + UUID.randomUUID());
-            this.tablename = tablename;
-            this.cfnames = cfnames;
-            this.range = range;
-            this.requestsMade = new SimpleCondition();
-            this.requests = new ConcurrentHashMap<TreeRequest,Object>();
+            this("manual-repair-" + UUID.randomUUID(), range, tablename, 
cfnames);
         }
 
-        /**
-         * Waits until all requests for the session have been sent out: to 
wait for the session to end, call join().
-         */
-        public void blockUntilRunning() throws InterruptedException
+        private RepairSession(String id, Range range, String tablename, 
String[] cfnames)
         {
-            requestsMade.await();
+            super(id);
+            this.tablename = tablename;
+            this.cfnames = cfnames;
+            assert cfnames.length > 0 : "Repairing no column families seems 
pointless, doesn't it";
+            this.range = range;
+            this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
         }
 
         @Override
         public void run()
         {
-            Set<InetAddress> endpoints = 
AntiEntropyService.getNeighbors(tablename, range);
             if (endpoints.isEmpty())
             {
-                requestsMade.signalAll();
+                differencingDone.signalAll();
                 logger.info("No neighbors to repair with for " + tablename + " 
on " + range + ": " + getName() + " completed.");
                 return;
             }
@@ -795,65 +639,211 @@ public class AntiEntropyService
             {
                 if (!FailureDetector.instance.isAlive(endpoint))
                 {
+                    differencingDone.signalAll();
                     logger.info("Could not proceed on repair because a 
neighbor (" + endpoint + ") is dead: " + getName() + " failed.");
                     return;
                 }
             }
 
-            // begin a repair session
-            Callback callback = new Callback();
-            AntiEntropyService.instance.sessions.put(getName(), callback);
+            AntiEntropyService.instance.sessions.put(getName(), this);
             try
             {
-                // request that all relevant endpoints generate trees
+                // Create and queue a RepairJob for each column family
                 for (String cfname : cfnames)
-                {
-                    // send requests to remote nodes and record them
-                    for (InetAddress endpoint : endpoints)
-                        
requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, 
tablename, cfname), this);
-                    // send but don't record an outstanding request to the 
local node
-                    AntiEntropyService.instance.request(getName(), 
FBUtilities.getBroadcastAddress(), range, tablename, cfname);
-                }
-                logger.info("Waiting for repair requests: " + 
requests.keySet());
-                requestsMade.signalAll();
+                    jobs.offer(new RepairJob(cfname));
+
+                // We'll repair once by endpoints and column family
+                completedLatch = new CountDownLatch(endpoints.size() * 
cfnames.length);
+
+                jobs.peek().sendTreeRequests();
 
                 // block whatever thread started this session until all 
requests have been returned:
                 // if this thread dies, the session will still complete in the 
background
-                callback.completed.await();
+                completedLatch.await();
             }
             catch (InterruptedException e)
             {
                 throw new RuntimeException("Interrupted while waiting for 
repair: repair will continue in the background.");
             }
+            finally
+            {
+                AntiEntropyService.instance.sessions.remove(getName());
+            }
+        }
+
+        void completed(InetAddress remote, String cfname)
+        {
+            logger.debug("Repair completed for {} on {}", remote, cfname);
+            completedLatch.countDown();
+        }
+
+        class RepairJob
+        {
+            private final String cfname;
+            private final AtomicInteger remaining;
+            private final Map<InetAddress, MerkleTree> trees;
+
+            public RepairJob(String cfname)
+            {
+                this.cfname = cfname;
+                this.remaining = new AtomicInteger(endpoints.size() + 1); // 
all neighbor + local host
+                this.trees = new ConcurrentHashMap<InetAddress, MerkleTree>();
+            }
+
+            /**
+             * Send merkle tree request to every involved neighbor.
+             */
+            public void sendTreeRequests()
+            {
+                // send requests to remote nodes and record them
+                for (InetAddress endpoint : endpoints)
+                    
requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, 
tablename, cfname), RepairSession.this);
+                // send but don't record an outstanding request to the local 
node
+                AntiEntropyService.instance.request(getName(), 
FBUtilities.getBroadcastAddress(), range, tablename, cfname);
+            }
+
+            /**
+             * Add a new received tree and return the number of remaining tree 
to
+             * be received for the job to be complete.
+             */
+            public int addTree(TreeRequest request, MerkleTree tree)
+            {
+                assert request.cf.right.equals(cfname);
+                trees.put(request.endpoint, tree);
+                return remaining.decrementAndGet();
+            }
+
+            /**
+             * Submit differencers for running.
+             * All tree *must* have been received before this is called.
+             */
+            public void submitDifferencers()
+            {
+                assert remaining.get() == 0;
+
+                // Right now, we only difference local host against each 
other. CASSANDRA-2610 will fix that.
+                // In the meantime ugly special casing will work good enough.
+                MerkleTree localTree =
+                trees.get(FBUtilities.getBroadcastAddress());
+                assert localTree != null;
+                for (Map.Entry<InetAddress, MerkleTree> entry : 
trees.entrySet())
+                {
+                    if 
(entry.getKey().equals(FBUtilities.getBroadcastAddress()))
+                        continue;
+
+                    Differencer differencer = new Differencer(cfname, 
entry.getKey(), entry.getValue(), localTree);
+                    logger.debug("Queueing comparison " + differencer);
+                    
StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+                }
+            }
         }
 
         /**
-         * Receives notifications of completed requests, and sets a condition 
when all requests
-         * triggered by this session have completed.
+         * Runs on the node that initiated a request to compare two trees, and 
launch repairs for disagreeing ranges.
          */
-        class Callback
+        class Differencer implements Runnable
         {
-            public final SimpleCondition completed = new SimpleCondition();
-            public void completed(TreeRequest request)
+            public final String cfname;
+            public final InetAddress remote;
+            public final MerkleTree ltree;
+            public final MerkleTree rtree;
+            public List<Range> differences;
+
+            Differencer(String cfname, InetAddress remote, MerkleTree ltree, 
MerkleTree rtree)
+            {
+                this.cfname = cfname;
+                this.remote = remote;
+                this.ltree = ltree;
+                this.rtree = rtree;
+                this.differences = new ArrayList<Range>();
+            }
+
+            /**
+             * Compares our trees, and triggers repairs for any ranges that 
mismatch.
+             */
+            public void run()
+            {
+                InetAddress local = FBUtilities.getBroadcastAddress();
+
+                // restore partitioners (in case we were serialized)
+                if (ltree.partitioner() == null)
+                    ltree.partitioner(StorageService.getPartitioner());
+                if (rtree.partitioner() == null)
+                    rtree.partitioner(StorageService.getPartitioner());
+
+                // compare trees, and collect differences
+                differences.addAll(MerkleTree.difference(ltree, rtree));
+
+                // choose a repair method based on the significance of the 
difference
+                String format = "Endpoints " + local + " and " + remote + " %s 
for " + cfname + " on " + range;
+                if (differences.isEmpty())
+                {
+                    logger.info(String.format(format, "are consistent"));
+                    completed(remote, cfname);
+                    return;
+                }
+
+                // non-0 difference: perform streaming repair
+                logger.info(String.format(format, "have " + differences.size() 
+ " range(s) out of sync"));
+                try
+                {
+                    performStreamingRepair();
+                }
+                catch(IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            /**
+             * Starts sending/receiving our list of differences to/from the 
remote endpoint: creates a callback
+             * that will be called out of band once the streams complete.
+             */
+            void performStreamingRepair() throws IOException
             {
-                // don't mark any requests completed until all requests have 
been made
+                logger.info("Performing streaming repair of " + 
differences.size() + " ranges with " + remote + " for " + range);
+                ColumnFamilyStore cfstore = 
Table.open(tablename).getColumnFamilyStore(cfname);
                 try
                 {
-                    blockUntilRunning();
+                    Collection<SSTableReader> sstables = cfstore.getSSTables();
+                    Callback callback = new Callback();
+                    // send ranges to the remote node
+                    StreamOutSession outsession = 
StreamOutSession.create(tablename, remote, callback);
+                    StreamOut.transferSSTables(outsession, sstables, 
differences, OperationType.AES);
+                    // request ranges from the remote node
+                    StreamIn.requestRanges(remote, tablename, differences, 
callback, OperationType.AES);
                 }
-                catch (InterruptedException e)
+                catch(Exception e)
                 {
-                    throw new AssertionError(e);
+                    throw new IOException("Streaming repair failed.", e);
                 }
-                requests.remove(request);
-                logger.info("{} completed successfully: {} outstanding.", 
request, requests.size());
-                if (!requests.isEmpty())
-                    return;
+            }
 
-                // all requests completed
-                logger.info("Repair session " + getName() + " completed 
successfully.");
-                AntiEntropyService.instance.sessions.remove(getName());
-                completed.signalAll();
+            public String toString()
+            {
+                return "#<Differencer " + remote + "/" + range + ">";
+            }
+
+            /**
+             * When a repair is necessary, this callback is created to wait 
for the inbound
+             * and outbound streams to complete.
+             */
+            class Callback extends WrappedRunnable
+            {
+                // we expect one callback for the receive, and one for the send
+                private final AtomicInteger outstanding = new AtomicInteger(2);
+
+                protected void runMayThrow() throws Exception
+                {
+                    if (outstanding.decrementAndGet() > 0)
+                        // waiting on more calls
+                        return;
+
+                    // all calls finished successfully
+                    //
+                    completed(remote, cfname);
+                    logger.info(String.format("Finished streaming repair with 
%s for %s: %d oustanding to complete session", remote, range, 
completedLatch.getCount()));
+                }
             }
         }
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1151018&r1=1151017&r2=1151018&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Tue Jul 26 08:30:06 2011
@@ -1497,7 +1497,17 @@ public class StorageService implements I
         List<AntiEntropyService.RepairSession> sessions = new 
ArrayList<AntiEntropyService.RepairSession>();
         for (Range range : getLocalRanges(tableName))
         {
-            sessions.add(forceTableRepair(range, tableName, columnFamilies));
+            AntiEntropyService.RepairSession session = forceTableRepair(range, 
tableName, columnFamilies);
+            sessions.add(session);
+            // wait for a session to be done with its differencing before 
starting the next one
+            try
+            {
+                session.differencingDone.await();
+            }
+            catch (InterruptedException e)
+            {
+                logger_.error("Interrupted while waiting for the differencing 
of repair session " + session + " to be done. Repair may be imprecise.", e);
+            }
         }
 
         boolean failedSession = false;

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1151018&r1=1151017&r2=1151018&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
 Tue Jul 26 08:30:06 2011
@@ -134,7 +134,7 @@ public abstract class AntiEntropyService
     {
         Validator validator = new Validator(request);
         validator.prepare(store);
-        validator.complete();
+        validator.completeTree();
 
         // confirm that the tree was validated
         Token min = validator.tree.partitioner().getMinimumToken();
@@ -151,7 +151,7 @@ public abstract class AntiEntropyService
 
         // add a row
         validator.add(new PrecompactedRow(new DecoratedKey(mid, 
ByteBufferUtil.bytes("inconceivable!")), null));
-        validator.complete();
+        validator.completeTree();
 
         // confirm that the tree was validated
         assert null != validator.tree.hash(local_range);
@@ -162,14 +162,13 @@ public abstract class AntiEntropyService
     {
         AntiEntropyService.RepairSession sess = 
AntiEntropyService.instance.getRepairSession(local_range, tablename, cfname);
         sess.start();
-        sess.blockUntilRunning();
 
         // ensure that the session doesn't end without a response from REMOTE
-        sess.join(100);
+        sess.join(500);
         assert sess.isAlive();
 
         // deliver a fake response from REMOTE
-        AntiEntropyService.instance.completedRequest(new 
TreeRequest(sess.getName(), REMOTE, local_range, request.cf));
+        sess.completed(REMOTE, request.cf.right);
 
         // block until the repair has completed
         sess.join();
@@ -222,13 +221,13 @@ public abstract class AntiEntropyService
         // generate a tree
         Validator validator = new Validator(request);
         validator.prepare(store);
-        validator.complete();
+        validator.completeTree();
         MerkleTree ltree = validator.tree;
 
         // and a clone
         validator = new Validator(request);
         validator.prepare(store);
-        validator.complete();
+        validator.completeTree();
         MerkleTree rtree = validator.tree;
 
         // change a range in one of the trees
@@ -241,7 +240,7 @@ public abstract class AntiEntropyService
         interesting.add(changed);
 
         // difference the trees
-        Differencer diff = new Differencer(request, ltree, rtree);
+        AntiEntropyService.RepairSession.Differencer diff = sess.new 
Differencer(cfname, request.endpoint, ltree, rtree);
         diff.run();
         
         // ensure that the changed range was recorded


Reply via email to