Author: slebresne
Date: Fri Sep 2 10:24:21 2011
New Revision: 1164463
URL: http://svn.apache.org/viewvc?rev=1164463&view=rev
Log:
Make repair of a range sync all replica pairs for this range
patch by slebresne; reviewed by jbellis for CASSANDRA-2610
Added:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java
cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Sep 2 10:24:21 2011
@@ -55,6 +55,7 @@
to be down before starting (CASSANDRA-2034)
* Make the compression algorithm and chunk length configurable
(CASSANDRA-3001)
* Add throttling for internode streaming (CASSANDRA-3080)
+ * make the repair of a range repair all replicas (CASSANDRA-2610)
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri
Sep 2 10:24:21 2011
@@ -63,9 +63,11 @@ public final class MessagingService impl
{
public static final String MBEAN_NAME =
"org.apache.cassandra.net:type=MessagingService";
+ // 8 bits version, so don't waste versions
public static final int VERSION_07 = 1;
public static final int VERSION_080 = 2;
- public static final int version_ = 3; // 8 bits, so don't waste versions
+ public static final int VERSION_10 = 3;
+ public static final int version_ = VERSION_10;
static SerializerType serializerType_ = SerializerType.BINARY;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Fri Sep 2 10:24:21 2011
@@ -24,7 +24,6 @@ import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import com.google.common.base.Objects;
@@ -44,17 +43,13 @@ import org.apache.cassandra.dht.RandomPa
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.streaming.StreamIn;
-import org.apache.cassandra.streaming.StreamOut;
-import org.apache.cassandra.streaming.StreamOutSession;
+import org.apache.cassandra.streaming.*;
import org.apache.cassandra.utils.*;
/**
@@ -130,6 +125,8 @@ public class AntiEntropyService
return futureTask;
}
+ // for testing only. Create a session corresponding to a fake request and
+ // add it to the sessions (avoid NPE in tests)
RepairFuture submitArtificialRepairSession(TreeRequest req, String
tablename, String... cfnames)
{
RepairFuture futureTask = new RepairSession(req, tablename,
cfnames).getFuture();
@@ -171,6 +168,8 @@ public class AntiEntropyService
RepairSession session = sessions.get(request.sessionid);
assert session != null;
+ logger.info(String.format("[repair #%s] Received merkle tree for %s
from %s", session.getName(), request.cf.right, request.endpoint));
+
RepairSession.RepairJob job = session.jobs.peek();
assert job != null : "A repair should have at least some jobs
scheduled";
@@ -214,12 +213,13 @@ public class AntiEntropyService
try
{
Message message = TreeResponseVerbHandler.makeVerb(local,
validator);
- logger.info("Sending AEService tree for " + validator.request);
+ if
(!validator.request.endpoint.equals(FBUtilities.getBroadcastAddress()))
+ logger.info(String.format("[repair #%s] Sending completed
merkle tree to %s for %s", validator.request.sessionid,
validator.request.endpoint, validator.request.cf));
ms.sendOneWay(message, validator.request.endpoint);
}
catch (Exception e)
{
- logger.error("Could not send valid tree for request " +
validator.request, e);
+ logger.error(String.format("[repair #%s] Error sending completed
merkle tree to %s for %s ", validator.request.sessionid,
validator.request.endpoint, validator.request.cf), e);
}
}
@@ -534,17 +534,6 @@ public class AntiEntropyService
}
/**
- * A tuple of a local and remote tree.
- */
- static class TreePair extends Pair<MerkleTree,MerkleTree>
- {
- public TreePair(MerkleTree local, MerkleTree remote)
- {
- super(local, remote);
- }
- }
-
- /**
* A tuple of table, cf, address and range that represents a location we
have an outstanding TreeRequest for.
*/
public static class TreeRequest
@@ -613,7 +602,7 @@ public class AntiEntropyService
public RepairSession(Range range, String tablename, String... cfnames)
{
- this("manual-repair-" + UUID.randomUUID(), range, tablename,
cfnames);
+
this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(),
range, tablename, cfnames);
}
private RepairSession(String id, Range range, String tablename,
String[] cfnames)
@@ -636,13 +625,24 @@ public class AntiEntropyService
return new RepairFuture(this);
}
+ private String repairedNodes()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(FBUtilities.getBroadcastAddress());
+ for (InetAddress ep : endpoints)
+ sb.append(", ").append(ep);
+ return sb.toString();
+ }
+
// we don't care about the return value but care about it throwing
exception
public void runMayThrow() throws Exception
{
+ logger.info(String.format("[repair #%s] new session: will sync %s
on range %s for %s.%s", getName(), repairedNodes(), range, tablename,
Arrays.toString(cfnames)));
+
if (endpoints.isEmpty())
{
differencingDone.signalAll();
- logger.info("No neighbors to repair with for " + tablename + "
on " + range + ": " + getName() + " completed.");
+ logger.info("[repair #%s] No neighbors to repair with on range
%s: session completed", getName(), range);
return;
}
@@ -652,7 +652,7 @@ public class AntiEntropyService
if (!FailureDetector.instance.isAlive(endpoint))
{
differencingDone.signalAll();
- logger.info("Could not proceed on repair because a
neighbor (" + endpoint + ") is dead: " + getName() + " failed.");
+ logger.info("[repair #%s] Could not proceed on repair
because a neighbor (%s) is dead: session failed", getName(), endpoint);
return;
}
}
@@ -675,8 +675,15 @@ public class AntiEntropyService
// block whatever thread started this session until all
requests have been returned:
// if this thread dies, the session will still complete in the
background
completed.await();
- if (exception != null)
+ if (exception == null)
+ {
+ logger.info(String.format("[repair #%s] session completed
successfully", getName()));
+ }
+ else
+ {
+ logger.error(String.format("[repair #%s] session completed
with the following error", getName()), exception);
throw exception;
+ }
}
catch (InterruptedException e)
{
@@ -690,13 +697,19 @@ public class AntiEntropyService
}
}
- void completed(InetAddress remote, String cfname)
+ void completed(Differencer differencer)
{
- logger.debug("Repair completed for {} on {}", remote, cfname);
- RepairJob job = activeJobs.get(cfname);
- if (job.completedSynchronizationJob(remote))
- {
- activeJobs.remove(cfname);
+ logger.debug(String.format("[repair #%s] Repair completed between
%s and %s on %s",
+ getName(),
+ differencer.r1.endpoint,
+ differencer.r2.endpoint,
+ differencer.cfname));
+ RepairJob job = activeJobs.get(differencer.cfname);
+ if (job.completedSynchronization(differencer))
+ {
+ activeJobs.remove(differencer.cfname);
+ String remaining = activeJobs.size() == 0 ? "" :
String.format(" (%d remaining column family to sync for this session)",
activeJobs.size());
+ logger.info(String.format("[repair #%s] %s is fully synced%s",
getName(), differencer.cfname, remaining));
if (activeJobs.isEmpty())
completed.signalAll();
}
@@ -704,8 +717,7 @@ public class AntiEntropyService
void failedNode(InetAddress remote)
{
- String errorMsg = String.format("Problem during repair session %s,
endpoint %s died", sessionName, remote);
- logger.error(errorMsg);
+ String errorMsg = String.format("Endpoint %s died", remote);
exception = new IOException(errorMsg);
// If a node failed, we stop everything (though there could still
be some activity in the background)
jobs.clear();
@@ -749,9 +761,13 @@ public class AntiEntropyService
class RepairJob
{
private final String cfname;
- private final Set<InetAddress> requestedEndpoints = new
HashSet<InetAddress>();
- private final Map<InetAddress, MerkleTree> trees = new
HashMap<InetAddress, MerkleTree>();
- private final Set<InetAddress> syncJobs = new
HashSet<InetAddress>();
+ // first we send tree requests. this tracks the endpoints
remaining to hear from
+ private final Set<InetAddress> remainingEndpoints = new
HashSet<InetAddress>();
+ // tree responses are then tracked here
+ private final List<TreeResponse> trees = new
ArrayList<TreeResponse>(endpoints.size() + 1);
+ // once all responses are received, each tree is compared with
each other, and differencer tasks
+ // are submitted. the job is done when all differencers are
complete.
+ private final Set<Differencer> remainingDifferencers = new
HashSet<Differencer>();
public RepairJob(String cfname)
{
@@ -763,24 +779,28 @@ public class AntiEntropyService
*/
public void sendTreeRequests()
{
- requestedEndpoints.addAll(endpoints);
- requestedEndpoints.add(FBUtilities.getBroadcastAddress());
+ remainingEndpoints.addAll(endpoints);
+ remainingEndpoints.add(FBUtilities.getBroadcastAddress());
// send requests to all nodes
- for (InetAddress endpoint : requestedEndpoints)
+ for (InetAddress endpoint : remainingEndpoints)
AntiEntropyService.instance.request(getName(), endpoint,
range, tablename, cfname);
+
+ logger.info(String.format("[repair #%s] requests for merkle
tree sent for %s (to %s)", getName(), cfname, remainingEndpoints));
}
/**
* Add a new received tree and return the number of remaining tree
to
* be received for the job to be complete.
+ *
+ * Callers may assume exactly one addTree call will result in zero
remaining endpoints.
*/
public synchronized int addTree(TreeRequest request, MerkleTree
tree)
{
assert request.cf.right.equals(cfname);
- trees.put(request.endpoint, tree);
- requestedEndpoints.remove(request.endpoint);
- return requestedEndpoints.size();
+ trees.add(new TreeResponse(request.endpoint, tree));
+ remainingEndpoints.remove(request.endpoint);
+ return remainingEndpoints.size();
}
/**
@@ -789,30 +809,31 @@ public class AntiEntropyService
*/
public void submitDifferencers()
{
- assert requestedEndpoints.size() == 0;
+ assert remainingEndpoints.isEmpty();
- // Right now, we only difference local host against each
other. CASSANDRA-2610 will fix that.
- // In the meantime ugly special casing will work good enough.
- MerkleTree localTree =
- trees.get(FBUtilities.getBroadcastAddress());
- assert localTree != null;
- for (Map.Entry<InetAddress, MerkleTree> entry :
trees.entrySet())
+ // We need to difference all trees one against another
+ for (int i = 0; i < trees.size() - 1; ++i)
{
- if
(entry.getKey().equals(FBUtilities.getBroadcastAddress()))
- continue;
-
- Differencer differencer = new Differencer(cfname,
entry.getKey(), entry.getValue(), localTree);
- syncJobs.add(entry.getKey());
- logger.debug("Queueing comparison " + differencer);
-
StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+ TreeResponse r1 = trees.get(i);
+ for (int j = i + 1; j < trees.size(); ++j)
+ {
+ TreeResponse r2 = trees.get(j);
+ Differencer differencer = new Differencer(cfname, r1,
r2);
+ logger.debug("Queueing comparison {}", differencer);
+ remainingDifferencers.add(differencer);
+
StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+ }
}
trees.clear(); // allows gc to do its thing
}
- synchronized boolean completedSynchronizationJob(InetAddress
remote)
+ /**
+ * @return true if the @param differencer was the last remaining
+ */
+ synchronized boolean completedSynchronization(Differencer
differencer)
{
- syncJobs.remove(remote);
- return syncJobs.isEmpty();
+ remainingDifferencers.remove(differencer);
+ return remainingDifferencers.isEmpty();
}
}
@@ -822,17 +843,15 @@ public class AntiEntropyService
class Differencer implements Runnable
{
public final String cfname;
- public final InetAddress remote;
- public final MerkleTree ltree;
- public final MerkleTree rtree;
+ public final TreeResponse r1;
+ public final TreeResponse r2;
public List<Range> differences;
- Differencer(String cfname, InetAddress remote, MerkleTree ltree,
MerkleTree rtree)
+ Differencer(String cfname, TreeResponse r1, TreeResponse r2)
{
this.cfname = cfname;
- this.remote = remote;
- this.ltree = ltree;
- this.rtree = rtree;
+ this.r1 = r1;
+ this.r2 = r2;
this.differences = new ArrayList<Range>();
}
@@ -841,88 +860,65 @@ public class AntiEntropyService
*/
public void run()
{
- InetAddress local = FBUtilities.getBroadcastAddress();
-
// restore partitioners (in case we were serialized)
- if (ltree.partitioner() == null)
- ltree.partitioner(StorageService.getPartitioner());
- if (rtree.partitioner() == null)
- rtree.partitioner(StorageService.getPartitioner());
+ if (r1.tree.partitioner() == null)
+ r1.tree.partitioner(StorageService.getPartitioner());
+ if (r2.tree.partitioner() == null)
+ r2.tree.partitioner(StorageService.getPartitioner());
// compare trees, and collect differences
- differences.addAll(MerkleTree.difference(ltree, rtree));
+ differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
// choose a repair method based on the significance of the
difference
- String format = "Endpoints " + local + " and " + remote + " %s
for " + cfname + " on " + range;
+ String format = String.format("[repair #%s] Endpoints %s and
%s %%s for %s", getName(), r1.endpoint, r2.endpoint, cfname);
if (differences.isEmpty())
{
logger.info(String.format(format, "are consistent"));
- completed(remote, cfname);
+ completed(this);
return;
}
// non-0 difference: perform streaming repair
logger.info(String.format(format, "have " + differences.size()
+ " range(s) out of sync"));
- try
- {
- performStreamingRepair();
- }
- catch(IOException e)
- {
- throw new RuntimeException(e);
- }
+ performStreamingRepair();
}
/**
* Starts sending/receiving our list of differences to/from the
remote endpoint: creates a callback
* that will be called out of band once the streams complete.
*/
- void performStreamingRepair() throws IOException
+ void performStreamingRepair()
{
- logger.info("Performing streaming repair of " +
differences.size() + " ranges with " + remote + " for " + range);
- ColumnFamilyStore cfstore =
Table.open(tablename).getColumnFamilyStore(cfname);
- try
- {
- // We acquire references for transferSSTables
- Collection<SSTableReader> sstables =
cfstore.markCurrentSSTablesReferenced();
- Callback callback = new Callback();
- // send ranges to the remote node
- StreamOutSession outsession =
StreamOutSession.create(tablename, remote, callback);
- StreamOut.transferSSTables(outsession, sstables,
differences, OperationType.AES);
- // request ranges from the remote node
- StreamIn.requestRanges(remote, tablename, differences,
callback, OperationType.AES);
- }
- catch(Exception e)
+ Runnable callback = new Runnable()
{
- throw new IOException("Streaming repair failed.", e);
- }
+ public void run()
+ {
+ completed(Differencer.this);
+ }
+ };
+ StreamingRepairTask task =
StreamingRepairTask.create(r1.endpoint, r2.endpoint, tablename, cfname,
differences, callback);
+
+ // Pre 1.0, nodes don't know how to handle forwarded streaming
task so don't bother
+ if (task.isLocalTask() ||
Gossiper.instance.getVersion(task.dst) >= MessagingService.VERSION_10)
+ task.run();
}
public String toString()
{
- return "#<Differencer " + remote + "/" + range + ">";
+ return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint +
"/" + range + ">";
}
+ }
+ }
- /**
- * When a repair is necessary, this callback is created to wait
for the inbound
- * and outbound streams to complete.
- */
- class Callback extends WrappedRunnable
- {
- // we expect one callback for the receive, and one for the send
- private final AtomicInteger outstanding = new AtomicInteger(2);
+ static class TreeResponse
+ {
+ public final InetAddress endpoint;
+ public final MerkleTree tree;
- protected void runMayThrow() throws Exception
- {
- if (outstanding.decrementAndGet() > 0)
- // waiting on more calls
- return;
-
- // all calls finished successfully
- completed(remote, cfname);
- logger.info(String.format("Finished streaming repair with
%s for %s", remote, range));
- }
- }
+ TreeResponse(InetAddress endpoint, MerkleTree tree)
+ {
+ this.endpoint = endpoint;
+ this.tree = tree;
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Fri Sep 2 10:24:21 2011
@@ -107,6 +107,8 @@ public class StorageService implements I
REPLICATION_FINISHED,
INTERNAL_RESPONSE, // responses to internal calls
COUNTER_MUTATION,
+ STREAMING_REPAIR_REQUEST,
+ STREAMING_REPAIR_RESPONSE,
// use as padding for backwards compatability where a previous version
needs to validate a verb from the future.
UNUSED_1,
UNUSED_2,
@@ -129,6 +131,8 @@ public class StorageService implements I
put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
+ put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
+ put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY);
put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
@@ -245,6 +249,8 @@ public class StorageService implements I
MessagingService.instance().registerVerbHandlers(Verb.INTERNAL_RESPONSE, new
ResponseVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.TREE_REQUEST,
new TreeRequestVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.TREE_RESPONSE,
new AntiEntropyService.TreeResponseVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.STREAMING_REPAIR_REQUEST,
new StreamingRepairTask.StreamingRepairRequest());
+
MessagingService.instance().registerVerbHandlers(Verb.STREAMING_REPAIR_RESPONSE,
new StreamingRepairTask.StreamingRepairResponse());
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new
GossipDigestSynVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new
GossipDigestAckVerbHandler());
Added:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java?rev=1164463&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
Fri Sep 2 10:24:21 2011
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Task that makes two nodes exchange (stream) some ranges (for a given table/cf).
+ * This handles the case where the local node is neither of the two nodes that
+ * must stream their ranges, and allows registering a callback to be called on
+ * completion.
+ *
+ * Nodes are identified by their broadcast address everywhere in this class
+ * (owner, src, dst and message origins), which is also how the repair code
+ * identifies the endpoints it hands to {@link #create}.
+ */
+public class StreamingRepairTask implements Runnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
+
+    // tasks created on this node, by id; an entry is removed when the task's wrapped callback completes
+    private static final ConcurrentMap<UUID, StreamingRepairTask> tasks = new ConcurrentHashMap<UUID, StreamingRepairTask>();
+    private static final StreamingRepairTaskSerializer serializer = new StreamingRepairTaskSerializer();
+
+    public final UUID id;
+    private final InetAddress owner; // the node where the task was created; may be == src but doesn't have to be
+    public final InetAddress src;
+    public final InetAddress dst;
+
+    private final String tableName;
+    private final String cfName;
+    private final Collection<Range> ranges;
+    // not serialized: a deserialized (forwarded) task gets a callback that replies to its owner instead
+    private final Runnable callback;
+
+    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range> ranges, Runnable callback)
+    {
+        this.id = id;
+        this.owner = owner;
+        this.src = src;
+        this.dst = dst;
+        this.tableName = tableName;
+        this.cfName = cfName;
+        this.ranges = ranges;
+        this.callback = callback;
+    }
+
+    /**
+     * Creates a task making ep1 and ep2 exchange the given ranges of tableName/cfName, and registers
+     * it on this node so that a response for a forwarded task can be matched back to it.
+     *
+     * @param ep1 one of the two endpoints to sync
+     * @param ep2 the other endpoint to sync
+     * @param tableName the table the column family belongs to
+     * @param cfName the column family to sync
+     * @param ranges the ranges to exchange between the two endpoints
+     * @param callback run once the exchange has completed; may be null
+     * @return the registered task; {@link #run()} starts it
+     */
+    public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range> ranges, Runnable callback)
+    {
+        // Must be the broadcast address: the repair code passes broadcast addresses as endpoints, so
+        // comparing them with the listen address would fail to recognize this node when the two differ.
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        UUID id = UUIDGen.makeType1UUIDFromHost(local);
+        // Either node can be source or destination, but if one of them is this node we make it the
+        // source, so that the task runs locally instead of being forwarded
+        InetAddress src = ep2.equals(local) ? ep2 : ep1;
+        InetAddress dst = ep2.equals(local) ? ep1 : ep2;
+        StreamingRepairTask task = new StreamingRepairTask(id, local, src, dst, tableName, cfName, ranges, wrapCallback(callback, id, local.equals(src)));
+        tasks.put(id, task);
+        return task;
+    }
+
+    /**
+     * Returns true if the task can be executed locally, false if it has to be
+     * forwarded to its source node.
+     */
+    public boolean isLocalTask()
+    {
+        return owner.equals(src);
+    }
+
+    /**
+     * Starts the task: streams directly if this node is the source, otherwise
+     * sends the task to the source node.
+     */
+    public void run()
+    {
+        if (src.equals(FBUtilities.getBroadcastAddress()))
+        {
+            initiateStreaming();
+        }
+        else
+        {
+            forwardToSource();
+        }
+    }
+
+    // Sends our data for the ranges to dst and requests dst's data for the same ranges; the callback
+    // is given to both the outbound and the inbound session.
+    private void initiateStreaming()
+    {
+        ColumnFamilyStore cfstore = Table.open(tableName).getColumnFamilyStore(cfName);
+        try
+        {
+            logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
+            // We acquire references for transferSSTables, so the sstables stay valid while streamed.
+            // NOTE(review): assumes transferSSTables releases these references once done — confirm.
+            Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced();
+            // send ranges to the remote node
+            StreamOutSession outsession = StreamOutSession.create(tableName, dst, callback);
+            StreamOut.transferSSTables(outsession, sstables, ranges, OperationType.AES);
+            // request ranges from the remote node
+            StreamIn.requestRanges(dst, tableName, ranges, callback, OperationType.AES);
+        }
+        catch(Exception e)
+        {
+            throw new RuntimeException("Streaming repair failed", e);
+        }
+    }
+
+    // Asks the source node to run this task on our behalf; it replies once the streaming completes.
+    private void forwardToSource()
+    {
+        try
+        {
+            logger.info(String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", id, ranges.size(), src, dst));
+            StreamingRepairRequest.send(this);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Error forwarding streaming task to " + src, e);
+        }
+    }
+
+    // Callback used by a forwarded task on its source node: once both the outbound and the inbound
+    // streams have called back, it notifies the task's owner.
+    private static Runnable makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
+    {
+        return new Runnable()
+        {
+            // we expect one callback for the receive, and one for the send
+            private final AtomicInteger outstanding = new AtomicInteger(2);
+
+            public void run()
+            {
+                if (outstanding.decrementAndGet() > 0)
+                    // waiting on more calls
+                    return;
+
+                try
+                {
+                    StreamingRepairResponse.reply(taskOwner, taskId);
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+        };
+    }
+
+    // wrap a given callback so as to unregister the streaming repair task on completion
+    private static Runnable wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
+    {
+        return new Runnable()
+        {
+            // a local task is called back twice (once for the receive, once for the send); a forwarded
+            // task is called back once, when the source node's response arrives
+            private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1);
+
+            public void run()
+            {
+                if (outstanding.decrementAndGet() > 0)
+                    // waiting on more calls
+                    return;
+
+                tasks.remove(taskid);
+                if (callback != null)
+                    callback.run();
+            }
+        };
+    }
+
+    /** Handles STREAMING_REPAIR_REQUEST on the task's source node: deserializes the task and runs it. */
+    public static class StreamingRepairRequest implements IVerbHandler
+    {
+        public void doVerb(Message message, String id)
+        {
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+
+            StreamingRepairTask task;
+            try
+            {
+                task = StreamingRepairTask.serializer.deserialize(dis, message.getVersion());
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+
+            assert task.src.equals(FBUtilities.getBroadcastAddress());
+            assert task.owner.equals(message.getFrom());
+
+            logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.getFrom(), task.ranges.size(), task.dst));
+
+            task.run();
+        }
+
+        private static void send(StreamingRepairTask task) throws IOException
+        {
+            int version = Gossiper.instance.getVersion(task.src);
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            StreamingRepairTask.serializer.serialize(task, dos, version);
+            Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, bos.toByteArray(), version);
+            MessagingService.instance().sendOneWay(msg, task.src);
+        }
+    }
+
+    /** Handles STREAMING_REPAIR_RESPONSE on the task's owner: runs the registered task's callback. */
+    public static class StreamingRepairResponse implements IVerbHandler
+    {
+        public void doVerb(Message message, String id)
+        {
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+
+            UUID taskid;
+            try
+            {
+                taskid = UUIDGen.read(dis);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(new IOException("Error reading stream repair response from " + message.getFrom(), e));
+            }
+
+            StreamingRepairTask task = tasks.get(taskid);
+            if (task == null)
+            {
+                logger.error(String.format("Received a stream repair response from %s for unknown task %s (has this node been restarted recently?)", message.getFrom(), taskid));
+                return;
+            }
+
+            assert task.owner.equals(FBUtilities.getBroadcastAddress());
+
+            logger.info(String.format("[streaming task #%s] task succeeded", task.id));
+            if (task.callback != null)
+                task.callback.run();
+        }
+
+        private static void reply(InetAddress remote, UUID taskid) throws IOException
+        {
+            logger.info(String.format("[streaming task #%s] task succeeded, forwarding response to %s", taskid, remote));
+            int version = Gossiper.instance.getVersion(remote);
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            UUIDGen.write(taskid, dos);
+            Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version);
+            MessagingService.instance().sendOneWay(msg, remote);
+        }
+    }
+
+    private static class StreamingRepairTaskSerializer implements ICompactSerializer<StreamingRepairTask>
+    {
+        public void serialize(StreamingRepairTask task, DataOutputStream dos, int version) throws IOException
+        {
+            UUIDGen.write(task.id, dos);
+            CompactEndpointSerializationHelper.serialize(task.owner, dos);
+            CompactEndpointSerializationHelper.serialize(task.src, dos);
+            CompactEndpointSerializationHelper.serialize(task.dst, dos);
+            dos.writeUTF(task.tableName);
+            dos.writeUTF(task.cfName);
+            dos.writeInt(task.ranges.size());
+            for (Range range : task.ranges)
+            {
+                AbstractBounds.serializer().serialize(range, dos);
+            }
+            // We don't serialize the callback on purpose
+        }
+
+        public StreamingRepairTask deserialize(DataInputStream dis, int version) throws IOException
+        {
+            UUID id = UUIDGen.read(dis);
+            InetAddress owner = CompactEndpointSerializationHelper.deserialize(dis);
+            InetAddress src = CompactEndpointSerializationHelper.deserialize(dis);
+            InetAddress dst = CompactEndpointSerializationHelper.deserialize(dis);
+            String tableName = dis.readUTF();
+            String cfName = dis.readUTF();
+            int rangesCount = dis.readInt();
+            List<Range> ranges = new ArrayList<Range>(rangesCount);
+            for (int i = 0; i < rangesCount; ++i)
+            {
+                ranges.add((Range) AbstractBounds.serializer().deserialize(dis));
+            }
+            // the original callback stays on the owner; here we only need to report back to it
+            return new StreamingRepairTask(id, owner, src, dst, tableName, cfName, ranges, makeReplyingCallback(owner, id));
+        }
+    }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java Fri Sep 2
10:24:21 2011
@@ -20,7 +20,9 @@ package org.apache.cassandra.utils;
*
*/
-
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@@ -68,6 +70,19 @@ public class UUIDGen
return new UUID(raw.getLong(raw.position()),
raw.getLong(raw.position() + 8));
}
+ /** reads a uuid from an input stream. */
+ public static UUID read(DataInputStream dis) throws IOException
+ {
+ return new UUID(dis.readLong(), dis.readLong());
+ }
+
+ /** writes a uuid to an output stream. */
+ public static void write(UUID uuid, DataOutputStream dos) throws
IOException
+ {
+ dos.writeLong(uuid.getMostSignificantBits());
+ dos.writeLong(uuid.getLeastSignificantBits());
+ }
+
/** decomposes a uuid into raw bytes. */
public static byte[] decompose(UUID uuid)
{
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
Fri Sep 2 10:24:21 2011
@@ -73,6 +73,7 @@ public class CompactSerializerTest exten
expectedClassNames.add("InnerSerializer");
expectedClassNames.add("LeafSerializer");
expectedClassNames.add("MerkleTreeSerializer");
+ expectedClassNames.add("StreamingRepairTaskSerializer");
discoveredClassNames = new ArrayList<String>();
String cp = System.getProperty("java.class.path");
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1164463&r1=1164462&r2=1164463&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
Fri Sep 2 10:24:21 2011
@@ -103,8 +103,10 @@ public abstract class AntiEntropyService
local_range = StorageService.instance.getLocalPrimaryRange();
- // random session id for each test
- request = new TreeRequest(UUID.randomUUID().toString(), LOCAL,
local_range, new CFPair(tablename, cfname));
+ // (we use REMOTE instead of LOCAL so that the responses for the
validator.complete() get lost)
+ request = new TreeRequest(UUID.randomUUID().toString(), REMOTE,
local_range, new CFPair(tablename, cfname));
+ // Set a fake session corresponding to this fake request
+ AntiEntropyService.instance.submitArtificialRepairSession(request,
tablename, cfname);
}
@After
@@ -159,26 +161,6 @@ public abstract class AntiEntropyService
}
@Test
- public void testManualRepair() throws Throwable
- {
- RepairFuture sess =
AntiEntropyService.instance.submitRepairSession(local_range, tablename, cfname);
-
- // ensure that the session doesn't end without a response from REMOTE
- try
- {
- sess.get(500, TimeUnit.MILLISECONDS);
- fail("Repair session should not end without response from REMOTE");
- }
- catch (TimeoutException e) {}
-
- // deliver a fake response from REMOTE
- sess.session.completed(REMOTE, request.cf.right);
-
- // block until the repair has completed
- sess.get();
- }
-
- @Test
public void testGetNeighborsPlusOne() throws Throwable
{
// generate rf+1 nodes, and ensure that all nodes are returned
@@ -244,7 +226,10 @@ public abstract class AntiEntropyService
interesting.add(changed);
// difference the trees
- AntiEntropyService.RepairSession.Differencer diff = sess.session.new
Differencer(cfname, request.endpoint, ltree, rtree);
+ // note: we reuse the same endpoint which is bogus in theory but fine
here
+ AntiEntropyService.TreeResponse r1 = new
AntiEntropyService.TreeResponse(REMOTE, ltree);
+ AntiEntropyService.TreeResponse r2 = new
AntiEntropyService.TreeResponse(REMOTE, rtree);
+ AntiEntropyService.RepairSession.Differencer diff = sess.session.new
Differencer(cfname, r1, r2);
diff.run();
// ensure that the changed range was recorded