Author: slebresne
Date: Fri Sep  2 10:24:21 2011
New Revision: 1164463

URL: http://svn.apache.org/viewvc?rev=1164463&view=rev
Log:
Make repair of a range sync all replica pairs for this range
patch by slebresne; reviewed by jbellis for CASSANDRA-2610

Added:
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
    
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Sep  2 10:24:21 2011
@@ -55,6 +55,7 @@
    to be down before starting (CASSANDRA-2034)
  * Make the compression algorithm and chunk length configurable 
(CASSANDRA-3001)
  * Add throttling for internode streaming (CASSANDRA-3080)
+ * make the repair of a range repair all replicas (CASSANDRA-2610)
 
 0.8.5
  * fix NPE when encryption_options is unspecified (CASSANDRA-3007)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri 
Sep  2 10:24:21 2011
@@ -63,9 +63,11 @@ public final class MessagingService impl
 {
     public static final String MBEAN_NAME = 
"org.apache.cassandra.net:type=MessagingService";
 
+    // 8 bits version, so don't waste versions
     public static final int VERSION_07 = 1;
     public static final int VERSION_080 = 2;
-    public static final int version_ = 3; // 8 bits, so don't waste versions
+    public static final int VERSION_10 = 3;
+    public static final int version_ = VERSION_10;
 
     static SerializerType serializerType_ = SerializerType.BINARY;
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
Fri Sep  2 10:24:21 2011
@@ -24,7 +24,6 @@ import java.security.MessageDigest;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 
 import com.google.common.base.Objects;
@@ -44,17 +43,13 @@ import org.apache.cassandra.dht.RandomPa
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.streaming.StreamIn;
-import org.apache.cassandra.streaming.StreamOut;
-import org.apache.cassandra.streaming.StreamOutSession;
+import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.utils.*;
 
 /**
@@ -130,6 +125,8 @@ public class AntiEntropyService
         return futureTask;
     }
 
+    // for testing only. Create a session corresponding to a fake request and
+    // add it to the sessions (avoid NPE in tests)
     RepairFuture submitArtificialRepairSession(TreeRequest req, String 
tablename, String... cfnames)
     {
         RepairFuture futureTask = new RepairSession(req, tablename, 
cfnames).getFuture();
@@ -171,6 +168,8 @@ public class AntiEntropyService
         RepairSession session = sessions.get(request.sessionid);
         assert session != null;
 
+        logger.info(String.format("[repair #%s] Received merkle tree for %s 
from %s", session.getName(), request.cf.right, request.endpoint));
+
         RepairSession.RepairJob job = session.jobs.peek();
         assert job != null : "A repair should have at least some jobs 
scheduled";
 
@@ -214,12 +213,13 @@ public class AntiEntropyService
         try
         {
             Message message = TreeResponseVerbHandler.makeVerb(local, 
validator);
-            logger.info("Sending AEService tree for " + validator.request);
+            if 
(!validator.request.endpoint.equals(FBUtilities.getBroadcastAddress()))
+                logger.info(String.format("[repair #%s] Sending completed 
merkle tree to %s for %s", validator.request.sessionid, 
validator.request.endpoint, validator.request.cf));
             ms.sendOneWay(message, validator.request.endpoint);
         }
         catch (Exception e)
         {
-            logger.error("Could not send valid tree for request " + 
validator.request, e);
+            logger.error(String.format("[repair #%s] Error sending completed 
merkle tree to %s for %s ", validator.request.sessionid, 
validator.request.endpoint, validator.request.cf), e);
         }
     }
 
@@ -534,17 +534,6 @@ public class AntiEntropyService
     }
 
     /**
-     * A tuple of a local and remote tree.
-     */
-    static class TreePair extends Pair<MerkleTree,MerkleTree>
-    {
-        public TreePair(MerkleTree local, MerkleTree remote)
-        {
-            super(local, remote);
-        }
-    }
-
-    /**
      * A tuple of table, cf, address and range that represents a location we 
have an outstanding TreeRequest for.
      */
     public static class TreeRequest
@@ -613,7 +602,7 @@ public class AntiEntropyService
 
         public RepairSession(Range range, String tablename, String... cfnames)
         {
-            this("manual-repair-" + UUID.randomUUID(), range, tablename, 
cfnames);
+            
this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(),
 range, tablename, cfnames);
         }
 
         private RepairSession(String id, Range range, String tablename, 
String[] cfnames)
@@ -636,13 +625,24 @@ public class AntiEntropyService
             return new RepairFuture(this);
         }
 
