Author: jbellis
Date: Fri Jun 25 20:34:22 2010
New Revision: 958105

URL: http://svn.apache.org/viewvc?rev=958105&view=rev
Log:
allow multiple repair sessions per node.  patch by Stu Hood; reviewed by 
jbellis for CASSANDRA-1190

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=958105&r1=958104&r2=958105&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jun 25 20:34:22 2010
@@ -36,6 +36,7 @@ dev
  * revise HH schema to be per-endpoint (CASSANDRA-1142)
  * remove gossip message size limit (CASSANDRA-1138)
  * add joining/leaving status to nodetool ring (CASSANDRA-1115)
+ * allow multiple repair sessions per node (CASSANDRA-1190)
 
 
 0.6.3

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=958105&r1=958104&r2=958105&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Jun 25 20:34:22 2010
@@ -25,6 +25,10 @@ import java.security.NoSuchAlgorithmExce
 import java.util.*;
 import java.util.concurrent.*;
 
+import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.CompactionManager;
@@ -36,16 +40,15 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.streaming.StreamIn;
 import org.apache.cassandra.streaming.StreamOut;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamOutManager;
 import org.apache.cassandra.utils.*;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * AntiEntropyService encapsulates "validating" (hashing) individual column 
families,
  * exchanging MerkleTrees with remote nodes via a TreeRequest/Response 
conversation,
@@ -82,24 +85,22 @@ public class AntiEntropyService
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AntiEntropyService.class);
 
-    // timeout for outstanding requests (48 hours)
-    public final static long REQUEST_TIMEOUT = 48*60*60*1000;
-
     // singleton enforcement
     public static final AntiEntropyService instance = new AntiEntropyService();
 
+    // timeout for outstanding requests (48 hours)
+    public final static long REQUEST_TIMEOUT = 48*60*60*1000;
+
     /**
-     * Map of column families to remote endpoints that need to rendezvous. The
-     * first endpoint to arrive at the rendezvous will store its tree in the
-     * appropriate slot of the TreePair object, and the second to arrive will
-     * remove the stored tree, and compare it.
+     * Map of outstanding sessions to requests. Once both trees reach the 
rendezvous, the local node
+     * will queue a Differencer to compare them.
      *
      * This map is only accessed from AE_SERVICE_STAGE, so it is not 
synchronized.
      */
-    private final Map<CFPair, ExpiringMap<InetAddress, TreePair>> trees;
+    private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests;
 
     /**
-     * A map of repair request ids to a Queue of TreeRequests that have been 
performed since the session was started.
+     * A map of repair session ids to a Queue of TreeRequests that have been 
performed since the session was started.
      */
     private final ConcurrentMap<String, BlockingQueue<TreeRequest>> sessions;
 
@@ -108,7 +109,7 @@ public class AntiEntropyService
      */
     protected AntiEntropyService()
     {
-        trees = new HashMap<CFPair, ExpiringMap<InetAddress, TreePair>>();
+        requests = new ExpiringMap<String, Map<TreeRequest, 
TreePair>>(REQUEST_TIMEOUT);
         sessions = new ConcurrentHashMap<String, BlockingQueue<TreeRequest>>();
     }
 
@@ -124,31 +125,27 @@ public class AntiEntropyService
     /**
      * Called by Differencer when a full repair round trip has been completed 
between the given CF and endpoints.
      */
-    void completedRequest(CFPair cf, InetAddress... endpoints)
+    void completedRequest(TreeRequest request)
     {
-        for (InetAddress endpoint : endpoints)
-        {
-            // indicate to each waiting session that this request completed
-            TreeRequest completed = new TreeRequest(cf, endpoint);
-            for (BlockingQueue<TreeRequest> session : sessions.values())
-                session.offer(completed);
-        }
+        // indicate to the waiting session that this request completed
+        BlockingQueue<TreeRequest> session = sessions.get(request.sessionid);
+        if (session == null)
+            // repair client disconnected: ignore
+            return;
+        session.offer(request);
     }
 
     /**
-     * Returns the map of waiting rendezvous endpoints to trees for the given 
cf.
+     * Returns the map of waiting rendezvous endpoints to trees for the given 
session.
      * Should only be called within AE_SERVICE_STAGE.
-     *
-     * @param cf Column family to fetch trees for.
-     * @return The store of trees for the given cf.
      */
