Author: jbellis
Date: Fri Jun 25 20:34:22 2010
New Revision: 958105
URL: http://svn.apache.org/viewvc?rev=958105&view=rev
Log:
allow multiple repair sessions per node. patch by Stu Hood; reviewed by
jbellis for CASSANDRA-1190
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=958105&r1=958104&r2=958105&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jun 25 20:34:22 2010
@@ -36,6 +36,7 @@ dev
* revise HH schema to be per-endpoint (CASSANDRA-1142)
* remove gossip message size limit (CASSANDRA-1138)
* add joining/leaving status to nodetool ring (CASSANDRA-1115)
+ * allow multiple repair sessions per node (CASSANDRA-1190)
0.6.3
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=958105&r1=958104&r2=958105&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Fri Jun 25 20:34:22 2010
@@ -25,6 +25,10 @@ import java.security.NoSuchAlgorithmExce
import java.util.*;
import java.util.concurrent.*;
+import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CompactionManager;
@@ -36,16 +40,15 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.AbstractCompactedRow;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamOutManager;
import org.apache.cassandra.utils.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* AntiEntropyService encapsulates "validating" (hashing) individual column
families,
* exchanging MerkleTrees with remote nodes via a TreeRequest/Response
conversation,
@@ -82,24 +85,22 @@ public class AntiEntropyService
{
private static final Logger logger =
LoggerFactory.getLogger(AntiEntropyService.class);
- // timeout for outstanding requests (48 hours)
- public final static long REQUEST_TIMEOUT = 48*60*60*1000;
-
// singleton enforcement
public static final AntiEntropyService instance = new AntiEntropyService();
+ // timeout for outstanding requests (48 hours)
+ public final static long REQUEST_TIMEOUT = 48*60*60*1000;
+
/**
- * Map of column families to remote endpoints that need to rendezvous. The
- * first endpoint to arrive at the rendezvous will store its tree in the
- * appropriate slot of the TreePair object, and the second to arrive will
- * remove the stored tree, and compare it.
+ * Map of outstanding sessions to requests. Once both trees reach the
rendezvous, the local node
+ * will queue a Differencer to compare them.
*
* This map is only accessed from AE_SERVICE_STAGE, so it is not
synchronized.
*/
- private final Map<CFPair, ExpiringMap<InetAddress, TreePair>> trees;
+ private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests;
/**
- * A map of repair request ids to a Queue of TreeRequests that have been
performed since the session was started.
+ * A map of repair session ids to a Queue of TreeRequests that have been
performed since the session was started.
*/
private final ConcurrentMap<String, BlockingQueue<TreeRequest>> sessions;
@@ -108,7 +109,7 @@ public class AntiEntropyService
*/
protected AntiEntropyService()
{
- trees = new HashMap<CFPair, ExpiringMap<InetAddress, TreePair>>();
+ requests = new ExpiringMap<String, Map<TreeRequest,
TreePair>>(REQUEST_TIMEOUT);
sessions = new ConcurrentHashMap<String, BlockingQueue<TreeRequest>>();
}
@@ -124,31 +125,27 @@ public class AntiEntropyService
/**
* Called by Differencer when a full repair round trip has been completed
between the given CF and endpoints.
*/
- void completedRequest(CFPair cf, InetAddress... endpoints)
+ void completedRequest(TreeRequest request)
{
- for (InetAddress endpoint : endpoints)
- {
- // indicate to each waiting session that this request completed
- TreeRequest completed = new TreeRequest(cf, endpoint);
- for (BlockingQueue<TreeRequest> session : sessions.values())
- session.offer(completed);
- }
+ // indicate to the waiting session that this request completed
+ BlockingQueue<TreeRequest> session = sessions.get(request.sessionid);
+ if (session == null)
+ // repair client disconnected: ignore
+ return;
+ session.offer(request);
}
/**
- * Returns the map of waiting rendezvous endpoints to trees for the given
cf.
+ * Returns the map of waiting rendezvous endpoints to trees for the given
session.
* Should only be called within AE_SERVICE_STAGE.
- *
- * @param cf Column family to fetch trees for.
- * @return The store of trees for the given cf.
*/
- private ExpiringMap<InetAddress, TreePair> rendezvousPairs(CFPair cf)
+ private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid)
{
- ExpiringMap<InetAddress, TreePair> ctrees = trees.get(cf);
+ Map<TreeRequest, TreePair> ctrees = requests.get(sessionid);
if (ctrees == null)
{
- ctrees = new ExpiringMap<InetAddress, TreePair>(REQUEST_TIMEOUT);
- trees.put(cf, ctrees);
+ ctrees = new HashMap<TreeRequest, TreePair>();
+ requests.put(sessionid, ctrees);
}
return ctrees;
}
@@ -171,55 +168,50 @@ public class AntiEntropyService
}
/**
- * Register a tree from the given endpoint to be compared to the
appropriate trees
- * in AE_SERVICE_STAGE when they become available.
- *
- * @param cf The column family of the tree.
- * @param endpoint The endpoint which owns the given tree.
- * @param tree The tree for the endpoint.
+ * Register a tree for the given request to be compared to the appropriate
trees in AE_SERVICE_STAGE when they become available.
*/
- private void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree)
+ private void rendezvous(TreeRequest request, MerkleTree tree)
{
InetAddress LOCAL = FBUtilities.getLocalAddress();
- // return the rendezvous pairs for this cf
- ExpiringMap<InetAddress, TreePair> ctrees = rendezvousPairs(cf);
+ // the rendezvous pairs for this session
+ Map<TreeRequest, TreePair> ctrees = rendezvousPairs(request.sessionid);
List<Differencer> differencers = new ArrayList<Differencer>();
- if (LOCAL.equals(endpoint))
+ if (LOCAL.equals(request.endpoint))
{
- // we're registering a local tree: rendezvous with all remote trees
- for (InetAddress neighbor : getNeighbors(cf.left))
+ // we're registering a local tree: rendezvous with remote requests
for the session
+ for (InetAddress neighbor : getNeighbors(request.cf.left))
{
- TreePair waiting = ctrees.remove(neighbor);
+ TreeRequest remotereq = new TreeRequest(request.sessionid,
neighbor, request.cf);
+ TreePair waiting = ctrees.remove(remotereq);
if (waiting != null && waiting.right != null)
{
// the neighbor beat us to the rendezvous: queue
differencing
- differencers.add(new Differencer(cf, LOCAL, neighbor,
- tree, waiting.right));
+ // FIXME: Differencer should take a TreeRequest
+ differencers.add(new Differencer(remotereq, tree,
waiting.right));
continue;
}
// else, the local tree is first to the rendezvous: store and
wait
- ctrees.put(neighbor, new TreePair(tree, null));
- logger.debug("Stored local tree for " + cf + " to wait for " +
neighbor);
+ ctrees.put(remotereq, new TreePair(tree, null));
+ logger.debug("Stored local tree for " + request + " to wait
for " + remotereq);
}
}
else
{
// we're registering a remote tree: rendezvous with the local tree
- TreePair waiting = ctrees.remove(endpoint);
+ TreePair waiting = ctrees.remove(request);
if (waiting != null && waiting.left != null)
{
// the local tree beat us to the rendezvous: queue differencing
- differencers.add(new Differencer(cf, LOCAL, endpoint,
- waiting.left, tree));
+ differencers.add(new Differencer(request, waiting.left, tree));
}
else
{
// else, the remote tree is first to the rendezvous: store and
wait
- ctrees.put(endpoint, new TreePair(null, tree));
- logger.debug("Stored remote tree for " + cf + " from " +
endpoint);
+ ctrees.put(request, new TreePair(null, tree));
+ logger.debug("Stored remote tree for " + request + " to wait
for local tree.");
}
}
@@ -231,41 +223,43 @@ public class AntiEntropyService
}
/**
- * Called by a Validator to send a valid tree to endpoints storing
- * replicas of local data.
- *
- * @param validator A locally generated validator.
- * @param local The local endpoint.
- * @param neighbors A list of neighbor endpoints to send the tree to.
+ * Requests a tree from the given node, and returns the request that was
sent.
+ */
+ TreeRequest request(String sessionid, InetAddress remote, String ksname,
String cfname)
+ {
+ TreeRequest request = new TreeRequest(sessionid, remote, new
CFPair(ksname, cfname));
+
MessagingService.instance.sendOneWay(TreeRequestVerbHandler.makeVerb(request),
remote);
+ return request;
+ }
+
+ /**
+ * Responds to the node that requested the given valid tree.
+ * @param validator A locally generated validator
+ * @param local localhost (parameterized for testing)
*/
- void notifyNeighbors(Validator validator, InetAddress local,
Collection<InetAddress> neighbors)
+ void respond(Validator validator, InetAddress local)
{
MessagingService ms = MessagingService.instance;
try
{
Message message = TreeResponseVerbHandler.makeVerb(local,
validator);
- logger.info("Sending AEService tree for " + validator.cf + " to: "
+ neighbors);
- for (InetAddress neighbor : neighbors)
- ms.sendOneWay(message, neighbor);
+ logger.info("Sending AEService tree for " + validator.request);
+ ms.sendOneWay(message, validator.request.endpoint);
}
catch (Exception e)
{
- logger.error("Could not send valid tree to endpoints: " +
neighbors, e);
+ logger.error("Could not send valid tree for request " +
validator.request, e);
}
}
/**
- * Should only be used in AE_SERVICE_STAGE or for testing.
- *
- * @param table Table containing cf.
- * @param cf The column family.
- * @param remote The remote endpoint for the rendezvous.
- * @return The tree pair for the given rendezvous if it exists, else null.
+ * @return The tree pair for the given request if it exists.
*/
- TreePair getRendezvousPair_TestsOnly(String table, String cf, InetAddress
remote)
+ TreePair getRendezvousPair_TestsOnly(TreeRequest request)
{
- return rendezvousPairs(new CFPair(table, cf)).get(remote);
+ System.out.println(request + "\tvs\t" +
rendezvousPairs(request.sessionid).keySet());
+ return rendezvousPairs(request.sessionid).get(request);
}
/**
@@ -278,7 +272,7 @@ public class AntiEntropyService
*/
public static class Validator implements Callable<Object>
{
- public final CFPair cf; // TODO keep a CFS reference as a field
instead of its string representation
+ public final TreeRequest request;
public final MerkleTree tree;
// the minimum token sorts first, but falls into the last range
@@ -291,18 +285,17 @@ public class AntiEntropyService
public final static MerkleTree.RowHash EMPTY_ROW = new
MerkleTree.RowHash(null, new byte[0]);
- Validator(CFPair cf)
+ Validator(TreeRequest request)
{
- this(cf,
+ this(request,
// TODO: memory usage (maxsize) should either be tunable per
// CF, globally, or as shared for all CFs in a cluster
new MerkleTree(DatabaseDescriptor.getPartitioner(),
MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
}
- Validator(CFPair cf, MerkleTree tree)
+ Validator(TreeRequest request, MerkleTree tree)
{
- assert cf != null && tree != null;
- this.cf = cf;
+ this.request = request;
this.tree = tree;
minrows = new ArrayList<MerkleTree.RowHash>();
mintoken = null;
@@ -334,7 +327,7 @@ public class AntiEntropyService
break;
}
}
- logger.debug("Prepared AEService tree of size " + tree.size() + "
for " + cf);
+ logger.debug("Prepared AEService tree of size " + tree.size() + "
for " + request);
mintoken = tree.partitioner().getMinimumToken();
ranges = tree.invalids(new Range(mintoken, mintoken));
}
@@ -425,25 +418,18 @@ public class AntiEntropyService
range.addHash(minrow);
StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(this);
- logger.debug("Validated " + validated + " rows into AEService tree
for " + cf);
+ logger.debug("Validated " + validated + " rows into AEService tree
for " + request);
}
/**
- * Called after the validation lifecycle to trigger additional action
- * with the now valid tree. Runs in AE_SERVICE_STAGE.
+ * Called after the validation lifecycle to respond with the now valid
tree. Runs in AE_SERVICE_STAGE.
*
* @return A meaningless object.
*/
public Object call() throws Exception
{
- AntiEntropyService aes = AntiEntropyService.instance;
- InetAddress local = FBUtilities.getLocalAddress();
-
- Collection<InetAddress> neighbors = getNeighbors(cf.left);
-
- // store the local tree and then broadcast it to our neighbors
- aes.rendezvous(cf, local, tree);
- aes.notifyNeighbors(this, local, neighbors);
+ // respond to the request that triggered this validation
+ AntiEntropyService.instance.respond(this,
FBUtilities.getLocalAddress());
// return any old object
return AntiEntropyService.class;
@@ -451,22 +437,18 @@ public class AntiEntropyService
}
/**
- * Compares two trees, and launches repairs for disagreeing ranges.
+ * Runs on the node that initiated a request to compares two trees, and
launch repairs for disagreeing ranges.
*/
public static class Differencer implements Runnable
{
- public final CFPair cf;
- public final InetAddress local;
- public final InetAddress remote;
+ public final TreeRequest request;
public final MerkleTree ltree;
public final MerkleTree rtree;
public final List<MerkleTree.TreeRange> differences;
- public Differencer(CFPair cf, InetAddress local, InetAddress remote,
MerkleTree ltree, MerkleTree rtree)
+ public Differencer(TreeRequest request, MerkleTree ltree, MerkleTree
rtree)
{
- this.cf = cf;
- this.local = local;
- this.remote = remote;
+ this.request = request;
this.ltree = ltree;
this.rtree = rtree;
differences = new ArrayList<MerkleTree.TreeRange>();
@@ -477,6 +459,7 @@ public class AntiEntropyService
*/
public void run()
{
+ InetAddress local = FBUtilities.getLocalAddress();
StorageService ss = StorageService.instance;
// restore partitioners (in case we were serialized)
@@ -486,8 +469,8 @@ public class AntiEntropyService
rtree.partitioner(StorageService.getPartitioner());
// determine the ranges where responsibility overlaps
- Set<Range> interesting = new
HashSet(ss.getRangesForEndpoint(cf.left, local));
- interesting.retainAll(ss.getRangesForEndpoint(cf.left, remote));
+ Set<Range> interesting = new
HashSet(ss.getRangesForEndpoint(request.cf.left, local));
+ interesting.retainAll(ss.getRangesForEndpoint(request.cf.left,
request.endpoint));
// compare trees, and filter out uninteresting differences
for (MerkleTree.TreeRange diff : MerkleTree.difference(ltree,
rtree))
@@ -506,7 +489,7 @@ public class AntiEntropyService
float difference = differenceFraction();
if (difference == 0.0)
{
- logger.info("Endpoints " + local + " and " + remote + " are
consistent for " + cf);
+ logger.info("Endpoints " + local + " and " + request.endpoint
+ " are consistent for " + request.cf);
}
else
{
@@ -521,7 +504,7 @@ public class AntiEntropyService
}
// repair was completed successfully: notify any waiting sessions
- AntiEntropyService.instance.completedRequest(cf, local, remote);
+ AntiEntropyService.instance.completedRequest(request);
}
/**
@@ -542,32 +525,39 @@ public class AntiEntropyService
*/
void performStreamingRepair() throws IOException
{
- logger.info("Performing streaming repair of " + differences.size()
+ " ranges to " + remote + " for " + cf);
- ColumnFamilyStore cfstore =
Table.open(cf.left).getColumnFamilyStore(cf.right);
+ logger.info("Performing streaming repair of " + differences.size()
+ " ranges for " + request);
+ ColumnFamilyStore cfstore =
Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
try
{
final List<Range> ranges = new ArrayList<Range>(differences);
final Collection<SSTableReader> sstables =
cfstore.getSSTables();
+ // send ranges to the remote node
Future f =
StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
- StreamOut.transferSSTables(remote, cf.left, sstables,
ranges);
- StreamOutManager.remove(remote);
+ StreamOut.transferSSTables(request.endpoint,
request.cf.left, sstables, ranges);
+ StreamOutManager.remove(request.endpoint);
}
});
+ // request ranges from the remote node
+ // FIXME: no way to block for the 'requestRanges' call to
complete, or to request a
+ // particular cf: see CASSANDRA-1189
+ StreamIn.requestRanges(request.endpoint, request.cf.left,
ranges);
+
+ // wait until streaming has completed
f.get();
}
catch(Exception e)
{
throw new IOException("Streaming repair failed.", e);
}
- logger.info("Finished streaming repair to " + remote + " for " +
cf);
+ logger.info("Finished streaming repair for " + request);
}
public String toString()
{
- return "#<Differencer " + cf + " local=" + local + " remote=" +
remote + ">";
+ return "#<Differencer " + request + ">";
}
}
@@ -575,17 +565,20 @@ public class AntiEntropyService
* Handler for requests from remote nodes to generate a valid tree.
* The payload is a CFPair representing the columnfamily to validate.
*/
- public static class TreeRequestVerbHandler implements IVerbHandler,
ICompactSerializer<CFPair>
+ public static class TreeRequestVerbHandler implements IVerbHandler,
ICompactSerializer<TreeRequest>
{
public static final TreeRequestVerbHandler SERIALIZER = new
TreeRequestVerbHandler();
- static Message makeVerb(String table, String cf)
+ static Message makeVerb(TreeRequest request)
{
try
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- SERIALIZER.serialize(new CFPair(table, cf), dos);
- return new Message(FBUtilities.getLocalAddress(),
StageManager.AE_SERVICE_STAGE, StorageService.Verb.TREE_REQUEST,
bos.toByteArray());
+ SERIALIZER.serialize(request, dos);
+ return new Message(FBUtilities.getLocalAddress(),
+ StageManager.AE_SERVICE_STAGE,
+ StorageService.Verb.TREE_REQUEST,
+ bos.toByteArray());
}
catch(IOException e)
{
@@ -593,15 +586,19 @@ public class AntiEntropyService
}
}
- public void serialize(CFPair treerequest, DataOutputStream dos) throws
IOException
+ public void serialize(TreeRequest request, DataOutputStream dos)
throws IOException
{
- dos.writeUTF(treerequest.left);
- dos.writeUTF(treerequest.right);
+ dos.writeUTF(request.sessionid);
+ CompactEndpointSerializationHelper.serialize(request.endpoint,
dos);
+ dos.writeUTF(request.cf.left);
+ dos.writeUTF(request.cf.right);
}
- public CFPair deserialize(DataInputStream dis) throws IOException
+ public TreeRequest deserialize(DataInputStream dis) throws IOException
{
- return new CFPair(dis.readUTF(), dis.readUTF());
+ return new TreeRequest(dis.readUTF(),
+
CompactEndpointSerializationHelper.deserialize(dis),
+ new CFPair(dis.readUTF(), dis.readUTF()));
}
/**
@@ -611,19 +608,16 @@ public class AntiEntropyService
{
byte[] bytes = message.getMessageBody();
- ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+ DataInputStream buffer = new DataInputStream(new
ByteArrayInputStream(bytes));
try
{
- CFPair cf = this.deserialize(new DataInputStream(buffer));
- TreeRequest request = new TreeRequest(cf, message.getFrom());
- // FIXME: 0.7 should send the actual RepairSession id across
with the request
- String sessionid = request.toString();
+ TreeRequest remotereq = this.deserialize(buffer);
+ TreeRequest request = new TreeRequest(remotereq.sessionid,
message.getFrom(), remotereq.cf);
// trigger readonly-compaction
- logger.debug("Queueing validation compaction for session " +
sessionid + " request " + request);
- ColumnFamilyStore store =
Table.open(cf.left).getColumnFamilyStore(cf.right);
- // FIXME: should take session id and request
- Validator validator = new Validator(request.left);
+ ColumnFamilyStore store =
Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
+ Validator validator = new Validator(request);
+ logger.debug("Queueing validation compaction for " + request);
CompactionManager.instance.submitValidation(store, validator);
}
catch (IOException e)
@@ -657,7 +651,7 @@ public class AntiEntropyService
public void serialize(Validator v, DataOutputStream dos) throws
IOException
{
- TreeRequestVerbHandler.SERIALIZER.serialize(v.cf, dos);
+ TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos);
ObjectOutputStream oos = new ObjectOutputStream(dos);
oos.writeObject(v.tree);
oos.flush();
@@ -665,11 +659,11 @@ public class AntiEntropyService
public Validator deserialize(DataInputStream dis) throws IOException
{
- final CFPair cf =
TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
+ final TreeRequest request =
TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
ObjectInputStream ois = new ObjectInputStream(dis);
try
{
- return new Validator(cf, (MerkleTree)ois.readObject());
+ return new Validator(request, (MerkleTree)ois.readObject());
}
catch(Exception e)
{
@@ -680,13 +674,14 @@ public class AntiEntropyService
public void doVerb(Message message)
{
byte[] bytes = message.getMessageBody();
- ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+ DataInputStream buffer = new DataInputStream(new
ByteArrayInputStream(bytes));
try
{
// deserialize the remote tree, and register it
- Validator rvalidator = this.deserialize(new
DataInputStream(buffer));
- AntiEntropyService.instance.rendezvous(rvalidator.cf,
message.getFrom(), rvalidator.tree);
+ Validator response = this.deserialize(buffer);
+ TreeRequest request = new
TreeRequest(response.request.sessionid, message.getFrom(), response.request.cf);
+ AntiEntropyService.instance.rendezvous(request, response.tree);
}
catch (IOException e)
{
@@ -708,27 +703,52 @@ public class AntiEntropyService
}
/**
- * A tuple of a local and remote tree. One of the trees should be null, but
- * not both.
+ * A tuple of a local and remote tree.
*/
static class TreePair extends Pair<MerkleTree,MerkleTree>
{
public TreePair(MerkleTree local, MerkleTree remote)
{
super(local, remote);
- assert local != null ^ remote != null;
}
}
/**
* A triple of table, cf and address that represents a location we have an
outstanding TreeRequest for.
*/
- static class TreeRequest extends Pair<CFPair,InetAddress>
+ public static class TreeRequest
{
- public TreeRequest(CFPair cf, InetAddress target)
+ public final String sessionid;
+ public final InetAddress endpoint;
+ public final CFPair cf;
+
+ public TreeRequest(String sessionid, InetAddress endpoint, CFPair cf)
+ {
+ this.sessionid = sessionid;
+ this.endpoint = endpoint;
+ this.cf = cf;
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ return Objects.hashCode(sessionid, endpoint, cf);
+ }
+
+ @Override
+ public final boolean equals(Object o)
+ {
+ if(!(o instanceof TreeRequest))
+ return false;
+ TreeRequest that = (TreeRequest)o;
+ // handles nulls properly
+ return Objects.equal(sessionid, that.sessionid) &&
Objects.equal(endpoint, that.endpoint) && Objects.equal(cf, that.cf);
+ }
+
+ @Override
+ public String toString()
{
- super(cf, target);
- assert cf != null && target != null;
+ return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + cf
+ ">";
}
}
@@ -767,15 +787,13 @@ public class AntiEntropyService
// request that all relevant endpoints generate trees
Set<TreeRequest> requests = new HashSet<TreeRequest>();
Set<InetAddress> endpoints =
AntiEntropyService.getNeighbors(tablename);
- endpoints.add(FBUtilities.getLocalAddress());
for (String cfname : cfnames)
{
- Message request =
TreeRequestVerbHandler.makeVerb(tablename, cfname);
+ // send requests to remote nodes and record them
for (InetAddress endpoint : endpoints)
- {
- requests.add(new TreeRequest(new CFPair(tablename,
cfname), endpoint));
- MessagingService.instance.sendOneWay(request,
endpoint);
- }
+
requests.add(AntiEntropyService.this.request(getName(), endpoint, tablename,
cfname));
+ // send but don't record an outstanding request to the
local node
+ AntiEntropyService.this.request(getName(),
FBUtilities.getLocalAddress(), tablename, cfname);
}
requestsMade.signalAll();
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=958105&r1=958104&r2=958105&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Fri Jun 25 20:34:22 2010
@@ -53,6 +53,7 @@ public class AntiEntropyServiceTest exte
public static String tablename;
public static String cfname;
+ public static TreeRequest request;
public static ColumnFamilyStore store;
public static InetAddress LOCAL, REMOTE;
@@ -77,6 +78,9 @@ public class AntiEntropyServiceTest exte
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), LOCAL);
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(),
REMOTE);
assert tmd.isMember(REMOTE);
+
+ // random session id for each test
+ request = new TreeRequest(UUID.randomUUID().toString(), LOCAL, new
CFPair(tablename, cfname));
}
@After
@@ -90,13 +94,6 @@ public class AntiEntropyServiceTest exte
}
@Test
- public void testInstance() throws Throwable
- {
- assert null != aes;
- assert aes == AntiEntropyService.instance;
- }
-
- @Test
public void testValidatorPrepare() throws Throwable
{
Validator validator;
@@ -110,7 +107,7 @@ public class AntiEntropyServiceTest exte
Util.writeColumnFamily(rms);
// sample
- validator = new Validator(new CFPair(tablename, cfname));
+ validator = new Validator(request);
validator.prepare(store);
// and confirm that the tree was split
@@ -120,7 +117,7 @@ public class AntiEntropyServiceTest exte
@Test
public void testValidatorComplete() throws Throwable
{
- Validator validator = new Validator(new CFPair(tablename, cfname));
+ Validator validator = new Validator(request);
validator.prepare(store);
validator.complete();
@@ -135,7 +132,7 @@ public class AntiEntropyServiceTest exte
@Test
public void testValidatorAdd() throws Throwable
{
- Validator validator = new Validator(new CFPair(tablename, cfname));
+ Validator validator = new Validator(request);
IPartitioner part = validator.tree.partitioner();
Token min = part.getMinimumToken();
Token mid = part.midpoint(min, min);
@@ -154,31 +151,6 @@ public class AntiEntropyServiceTest exte
assert null != validator.tree.hash(new Range(min, min));
}
- /**
- * Build a column family with 2 or more SSTables, and then force a major
compaction
- */
- @Test
- public void testTreeStore() throws Throwable
- {
- // populate column family
- List<RowMutation> rms = new LinkedList<RowMutation>();
- RowMutation rm = new RowMutation(tablename, "key".getBytes());
- rm.add(new QueryPath(cfname, null, "Column1".getBytes()),
"asdf".getBytes(), new TimestampClock(0));
- rms.add(rm);
- // with two SSTables
- Util.writeColumnFamily(rms);
- Util.writeColumnFamily(rms);
-
- TreePair old = aes.getRendezvousPair_TestsOnly(tablename, cfname,
REMOTE);
- // force a readonly compaction, and wait for it to finish
- Validator validator = new Validator(new CFPair(tablename, cfname));
- CompactionManager.instance.submitValidation(store,
validator).get(5000, TimeUnit.MILLISECONDS);
-
- // check that a tree was created and stored
- flushAES().get(5000, TimeUnit.MILLISECONDS);
- assert old != aes.getRendezvousPair_TestsOnly(tablename, cfname,
REMOTE);
- }
-
@Test
public void testManualRepair() throws Throwable
{
@@ -190,35 +162,14 @@ public class AntiEntropyServiceTest exte
sess.join(100);
assert sess.isAlive();
- // deliver fake responses from LOCAL and REMOTE
- AntiEntropyService.instance.completedRequest(new CFPair(tablename,
cfname), LOCAL);
- AntiEntropyService.instance.completedRequest(new CFPair(tablename,
cfname), REMOTE);
+ // deliver a fake response from REMOTE
+ AntiEntropyService.instance.completedRequest(new
TreeRequest(sess.getName(), REMOTE, request.cf));
// block until the repair has completed
sess.join();
}
@Test
- public void testNotifyNeighbors() throws Throwable
- {
- // generate empty tree
- Validator validator = new Validator(new CFPair(tablename, cfname));
- validator.prepare(store);
- validator.complete();
-
- // grab reference to the tree
- MerkleTree tree = validator.tree;
-
- // notify ourself (should immediately be delivered into AE_STAGE)
- aes.notifyNeighbors(validator, LOCAL, Arrays.asList(LOCAL));
- flushAES().get(5, TimeUnit.SECONDS);
-
- // confirm that our reference is not equal to the original due
- // to (de)serialization
- assert tree != aes.getRendezvousPair_TestsOnly(tablename, cfname,
REMOTE).left;
- }
-
- @Test
public void testGetNeighborsPlusOne() throws Throwable
{
// generate rf+1 nodes, and ensure that all nodes are returned
@@ -248,13 +199,13 @@ public class AntiEntropyServiceTest exte
public void testDifferencer() throws Throwable
{
// generate a tree
- Validator validator = new Validator(new CFPair(tablename, cfname));
+ Validator validator = new Validator(request);
validator.prepare(store);
validator.complete();
MerkleTree ltree = validator.tree;
// and a clone
- validator = new Validator(new CFPair(tablename, cfname));
+ validator = new Validator(request);
validator.prepare(store);
validator.complete();
MerkleTree rtree = validator.tree;
@@ -266,8 +217,7 @@ public class AntiEntropyServiceTest exte
changed.hash("non-empty hash!".getBytes());
// difference the trees
- Differencer diff = new Differencer(new CFPair(tablename, cfname),
- LOCAL, LOCAL, ltree, rtree);
+ Differencer diff = new Differencer(request, ltree, rtree);
diff.run();
// ensure that the changed range was recorded