+        private String repairedNodes()
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append(FBUtilities.getBroadcastAddress());
+            for (InetAddress ep : endpoints)
+                sb.append(", ").append(ep);
+            return sb.toString();
+        }
+
         // we don't care about the return value but care about it throwing 
exception
         public void runMayThrow() throws Exception
         {
+            logger.info(String.format("[repair #%s] new session: will sync %s 
on range %s for %s.%s", getName(), repairedNodes(), range, tablename, 
Arrays.toString(cfnames)));
+
             if (endpoints.isEmpty())
             {
                 differencingDone.signalAll();
-                logger.info("No neighbors to repair with for " + tablename + " 
on " + range + ": " + getName() + " completed.");
+                logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getName(), range));
                 return;
             }
 
@@ -652,7 +652,7 @@ public class AntiEntropyService
                 if (!FailureDetector.instance.isAlive(endpoint))
                 {
                     differencingDone.signalAll();
-                    logger.info("Could not proceed on repair because a 
neighbor (" + endpoint + ") is dead: " + getName() + " failed.");
+                    logger.info(String.format("[repair #%s] Could not proceed on repair because a neighbor (%s) is dead: session failed", getName(), endpoint));
                     return;
                 }
             }
@@ -675,8 +675,15 @@ public class AntiEntropyService
                 // block whatever thread started this session until all 
requests have been returned:
                 // if this thread dies, the session will still complete in the 
background
                 completed.await();