-    private ExpiringMap<InetAddress, TreePair> rendezvousPairs(CFPair cf)
+    private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid)
     {
-        ExpiringMap<InetAddress, TreePair> ctrees = trees.get(cf);
+        Map<TreeRequest, TreePair> ctrees = requests.get(sessionid);
         if (ctrees == null)
         {
-            ctrees = new ExpiringMap<InetAddress, TreePair>(REQUEST_TIMEOUT);
-            trees.put(cf, ctrees);
+            ctrees = new HashMap<TreeRequest, TreePair>();
+            requests.put(sessionid, ctrees);
         }
         return ctrees;
     }
@@ -171,55 +168,50 @@ public class AntiEntropyService
     }
 
     /**
-     * Register a tree from the given endpoint to be compared to the 
appropriate trees
-     * in AE_SERVICE_STAGE when they become available.
-     *
-     * @param cf The column family of the tree.
-     * @param endpoint The endpoint which owns the given tree.
-     * @param tree The tree for the endpoint.
+     * Register a tree for the given request to be compared to the appropriate 
trees in AE_SERVICE_STAGE when they become available.
      */
-    private void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree)
+    private void rendezvous(TreeRequest request, MerkleTree tree)
     {
         InetAddress LOCAL = FBUtilities.getLocalAddress();
 
-        // return the rendezvous pairs for this cf
-        ExpiringMap<InetAddress, TreePair> ctrees = rendezvousPairs(cf);
+        // the rendezvous pairs for this session
+        Map<TreeRequest, TreePair> ctrees = rendezvousPairs(request.sessionid);
 
         List<Differencer> differencers = new ArrayList<Differencer>();
-        if (LOCAL.equals(endpoint))
+        if (LOCAL.equals(request.endpoint))
         {
-            // we're registering a local tree: rendezvous with all remote trees
-            for (InetAddress neighbor : getNeighbors(cf.left))
+            // we're registering a local tree: rendezvous with remote requests 
for the session
+            for (InetAddress neighbor : getNeighbors(request.cf.left))
             {
-                TreePair waiting = ctrees.remove(neighbor);
+                TreeRequest remotereq = new TreeRequest(request.sessionid, 
neighbor, request.cf);
+                TreePair waiting = ctrees.remove(remotereq);
                 if (waiting != null && waiting.right != null)
                 {
                     // the neighbor beat us to the rendezvous: queue 
differencing
-                    differencers.add(new Differencer(cf, LOCAL, neighbor,
-                                                     tree, waiting.right));
+                    // FIXME: Differencer should take a TreeRequest
+                    differencers.add(new Differencer(remotereq, tree, 
waiting.right));
                     continue;
                 }
 
                 // else, the local tree is first to the rendezvous: store and 
wait
-                ctrees.put(neighbor, new TreePair(tree, null));
-                logger.debug("Stored local tree for " + cf + " to wait for " + 
neighbor);
+                ctrees.put(remotereq, new TreePair(tree, null));
+                logger.debug("Stored local tree for " + request + " to wait 
for " + remotereq);
             }
         }
         else
         {
             // we're registering a remote tree: rendezvous with the local tree
-            TreePair waiting = ctrees.remove(endpoint);
+            TreePair waiting = ctrees.remove(request);
             if (waiting != null && waiting.left != null)
             {
                 // the local tree beat us to the rendezvous: queue differencing
-                differencers.add(new Differencer(cf, LOCAL, endpoint,
-                                                 waiting.left, tree));
+                differencers.add(new Differencer(request, waiting.left, tree));
             }
             else
             {
                 // else, the remote tree is first to the rendezvous: store and 
wait
-                ctrees.put(endpoint, new TreePair(null, tree));
-                logger.debug("Stored remote tree for " + cf + " from " + 
endpoint);
+                ctrees.put(request, new TreePair(null, tree));
+                logger.debug("Stored remote tree for " + request + " to wait 
for local tree.");
             }
         }
 
@@ -231,41 +223,43 @@ public class AntiEntropyService
     }
 
     /**
-     * Called by a Validator to send a valid tree to endpoints storing
-     * replicas of local data.
-     *
-     * @param validator A locally generated validator.
-     * @param local The local endpoint.
-     * @param neighbors A list of neighbor endpoints to send the tree to.
+     * Requests a tree from the given node, and returns the request that was 
sent.
+     */
+    TreeRequest request(String sessionid, InetAddress remote, String ksname, 
String cfname)
+    {
+        TreeRequest request = new TreeRequest(sessionid, remote, new 
CFPair(ksname, cfname));
+        
MessagingService.instance.sendOneWay(TreeRequestVerbHandler.makeVerb(request), 
remote);
+        return request;
+    }
+
+    /**
+     * Responds to the node that requested the given valid tree.
+     * @param validator A locally generated validator
+     * @param local localhost (parameterized for testing)
      */
-    void notifyNeighbors(Validator validator, InetAddress local, 
Collection<InetAddress> neighbors)
+    void respond(Validator validator, InetAddress local)
     {
         MessagingService ms = MessagingService.instance;
 
         try
         {
             Message message = TreeResponseVerbHandler.makeVerb(local, 
validator);
-            logger.info("Sending AEService tree for " + validator.cf + " to: " 
+ neighbors);
-            for (InetAddress neighbor : neighbors)
-                ms.sendOneWay(message, neighbor);
+            logger.info("Sending AEService tree for " + validator.request);
+            ms.sendOneWay(message, validator.request.endpoint);
         }
         catch (Exception e)
         {
-            logger.error("Could not send valid tree to endpoints: " + 
neighbors, e);
+            logger.error("Could not send valid tree for request " + 
validator.request, e);
         }
     }
 
     /**
-     * Should only be used in AE_SERVICE_STAGE or for testing.
-     *
-     * @param table Table containing cf.
-     * @param cf The column family.
-     * @param remote The remote endpoint for the rendezvous.
-     * @return The tree pair for the given rendezvous if it exists, else  null.
+     * @return The tree pair for the given request if it exists.
      */
-    TreePair getRendezvousPair_TestsOnly(String table, String cf, InetAddress 
remote)
+    TreePair getRendezvousPair_TestsOnly(TreeRequest request)
     {
-        return rendezvousPairs(new CFPair(table, cf)).get(remote);
+        System.out.println(request + "\tvs\t" + 
rendezvousPairs(request.sessionid).keySet());
+        return rendezvousPairs(request.sessionid).get(request);
     }
 
     /**
@@ -278,7 +272,7 @@ public class AntiEntropyService
      */
     public static class Validator implements Callable<Object>
     {
-        public final CFPair cf; // TODO keep a CFS reference as a field 
instead of its string representation
+        public final TreeRequest request;
         public final MerkleTree tree;
 
         // the minimum token sorts first, but falls into the last range
@@ -291,18 +285,17 @@ public class AntiEntropyService
 
         public final static MerkleTree.RowHash EMPTY_ROW = new 
MerkleTree.RowHash(null, new byte[0]);
         
-        Validator(CFPair cf)
+        Validator(TreeRequest request)
         {
-            this(cf,
+            this(request,
                  // TODO: memory usage (maxsize) should either be tunable per
                  // CF, globally, or as shared for all CFs in a cluster
                  new MerkleTree(DatabaseDescriptor.getPartitioner(), 
MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
         }
 
-        Validator(CFPair cf, MerkleTree tree)
+        Validator(TreeRequest request, MerkleTree tree)
         {
-            assert cf != null && tree != null;
-            this.cf = cf;
+            this.request = request;
             this.tree = tree;
             minrows = new ArrayList<MerkleTree.RowHash>();
             mintoken = null;
@@ -334,7 +327,7 @@ public class AntiEntropyService
                         break;
                 }
             }
-            logger.debug("Prepared AEService tree of size " + tree.size() + " 
for " + cf);
+            logger.debug("Prepared AEService tree of size " + tree.size() + " 
for " + request);
             mintoken = tree.partitioner().getMinimumToken();
             ranges = tree.invalids(new Range(mintoken, mintoken));
         }
@@ -425,25 +418,18 @@ public class AntiEntropyService
                     range.addHash(minrow);
 
             StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(this);
-            logger.debug("Validated " + validated + " rows into AEService tree 
for " + cf);
+            logger.debug("Validated " + validated + " rows into AEService tree 
for " + request);
         }
         
         /**
-         * Called after the validation lifecycle to trigger additional action
-         * with the now valid tree. Runs in AE_SERVICE_STAGE.
+         * Called after the validation lifecycle to respond with the now valid 
tree. Runs in AE_SERVICE_STAGE.
          *
          * @return A meaningless object.
          */
         public Object call() throws Exception
         {
-            AntiEntropyService aes = AntiEntropyService.instance;
-            InetAddress local = FBUtilities.getLocalAddress();
-
-            Collection<InetAddress> neighbors = getNeighbors(cf.left);
-
-            // store the local tree and then broadcast it to our neighbors
-            aes.rendezvous(cf, local, tree);
-            aes.notifyNeighbors(this, local, neighbors);
+            // respond to the request that triggered this validation
+            AntiEntropyService.instance.respond(this, 
FBUtilities.getLocalAddress());
 
             // return any old object
             return AntiEntropyService.class;
@@ -451,22 +437,18 @@ public class AntiEntropyService
     }
 
     /**
-     * Compares two trees, and launches repairs for disagreeing ranges.
+     * Runs on the node that initiated a request to compares two trees, and 
launch repairs for disagreeing ranges.
      */
     public static class Differencer implements Runnable
     {
-        public final CFPair cf;
-        public final InetAddress local;
-        public final InetAddress remote;
+        public final TreeRequest request;
         public final MerkleTree ltree;
         public final MerkleTree rtree;
         public final List<MerkleTree.TreeRange> differences;
 
-        public Differencer(CFPair cf, InetAddress local, InetAddress remote, 
MerkleTree ltree, MerkleTree rtree)
+        public Differencer(TreeRequest request, MerkleTree ltree, MerkleTree 
rtree)
         {
-            this.cf = cf;
-            this.local = local;
-            this.remote = remote;
+            this.request = request;
             this.ltree = ltree;
             this.rtree = rtree;
             differences = new ArrayList<MerkleTree.TreeRange>();
@@ -477,6 +459,7 @@ public class AntiEntropyService
          */
         public void run()
         {
+            InetAddress local = FBUtilities.getLocalAddress();
             StorageService ss = StorageService.instance;
 
             // restore partitioners (in case we were serialized)
@@ -486,8 +469,8 @@ public class AntiEntropyService
                 rtree.partitioner(StorageService.getPartitioner());
 
             // determine the ranges where responsibility overlaps
-            Set<Range> interesting = new 
HashSet(ss.getRangesForEndpoint(cf.left, local));
-            interesting.retainAll(ss.getRangesForEndpoint(cf.left, remote));
+            Set<Range> interesting = new 
HashSet(ss.getRangesForEndpoint(request.cf.left, local));
+            interesting.retainAll(ss.getRangesForEndpoint(request.cf.left, 
request.endpoint));
 
             // compare trees, and filter out uninteresting differences
             for (MerkleTree.TreeRange diff : MerkleTree.difference(ltree, 
rtree))
@@ -506,7 +489,7 @@ public class AntiEntropyService
             float difference = differenceFraction();
             if (difference == 0.0)
             {
-                logger.info("Endpoints " + local + " and " + remote + " are 
consistent for " + cf);
+                logger.info("Endpoints " + local + " and " + request.endpoint 
+ " are consistent for " + request.cf);
             }
             else
             {
@@ -521,7 +504,7 @@ public class AntiEntropyService
             }
 
             // repair was completed successfully: notify any waiting sessions
-            AntiEntropyService.instance.completedRequest(cf, local, remote);
+            AntiEntropyService.instance.completedRequest(request);
         }
         
         /**
@@ -542,32 +525,39 @@ public class AntiEntropyService
          */
         void performStreamingRepair() throws IOException
         {
-            logger.info("Performing streaming repair of " + differences.size() 
+ " ranges to " + remote + " for " + cf);
-            ColumnFamilyStore cfstore = 
Table.open(cf.left).getColumnFamilyStore(cf.right);
+            logger.info("Performing streaming repair of " + differences.size() 
+ " ranges for " + request);
+            ColumnFamilyStore cfstore = 
Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
             try
             {
                 final List<Range> ranges = new ArrayList<Range>(differences);
                 final Collection<SSTableReader> sstables = 
cfstore.getSSTables();
+                // send ranges to the remote node
                 Future f = 
StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable() 
                 {
                     protected void runMayThrow() throws Exception
                     {
-                        StreamOut.transferSSTables(remote, cf.left, sstables, 
ranges);
-                        StreamOutManager.remove(remote);
+                        StreamOut.transferSSTables(request.endpoint, 
request.cf.left, sstables, ranges);
+                        StreamOutManager.remove(request.endpoint);
                     }
                 });
+                // request ranges from the remote node
+                // FIXME: no way to block for the 'requestRanges' call to 
complete, or to request a
+                // particular cf: see CASSANDRA-1189
+                StreamIn.requestRanges(request.endpoint, request.cf.left, 
ranges);
+                
+                // wait until streaming has completed
                 f.get();
             }
             catch(Exception e)
             {
                 throw new IOException("Streaming repair failed.", e);
             }
-            logger.info("Finished streaming repair to " + remote + " for " + 
cf);
+            logger.info("Finished streaming repair for " + request);
         }
 
         public String toString()
         {
-            return "#<Differencer " + cf + " local=" + local + " remote=" + 
remote + ">";
+            return "#<Differencer " + request + ">";
         }
     }
 
@@ -575,17 +565,20 @@ public class AntiEntropyService
      * Handler for requests from remote nodes to generate a valid tree.
      * The payload is a CFPair representing the columnfamily to validate.
      */
-    public static class TreeRequestVerbHandler implements IVerbHandler, 
ICompactSerializer<CFPair>
+    public static class TreeRequestVerbHandler implements IVerbHandler, 
ICompactSerializer<TreeRequest>
     {
         public static final TreeRequestVerbHandler SERIALIZER = new 
TreeRequestVerbHandler();
-        static Message makeVerb(String table, String cf)
+        static Message makeVerb(TreeRequest request)
         {
             try
             {
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
-                SERIALIZER.serialize(new CFPair(table, cf), dos);
-                return new Message(FBUtilities.getLocalAddress(), 
StageManager.AE_SERVICE_STAGE, StorageService.Verb.TREE_REQUEST, 
bos.toByteArray());
+                SERIALIZER.serialize(request, dos);
+                return new Message(FBUtilities.getLocalAddress(),
+                                   StageManager.AE_SERVICE_STAGE,
+                                   StorageService.Verb.TREE_REQUEST,
+                                   bos.toByteArray());
             }
             catch(IOException e)
             {
@@ -593,15 +586,19 @@ public class AntiEntropyService
             }
         }
 
-        public void serialize(CFPair treerequest, DataOutputStream dos) throws 
IOException
+        public void serialize(TreeRequest request, DataOutputStream dos) 
throws IOException
         {
-            dos.writeUTF(treerequest.left);
-            dos.writeUTF(treerequest.right);
+            dos.writeUTF(request.sessionid);
+            CompactEndpointSerializationHelper.serialize(request.endpoint, 
dos);
+            dos.writeUTF(request.cf.left);
+            dos.writeUTF(request.cf.right);
         }
 
-        public CFPair deserialize(DataInputStream dis) throws IOException
+        public TreeRequest deserialize(DataInputStream dis) throws IOException
         {
-            return new CFPair(dis.readUTF(), dis.readUTF());
+            return new TreeRequest(dis.readUTF(),
+                                   
CompactEndpointSerializationHelper.deserialize(dis),
+                                   new CFPair(dis.readUTF(), dis.readUTF()));
         }
 
         /**
@@ -611,19 +608,16 @@ public class AntiEntropyService
         { 
             byte[] bytes = message.getMessageBody();
             
-            ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+            DataInputStream buffer = new DataInputStream(new 
ByteArrayInputStream(bytes));
             try
             {
-                CFPair cf = this.deserialize(new DataInputStream(buffer));
-                TreeRequest request = new TreeRequest(cf, message.getFrom());
-                // FIXME: 0.7 should send the actual RepairSession id across 
with the request
-                String sessionid = request.toString();
+                TreeRequest remotereq = this.deserialize(buffer);
+                TreeRequest request = new TreeRequest(remotereq.sessionid, 
message.getFrom(), remotereq.cf);
 
                 // trigger readonly-compaction
-                logger.debug("Queueing validation compaction for session " + 
sessionid + " request " + request);
-                ColumnFamilyStore store = 
Table.open(cf.left).getColumnFamilyStore(cf.right);
-                // FIXME: should take session id and request
-                Validator validator = new Validator(request.left);
+                ColumnFamilyStore store = 
Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
+                Validator validator = new Validator(request);
+                logger.debug("Queueing validation compaction for " + request);
                 CompactionManager.instance.submitValidation(store, validator);
             }
             catch (IOException e)
@@ -657,7 +651,7 @@ public class AntiEntropyService
 
         public void serialize(Validator v, DataOutputStream dos) throws 
IOException
         {
-            TreeRequestVerbHandler.SERIALIZER.serialize(v.cf, dos);
+            TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos);
             ObjectOutputStream oos = new ObjectOutputStream(dos);
             oos.writeObject(v.tree);
             oos.flush();
@@ -665,11 +659,11 @@ public class AntiEntropyService
 
         public Validator deserialize(DataInputStream dis) throws IOException
         {
-            final CFPair cf = 
TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
+            final TreeRequest request = 
TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
             ObjectInputStream ois = new ObjectInputStream(dis);
             try
             {
-                return new Validator(cf, (MerkleTree)ois.readObject());
+                return new Validator(request, (MerkleTree)ois.readObject());
             }
             catch(Exception e)
             {
@@ -680,13 +674,14 @@ public class AntiEntropyService
         public void doVerb(Message message)
         { 
             byte[] bytes = message.getMessageBody();
-            ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+            DataInputStream buffer = new DataInputStream(new 
ByteArrayInputStream(bytes));
 
             try
             {
                 // deserialize the remote tree, and register it
-                Validator rvalidator = this.deserialize(new 
DataInputStream(buffer));
-                AntiEntropyService.instance.rendezvous(rvalidator.cf, 
message.getFrom(), rvalidator.tree);
+                Validator response = this.deserialize(buffer);
+                TreeRequest request = new 
TreeRequest(response.request.sessionid, message.getFrom(), response.request.cf);
+                AntiEntropyService.instance.rendezvous(request, response.tree);
             }
             catch (IOException e)
             {
@@ -708,27 +703,52 @@ public class AntiEntropyService
     }
 
     /**
-     * A tuple of a local and remote tree. One of the trees should be null, but
-     * not both.
+     * A tuple of a local and remote tree.
      */
     static class TreePair extends Pair<MerkleTree,MerkleTree>
     {
         public TreePair(MerkleTree local, MerkleTree remote)
         {
             super(local, remote);
-            assert local != null ^ remote != null;
         }
     }
 
     /**
      * A triple of table, cf and address that represents a location we have an 
outstanding TreeRequest for.
      */
-    static class TreeRequest extends Pair<CFPair,InetAddress>
+    public static class TreeRequest
     {
-        public TreeRequest(CFPair cf, InetAddress target)
+        public final String sessionid;
+        public final InetAddress endpoint;
+        public final CFPair cf;
+
+        public TreeRequest(String sessionid, InetAddress endpoint, CFPair cf)
+        {
+            this.sessionid = sessionid;
+            this.endpoint = endpoint;
+            this.cf = cf;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            return Objects.hashCode(sessionid, endpoint, cf);
+        }
+        
+        @Override
+        public final boolean equals(Object o)
+        {
+            if(!(o instanceof TreeRequest))
+                return false;
+            TreeRequest that = (TreeRequest)o;
+            // handles nulls properly
+            return Objects.equal(sessionid, that.sessionid) && 
Objects.equal(endpoint, that.endpoint) && Objects.equal(cf, that.cf);
+        }
+        
+        @Override
+        public String toString()
         {
-            super(cf, target);
-            assert cf != null && target != null;
+            return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + cf 
+ ">";
         }
     }
 
@@ -767,15 +787,13 @@ public class AntiEntropyService
                 // request that all relevant endpoints generate trees
                 Set<TreeRequest> requests = new HashSet<TreeRequest>();
                 Set<InetAddress> endpoints = 
AntiEntropyService.getNeighbors(tablename);
-                endpoints.add(FBUtilities.getLocalAddress());
                 for (String cfname : cfnames)
                 {
-                    Message request = 
TreeRequestVerbHandler.makeVerb(tablename, cfname);
+                    // send requests to remote nodes and record them
                     for (InetAddress endpoint : endpoints)
-                    {
-                        requests.add(new TreeRequest(new CFPair(tablename, 
cfname), endpoint));
-                        MessagingService.instance.sendOneWay(request, 
endpoint);
-                    }
+                        
requests.add(AntiEntropyService.this.request(getName(), endpoint, tablename, 
cfname));
+                    // send but don't record an outstanding request to the 
local node
+                    AntiEntropyService.this.request(getName(), 
FBUtilities.getLocalAddress(), tablename, cfname);
                 }
                 requestsMade.signalAll();
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=958105&r1=958104&r2=958105&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Fri Jun 25 20:34:22 2010
@@ -53,6 +53,7 @@ public class AntiEntropyServiceTest exte
 
     public static String tablename;
     public static String cfname;
+    public static TreeRequest request;
     public static ColumnFamilyStore store;
     public static InetAddress LOCAL, REMOTE;
 
@@ -77,6 +78,9 @@ public class AntiEntropyServiceTest exte
         
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), LOCAL);
         
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), 
REMOTE);
         assert tmd.isMember(REMOTE);
+        
+        // random session id for each test
+        request = new TreeRequest(UUID.randomUUID().toString(), LOCAL, new 
CFPair(tablename, cfname));
     }
 
     @After
@@ -90,13 +94,6 @@ public class AntiEntropyServiceTest exte
     }
 
     @Test
-    public void testInstance() throws Throwable
-    {
-        assert null != aes;
-        assert aes == AntiEntropyService.instance;
-    }
-
-    @Test
     public void testValidatorPrepare() throws Throwable
     {
         Validator validator;
@@ -110,7 +107,7 @@ public class AntiEntropyServiceTest exte
         Util.writeColumnFamily(rms);
 
         // sample
-        validator = new Validator(new CFPair(tablename, cfname));
+        validator = new Validator(request);
         validator.prepare(store);
 
         // and confirm that the tree was split
@@ -120,7 +117,7 @@ public class AntiEntropyServiceTest exte
     @Test
     public void testValidatorComplete() throws Throwable
     {
-        Validator validator = new Validator(new CFPair(tablename, cfname));
+        Validator validator = new Validator(request);
         validator.prepare(store);
         validator.complete();
 
@@ -135,7 +132,7 @@ public class AntiEntropyServiceTest exte
     @Test
     public void testValidatorAdd() throws Throwable
     {
-        Validator validator = new Validator(new CFPair(tablename, cfname));
+        Validator validator = new Validator(request);
         IPartitioner part = validator.tree.partitioner();
         Token min = part.getMinimumToken();
         Token mid = part.midpoint(min, min);
@@ -154,31 +151,6 @@ public class AntiEntropyServiceTest exte
         assert null != validator.tree.hash(new Range(min, min));
     }
 
-    /**
-     * Build a column family with 2 or more SSTables, and then force a major compaction
-     */
-    @Test
-    public void testTreeStore() throws Throwable
-    {
-        // populate column family
-        List<RowMutation> rms = new LinkedList<RowMutation>();
-        RowMutation rm = new RowMutation(tablename, "key".getBytes());
-        rm.add(new QueryPath(cfname, null, "Column1".getBytes()), "asdf".getBytes(), new TimestampClock(0));
-        rms.add(rm);
-        // with two SSTables
-        Util.writeColumnFamily(rms);
-        Util.writeColumnFamily(rms);
-        
-        TreePair old = aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
-        // force a readonly compaction, and wait for it to finish
-        Validator validator = new Validator(new CFPair(tablename, cfname));
-        CompactionManager.instance.submitValidation(store, validator).get(5000, TimeUnit.MILLISECONDS);
-
-        // check that a tree was created and stored
-        flushAES().get(5000, TimeUnit.MILLISECONDS);
-        assert old != aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
-    }
-
     @Test
     public void testManualRepair() throws Throwable
     {
@@ -190,35 +162,14 @@ public class AntiEntropyServiceTest exte
         sess.join(100);
         assert sess.isAlive();
 
-        // deliver fake responses from LOCAL and REMOTE
-        AntiEntropyService.instance.completedRequest(new CFPair(tablename, cfname), LOCAL);
-        AntiEntropyService.instance.completedRequest(new CFPair(tablename, cfname), REMOTE);
+        // deliver a fake response from REMOTE
+        AntiEntropyService.instance.completedRequest(new TreeRequest(sess.getName(), REMOTE, request.cf));
 
         // block until the repair has completed
         sess.join();
     }
 
     @Test
-    public void testNotifyNeighbors() throws Throwable
-    {
-        // generate empty tree
-        Validator validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare(store);
-        validator.complete();
-
-        // grab reference to the tree
-        MerkleTree tree = validator.tree;
-
-        // notify ourself (should immediately be delivered into AE_STAGE)
-        aes.notifyNeighbors(validator, LOCAL, Arrays.asList(LOCAL));
-        flushAES().get(5, TimeUnit.SECONDS);
-        
-        // confirm that our reference is not equal to the original due
-        // to (de)serialization
-        assert tree != aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE).left;
-    }
-
-    @Test
     public void testGetNeighborsPlusOne() throws Throwable
     {
         // generate rf+1 nodes, and ensure that all nodes are returned
@@ -248,13 +199,13 @@ public class AntiEntropyServiceTest exte
     public void testDifferencer() throws Throwable
     {
         // generate a tree
-        Validator validator = new Validator(new CFPair(tablename, cfname));
+        Validator validator = new Validator(request);
         validator.prepare(store);
         validator.complete();
         MerkleTree ltree = validator.tree;
 
         // and a clone
-        validator = new Validator(new CFPair(tablename, cfname));
+        validator = new Validator(request);
         validator.prepare(store);
         validator.complete();
         MerkleTree rtree = validator.tree;
@@ -266,8 +217,7 @@ public class AntiEntropyServiceTest exte
         changed.hash("non-empty hash!".getBytes());
 
         // difference the trees
-        Differencer diff = new Differencer(new CFPair(tablename, cfname),
-                                           LOCAL, LOCAL, ltree, rtree);
+        Differencer diff = new Differencer(request, ltree, rtree);
         diff.run();
         
         // ensure that the changed range was recorded


Reply via email to