-                if (exception != null)
+                if (exception == null)
+                {
+                    logger.info(String.format("[repair #%s] session completed 
successfully", getName()));
+                }
+                else
+                {
+                    logger.error(String.format("[repair #%s] session completed 
with the following error", getName()), exception);
                     throw exception;
+                }
             }
             catch (InterruptedException e)
             {
@@ -690,13 +697,19 @@ public class AntiEntropyService
             }
         }
 
-        void completed(InetAddress remote, String cfname)
+        void completed(Differencer differencer)
         {
-            logger.debug("Repair completed for {} on {}", remote, cfname);
-            RepairJob job = activeJobs.get(cfname);
-            if (job.completedSynchronizationJob(remote))
-            {
-                activeJobs.remove(cfname);
+            logger.debug(String.format("[repair #%s] Repair completed between 
%s and %s on %s",
+                                       getName(),
+                                       differencer.r1.endpoint,
+                                       differencer.r2.endpoint,
+                                       differencer.cfname));
+            RepairJob job = activeJobs.get(differencer.cfname);
+            if (job.completedSynchronization(differencer))
+            {
+                activeJobs.remove(differencer.cfname);
+                String remaining = activeJobs.size() == 0 ? "" : 
String.format(" (%d remaining column family to sync for this session)", 
activeJobs.size());
+                logger.info(String.format("[repair #%s] %s is fully synced%s", 
getName(), differencer.cfname, remaining));
                 if (activeJobs.isEmpty())
                     completed.signalAll();
             }
@@ -704,8 +717,7 @@ public class AntiEntropyService
 
         void failedNode(InetAddress remote)
         {
-            String errorMsg = String.format("Problem during repair session %s, 
endpoint %s died", sessionName, remote);
-            logger.error(errorMsg);
+            String errorMsg = String.format("Endpoint %s died", remote);
             exception = new IOException(errorMsg);
             // If a node failed, we stop everything (though there could still 
be some activity in the background)
             jobs.clear();
@@ -749,9 +761,13 @@ public class AntiEntropyService
         class RepairJob
         {
             private final String cfname;
-            private final Set<InetAddress> requestedEndpoints = new 
HashSet<InetAddress>();
-            private final Map<InetAddress, MerkleTree> trees = new 
HashMap<InetAddress, MerkleTree>();
-            private final Set<InetAddress> syncJobs = new 
HashSet<InetAddress>();
+            // first we send tree requests.  this tracks the endpoints 
remaining to hear from
+            private final Set<InetAddress> remainingEndpoints = new 
HashSet<InetAddress>();
+            // tree responses are then tracked here
+            private final List<TreeResponse> trees = new 
ArrayList<TreeResponse>(endpoints.size() + 1);
+            // once all responses are received, each tree is compared with 
each other, and differencer tasks
+            // are submitted.  the job is done when all differencers are 
complete.
+            private final Set<Differencer> remainingDifferencers = new 
HashSet<Differencer>();
 
             public RepairJob(String cfname)
             {
@@ -763,24 +779,28 @@ public class AntiEntropyService
              */
             public void sendTreeRequests()
             {
-                requestedEndpoints.addAll(endpoints);
-                requestedEndpoints.add(FBUtilities.getBroadcastAddress());
+                remainingEndpoints.addAll(endpoints);
+                remainingEndpoints.add(FBUtilities.getBroadcastAddress());
 
                 // send requests to all nodes
-                for (InetAddress endpoint : requestedEndpoints)
+                for (InetAddress endpoint : remainingEndpoints)
                     AntiEntropyService.instance.request(getName(), endpoint, 
range, tablename, cfname);
+
+                logger.info(String.format("[repair #%s] requests for merkle 
tree sent for %s (to %s)", getName(), cfname, remainingEndpoints));
             }
 
             /**
              * Add a new received tree and return the number of remaining tree 
to
              * be received for the job to be complete.
+             *
+             * Callers may assume exactly one addTree call will result in zero 
remaining endpoints.
              */
             public synchronized int addTree(TreeRequest request, MerkleTree 
tree)
             {
                 assert request.cf.right.equals(cfname);
-                trees.put(request.endpoint, tree);
-                requestedEndpoints.remove(request.endpoint);
-                return requestedEndpoints.size();
+                trees.add(new TreeResponse(request.endpoint, tree));
+                remainingEndpoints.remove(request.endpoint);
+                return remainingEndpoints.size();
             }
 
             /**
@@ -789,30 +809,31 @@ public class AntiEntropyService
              */
             public void submitDifferencers()
             {
-                assert requestedEndpoints.size() == 0;
+                assert remainingEndpoints.isEmpty();
 
-                // Right now, we only difference local host against each 
other. CASSANDRA-2610 will fix that.
-                // In the meantime ugly special casing will work good enough.
-                MerkleTree localTree =
-                trees.get(FBUtilities.getBroadcastAddress());
-                assert localTree != null;
-                for (Map.Entry<InetAddress, MerkleTree> entry : 
trees.entrySet())
+                // We need to difference all trees one against another
+                for (int i = 0; i < trees.size() - 1; ++i)
                 {
-                    if 
(entry.getKey().equals(FBUtilities.getBroadcastAddress()))
-                        continue;
-
-                    Differencer differencer = new Differencer(cfname, 
entry.getKey(), entry.getValue(), localTree);
-                    syncJobs.add(entry.getKey());
-                    logger.debug("Queueing comparison " + differencer);
-                    
StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+                    TreeResponse r1 = trees.get(i);
+                    for (int j = i + 1; j < trees.size(); ++j)
+                    {
+                        TreeResponse r2 = trees.get(j);
+                        Differencer differencer = new Differencer(cfname, r1, 
r2);
+                        logger.debug("Queueing comparison {}", differencer);
+                        remainingDifferencers.add(differencer);
+                        
StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+                    }
                 }
                 trees.clear(); // allows gc to do its thing
             }
 
-            synchronized boolean completedSynchronizationJob(InetAddress 
remote)
+            /**
+             * @return true if the @param differencer was the last remaining
+             */
+            synchronized boolean completedSynchronization(Differencer 
differencer)
             {
-                syncJobs.remove(remote);
-                return syncJobs.isEmpty();
+                remainingDifferencers.remove(differencer);
+                return remainingDifferencers.isEmpty();
             }
         }
 
@@ -822,17 +843,15 @@ public class AntiEntropyService
         class Differencer implements Runnable
         {
             public final String cfname;
-            public final InetAddress remote;
-            public final MerkleTree ltree;
-            public final MerkleTree rtree;
+            public final TreeResponse r1;
+            public final TreeResponse r2;
             public List<Range> differences;
 
-            Differencer(String cfname, InetAddress remote, MerkleTree ltree, 
MerkleTree rtree)
+            Differencer(String cfname, TreeResponse r1, TreeResponse r2)
             {
                 this.cfname = cfname;
-                this.remote = remote;
-                this.ltree = ltree;
-                this.rtree = rtree;
+                this.r1 = r1;
+                this.r2 = r2;
                 this.differences = new ArrayList<Range>();
             }
 
@@ -841,88 +860,65 @@ public class AntiEntropyService
              */
             public void run()
             {
-                InetAddress local = FBUtilities.getBroadcastAddress();
-
                 // restore partitioners (in case we were serialized)
-                if (ltree.partitioner() == null)
-                    ltree.partitioner(StorageService.getPartitioner());
-                if (rtree.partitioner() == null)
-                    rtree.partitioner(StorageService.getPartitioner());
+                if (r1.tree.partitioner() == null)
+                    r1.tree.partitioner(StorageService.getPartitioner());
+                if (r2.tree.partitioner() == null)
+                    r2.tree.partitioner(StorageService.getPartitioner());
 
                 // compare trees, and collect differences
-                differences.addAll(MerkleTree.difference(ltree, rtree));
+                differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
 
                 // choose a repair method based on the significance of the 
difference
-                String format = "Endpoints " + local + " and " + remote + " %s 
for " + cfname + " on " + range;
+                String format = String.format("[repair #%s] Endpoints %s and 
%s %%s for %s", getName(), r1.endpoint, r2.endpoint, cfname);
                 if (differences.isEmpty())
                 {
                     logger.info(String.format(format, "are consistent"));
-                    completed(remote, cfname);
+                    completed(this);
                     return;
                 }
 
                 // non-0 difference: perform streaming repair
                 logger.info(String.format(format, "have " + differences.size() 
+ " range(s) out of sync"));
-                try
-                {
-                    performStreamingRepair();
-                }
-                catch(IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
+                performStreamingRepair();
             }
 
             /**
              * Starts sending/receiving our list of differences to/from the 
remote endpoint: creates a callback
              * that will be called out of band once the streams complete.
              */
-            void performStreamingRepair() throws IOException
+            void performStreamingRepair()
             {
-                logger.info("Performing streaming repair of " + 
differences.size() + " ranges with " + remote + " for " + range);
-                ColumnFamilyStore cfstore = 
Table.open(tablename).getColumnFamilyStore(cfname);
-                try
-                {
-                    // We acquire references for transferSSTables
-                    Collection<SSTableReader> sstables = 
cfstore.markCurrentSSTablesReferenced();
-                    Callback callback = new Callback();
-                    // send ranges to the remote node
-                    StreamOutSession outsession = 
StreamOutSession.create(tablename, remote, callback);
-                    StreamOut.transferSSTables(outsession, sstables, 
differences, OperationType.AES);
-                    // request ranges from the remote node
-                    StreamIn.requestRanges(remote, tablename, differences, 
callback, OperationType.AES);
-                }
-                catch(Exception e)
+                Runnable callback = new Runnable()
                 {
-                    throw new IOException("Streaming repair failed.", e);
-                }
+                    public void run()
+                    {
+                        completed(Differencer.this);
+                    }
+                };
+                StreamingRepairTask task = 
StreamingRepairTask.create(r1.endpoint, r2.endpoint, tablename, cfname, 
differences, callback);
+
+                // Pre 1.0, nodes don't know how to handle forwarded streaming task. A forwarded
+                // task is sent to (and run by) task.src, so that is the node whose version matters.
+                if (task.isLocalTask() || Gossiper.instance.getVersion(task.src) >= MessagingService.VERSION_10)
+                {
+                    task.run();
+                }
+                else
+                {
+                    // Don't bother streaming, but mark this pair as done: otherwise the callback above
+                    // never fires and the session blocks forever waiting for all differencers.
+                    // NOTE(review): the task created above stays registered in StreamingRepairTask.
+                    logger.warn(String.format("[repair #%s] Cannot sync %s and %s for %s: %s does not support forwarded streaming repair (pre 1.0); skipping", getName(), r1.endpoint, r2.endpoint, cfname, task.src));
+                    completed(Differencer.this);
+                }
             }
 
             public String toString()
             {
-                return "#<Differencer " + remote + "/" + range + ">";
+                return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint + 
"/" + range + ">";
             }
+        }
+    }
 
-            /**
-             * When a repair is necessary, this callback is created to wait 
for the inbound
-             * and outbound streams to complete.
-             */
-            class Callback extends WrappedRunnable
-            {
-                // we expect one callback for the receive, and one for the send
-                private final AtomicInteger outstanding = new AtomicInteger(2);
+    static class TreeResponse
+    {
+        public final InetAddress endpoint;
+        public final MerkleTree tree;
 
-                protected void runMayThrow() throws Exception
-                {
-                    if (outstanding.decrementAndGet() > 0)
-                        // waiting on more calls
-                        return;
-
-                    // all calls finished successfully
-                    completed(remote, cfname);
-                    logger.info(String.format("Finished streaming repair with 
%s for %s", remote, range));
-                }
-            }
+        TreeResponse(InetAddress endpoint, MerkleTree tree)
+        {
+            this.endpoint = endpoint;
+            this.tree = tree;
         }
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Fri Sep  2 10:24:21 2011
@@ -107,6 +107,8 @@ public class StorageService implements I
         REPLICATION_FINISHED,
         INTERNAL_RESPONSE, // responses to internal calls
         COUNTER_MUTATION,
+        STREAMING_REPAIR_REQUEST,
+        STREAMING_REPAIR_RESPONSE,
         // use as padding for backwards compatability where a previous version 
needs to validate a verb from the future.
         UNUSED_1,
         UNUSED_2,
@@ -129,6 +131,8 @@ public class StorageService implements I
         put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
         put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
         put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
+        put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
+        put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY);
         put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
@@ -245,6 +249,8 @@ public class StorageService implements I
         
MessagingService.instance().registerVerbHandlers(Verb.INTERNAL_RESPONSE, new 
ResponseVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.TREE_REQUEST, 
new TreeRequestVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.TREE_RESPONSE, 
new AntiEntropyService.TreeResponseVerbHandler());
+        
MessagingService.instance().registerVerbHandlers(Verb.STREAMING_REPAIR_REQUEST, 
new StreamingRepairTask.StreamingRepairRequest());
+        
MessagingService.instance().registerVerbHandlers(Verb.STREAMING_REPAIR_RESPONSE,
 new StreamingRepairTask.StreamingRepairResponse());
 
         
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new 
GossipDigestSynVerbHandler());
         
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new 
GossipDigestAckVerbHandler());

Added: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java?rev=1164463&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
 Fri Sep  2 10:24:21 2011
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Task that makes two nodes exchange (stream) some ranges (for a given table/cf).
+ * This handles the case where the local node is neither of the two nodes that
+ * must stream their ranges (the task is then forwarded to the source node), and
+ * allows registering a callback to be called on completion.
+ */
+public class StreamingRepairTask implements Runnable
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamingRepairTask.class);
+
+    // map of the tasks created on this node through create(), keyed by task id; an entry is
+    // removed once the task's wrapped callback has received all expected completion calls
+    private static final ConcurrentMap<UUID, StreamingRepairTask> tasks = new 
ConcurrentHashMap<UUID, StreamingRepairTask>();
+    private static final StreamingRepairTaskSerializer serializer = new 
StreamingRepairTaskSerializer();
+
+    public final UUID id;
+    private final InetAddress owner; // the node where the task is created; can be == src but doesn't need to be
+    // node initiating the streams: it sends the ranges to dst and requests them back from dst
+    public final InetAddress src;
+    // node exchanging the ranges with src
+    public final InetAddress dst;
+
+    private final String tableName;
+    private final String cfName;
+    private final Collection<Range> ranges;
+    // run on completion; deliberately not serialized
+    private final Runnable callback;
+
+    /**
+     * Builds a task from its parts; {@link #create} is the factory that builds and registers new tasks.
+     *
+     * @param id unique identifier of the task
+     * @param owner node that created the task and gets notified of its completion
+     * @param src node initiating the streams
+     * @param dst node exchanging ranges with src
+     * @param tableName table whose ranges are exchanged
+     * @param cfName column family whose ranges are exchanged
+     * @param ranges ranges to exchange
+     * @param callback run on completion; not serialized
+     */
+    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range> ranges, Runnable callback)
+    {
+        // what to exchange and what to do afterwards
+        this.tableName = tableName;
+        this.cfName = cfName;
+        this.ranges = ranges;
+        this.callback = callback;
+        // who is involved
+        this.id = id;
+        this.owner = owner;
+        this.src = src;
+        this.dst = dst;
+    }
+
+    /**
+     * Creates a task making ep1 and ep2 exchange the given ranges and registers it on this node,
+     * so that the completion response for it can be matched later.
+     *
+     * Either endpoint can act as source; when ep2 is the local node it is picked as source so the
+     * task runs without forwarding (when ep1 is local it already is the source).
+     *
+     * NOTE(review): AntiEntropyService includes the local node as FBUtilities.getBroadcastAddress(),
+     * while this method compares endpoints with FBUtilities.getLocalAddress(); confirm both agree
+     * when a distinct broadcast address is configured, otherwise the local node is not detected.
+     */
+    public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range> ranges, Runnable callback)
+    {
+        InetAddress self = FBUtilities.getLocalAddress();
+        UUID taskId = UUIDGen.makeType1UUIDFromHost(self);
+
+        boolean secondIsLocal = ep2.equals(self);
+        InetAddress source = secondIsLocal ? ep2 : ep1;
+        InetAddress destination = secondIsLocal ? ep1 : ep2;
+
+        Runnable onCompletion = wrapCallback(callback, taskId, self.equals(source));
+        StreamingRepairTask task = new StreamingRepairTask(taskId, self, source, destination, tableName, cfName, ranges, onCompletion);
+        tasks.put(taskId, task);
+        return task;
+    }
+
+    /**
+     * Returns true if the task can be executed locally, i.e. the node that created it is also the
+     * source of the streams; false if it has to be forwarded to the source.
+     */
+    public boolean isLocalTask()
+    {
+        return src.equals(owner);
+    }
+
+    /**
+     * Starts the streams when this node is the source, otherwise hands the task over to the source.
+     */
+    public void run()
+    {
+        boolean isSource = src.equals(FBUtilities.getLocalAddress());
+        if (!isSource)
+        {
+            forwardToSource();
+            return;
+        }
+        initiateStreaming();
+    }
+
+    /**
+     * Streams the task's ranges from this node (the source) to dst and requests the same ranges
+     * back from dst. The callback is handed to both the outbound and the inbound session.
+     *
+     * @throws RuntimeException wrapping any failure while setting up the streams
+     */
+    private void initiateStreaming()
+    {
+        ColumnFamilyStore cfstore = Table.open(tableName).getColumnFamilyStore(cfName);
+        try
+        {
+            logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
+            // We acquire references for transferSSTables (as the repair code did before this task
+            // existed); using the bare live set would stream sstables we hold no reference to.
+            // NOTE(review): presumably the streaming session releases these once the transfer is done.
+            Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced();
+            // send ranges to the remote node
+            StreamOutSession outsession = StreamOutSession.create(tableName, dst, callback);
+            StreamOut.transferSSTables(outsession, sstables, ranges, OperationType.AES);
+            // request ranges from the remote node
+            StreamIn.requestRanges(dst, tableName, ranges, callback, OperationType.AES);
+        }
+        catch(Exception e)
+        {
+            throw new RuntimeException("Streaming repair failed", e);
+        }
+    }
+
+    /**
+     * Sends this task to its source node, which will run it and report back to the owner.
+     *
+     * @throws RuntimeException wrapping the IOException raised while sending
+     */
+    private void forwardToSource()
+    {
+        String message = String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", id, ranges.size(), src, dst);
+        logger.info(message);
+        try
+        {
+            StreamingRepairRequest.send(this);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Error forwarding streaming task to " + src, e);
+        }
+    }
+
+    /**
+     * Builds a callback that, on its second invocation, sends a StreamingRepairResponse for taskId
+     * to taskOwner. Presumably installed on tasks received from another node (the deserializer is
+     * not visible here) — confirm.
+     *
+     * @throws IOError (from the returned Runnable) if the response cannot be sent
+     */
+    private static Runnable makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
+    {
+        return new Runnable()
+        {
+            // one call is expected for the outbound stream, and one for the inbound stream
+            private final AtomicInteger pending = new AtomicInteger(2);
+
+            public void run()
+            {
+                boolean lastCall = pending.decrementAndGet() <= 0;
+                if (!lastCall)
+                    return;
+
+                try
+                {
+                    StreamingRepairResponse.reply(taskOwner, taskId);
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Wraps callback so that, once all expected notifications have arrived, the task is
+     * unregistered from the tasks map and callback (if any) is run. A locally executed task
+     * expects two notifications (outbound and inbound stream); a forwarded task expects a single
+     * one (the StreamingRepairResponse from its source).
+     */
+    private static Runnable wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
+    {
+        final int expectedCalls = isLocalTask ? 2 : 1;
+        return new Runnable()
+        {
+            private final AtomicInteger outstanding = new AtomicInteger(expectedCalls);
+
+            public void run()
+            {
+                if (outstanding.decrementAndGet() <= 0)
+                {
+                    tasks.remove(taskid);
+                    if (callback != null)
+                        callback.run();
+                }
+            }
+        };
+    }
+
+    /**
+     * Handler for STREAMING_REPAIR_REQUEST messages. It runs on the source node of a forwarded
+     * task, rebuilds the task from the message body and runs it. send() is the owner-side half.
+     */
+    public static class StreamingRepairRequest implements IVerbHandler
+    {
+        public void doVerb(Message message, String id)
+        {
+            DataInputStream input = new DataInputStream(new ByteArrayInputStream(message.getMessageBody()));
+
+            StreamingRepairTask received;
+            try
+            {
+                received = StreamingRepairTask.serializer.deserialize(input, message.getVersion());
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+
+            // the owner only forwards a task to that task's source
+            assert received.src.equals(FBUtilities.getLocalAddress());
+            assert received.owner.equals(message.getFrom());
+
+            logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", received.id, message.getFrom(), received.ranges.size(), received.dst));
+
+            received.run();
+        }
+
+        // Serializes the task using the source node's messaging version and sends it there.
+        private static void send(StreamingRepairTask task) throws IOException
+        {
+            int version = Gossiper.instance.getVersion(task.src);
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            StreamingRepairTask.serializer.serialize(task, new DataOutputStream(buffer), version);
+            byte[] body = buffer.toByteArray();
+            Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, body, version);
+            MessagingService.instance().sendOneWay(msg, task.src);
+        }
+    }
+
+    /**
+     * Handler for STREAMING_REPAIR_RESPONSE messages. It runs on the owner of a forwarded task when
+     * the source reports completion, and invokes the task's callback. reply() sends such a response.
+     */
+    public static class StreamingRepairResponse implements IVerbHandler
+    {
+        public void doVerb(Message message, String id)
+        {
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+
+            UUID taskid;
+            try
+            {
+                taskid = UUIDGen.read(dis);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(new IOException("Error reading stream repair response from " + message.getFrom(), e));
+            }
+
+            StreamingRepairTask task = tasks.get(taskid);
+            if (task == null)
+            {
+                // tasks are only kept in memory, so e.g. a restart loses them
+                logger.error(String.format("Received a stream repair response from %s for unknown task %s (has this node been restarted recently?)", message.getFrom(), taskid));
+                return;
+            }
+
+            assert task.owner.equals(FBUtilities.getLocalAddress());
+
+            logger.info(String.format("[streaming task #%s] task succeeded", task.id));
+            if (task.callback != null)
+                task.callback.run();
+        }
+
+        // Tells the task owner (remote) that the task identified by taskid has completed.
+        private static void reply(InetAddress remote, UUID taskid) throws IOException
+        {
+            logger.info(String.format("[streaming task #%s] task succeeded, forwarding response to %s", taskid, remote));
+            int version = Gossiper.instance.getVersion(remote);
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            UUIDGen.write(taskid, dos);
+            Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version);
+            MessagingService.instance().sendOneWay(msg, remote);
+        }
+    }
+
+    /**
+     * Wire format for a StreamingRepairTask, written in this order:
+     * <ol>
+     * <li>id</li>
+     * <li>owner, source and destination endpoints</li>
+     * <li>table name and column family name</li>
+     * <li>the number of ranges, then each range</li>
+     * </ol>
+     * The callback is not transmitted; deserialization rebuilds one with
+     * makeReplyingCallback(owner, id).
+     */
+    private static class StreamingRepairTaskSerializer implements ICompactSerializer<StreamingRepairTask>
+    {
+        public void serialize(StreamingRepairTask task, DataOutputStream dos, int version) throws IOException
+        {
+            UUIDGen.write(task.id, dos);
+            CompactEndpointSerializationHelper.serialize(task.owner, dos);
+            CompactEndpointSerializationHelper.serialize(task.src, dos);
+            CompactEndpointSerializationHelper.serialize(task.dst, dos);
+            dos.writeUTF(task.tableName);
+            dos.writeUTF(task.cfName);
+            dos.writeInt(task.ranges.size());
+            for (Range range : task.ranges)
+            {
+                AbstractBounds.serializer().serialize(range, dos);
+            }
+            // We don't serialize the callback on purpose: deserialize() builds its own.
+        }
+
+        /**
+         * Reads a task in the order written by {@link #serialize}.
+         *
+         * @throws IOException on a read failure, or if the encoded range count is negative
+         */
+        public StreamingRepairTask deserialize(DataInputStream dis, int version) throws IOException
+        {
+            UUID id = UUIDGen.read(dis);
+            InetAddress owner = CompactEndpointSerializationHelper.deserialize(dis);
+            InetAddress src = CompactEndpointSerializationHelper.deserialize(dis);
+            InetAddress dst = CompactEndpointSerializationHelper.deserialize(dis);
+            // Instance readUTF() mirrors the writeUTF() calls in serialize().
+            String tableName = dis.readUTF();
+            String cfName = dis.readUTF();
+            int rangesCount = dis.readInt();
+            // A corrupt count would otherwise surface as an unchecked IllegalArgumentException from ArrayList.
+            if (rangesCount < 0)
+                throw new IOException("Invalid range count " + rangesCount + " for streaming repair task " + id);
+            List<Range> ranges = new ArrayList<Range>(rangesCount);
+            for (int i = 0; i < rangesCount; ++i)
+            {
+                ranges.add((Range) AbstractBounds.serializer().deserialize(dis));
+            }
+            return new StreamingRepairTask(id, owner, src, dst, tableName, cfName, ranges, makeReplyingCallback(owner, id));
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java Fri Sep  2 
10:24:21 2011
@@ -20,7 +20,9 @@ package org.apache.cassandra.utils;
  * 
  */
 
-
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
@@ -68,6 +70,19 @@ public class UUIDGen
         return new UUID(raw.getLong(raw.position()), 
raw.getLong(raw.position() + 8));
     }
 
+    /** reads a uuid from an input stream. */
+    public static UUID read(DataInputStream dis) throws IOException
+    {
+        return new UUID(dis.readLong(), dis.readLong());
+    }
+
+    /** writes a uuid to an output stream. */
+    public static void write(UUID uuid, DataOutputStream dos) throws 
IOException
+    {
+        dos.writeLong(uuid.getMostSignificantBits());
+        dos.writeLong(uuid.getLeastSignificantBits());
+    }
+
     /** decomposes a uuid into raw bytes. */
     public static byte[] decompose(UUID uuid)
     {

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java 
(original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java 
Fri Sep  2 10:24:21 2011
@@ -73,6 +73,7 @@ public class CompactSerializerTest exten
         expectedClassNames.add("InnerSerializer");
         expectedClassNames.add("LeafSerializer");
         expectedClassNames.add("MerkleTreeSerializer");
+        expectedClassNames.add("StreamingRepairTaskSerializer");
         
         discoveredClassNames = new ArrayList<String>();
         String cp = System.getProperty("java.class.path");

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
 Fri Sep  2 10:24:21 2011
@@ -103,8 +103,10 @@ public abstract class AntiEntropyService
 
         local_range = StorageService.instance.getLocalPrimaryRange();
 
-        // random session id for each test
-        request = new TreeRequest(UUID.randomUUID().toString(), LOCAL, 
local_range, new CFPair(tablename, cfname));
+        // (we use REMOTE instead of LOCAL so that the responses for the validator.complete() get lost)
+        request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, 
local_range, new CFPair(tablename, cfname));
+        // Set a fake session corresponding to this fake request
+        AntiEntropyService.instance.submitArtificialRepairSession(request, 
tablename, cfname);
     }
 
     @After
@@ -159,26 +161,6 @@ public abstract class AntiEntropyService
     }
 
     @Test
-    public void testManualRepair() throws Throwable
-    {
-        RepairFuture sess = 
AntiEntropyService.instance.submitRepairSession(local_range, tablename, cfname);
-
-        // ensure that the session doesn't end without a response from REMOTE
-        try
-        {
-            sess.get(500, TimeUnit.MILLISECONDS);
-            fail("Repair session should not end without response from REMOTE");
-        }
-        catch (TimeoutException e) {}
-
-        // deliver a fake response from REMOTE
-        sess.session.completed(REMOTE, request.cf.right);
-
-        // block until the repair has completed
-        sess.get();
-    }
-
-    @Test
     public void testGetNeighborsPlusOne() throws Throwable
     {
         // generate rf+1 nodes, and ensure that all nodes are returned
@@ -244,7 +226,10 @@ public abstract class AntiEntropyService
         interesting.add(changed);
 
         // difference the trees
-        AntiEntropyService.RepairSession.Differencer diff = sess.session.new 
Differencer(cfname, request.endpoint, ltree, rtree);
+        // note: we reuse the same endpoint which is bogus in theory but fine 
here
+        AntiEntropyService.TreeResponse r1 = new 
AntiEntropyService.TreeResponse(REMOTE, ltree);
+        AntiEntropyService.TreeResponse r2 = new 
AntiEntropyService.TreeResponse(REMOTE, rtree);
+        AntiEntropyService.RepairSession.Differencer diff = sess.session.new 
Differencer(cfname, r1, r2);
         diff.run();
         
         // ensure that the changed range was recorded


Reply via email to