Author: jbellis
Date: Sun Sep 26 21:53:50 2010
New Revision: 1001532
URL: http://svn.apache.org/viewvc?rev=1001532&view=rev
Log:
add repair callbacks to track session completion.
patch by Stu Hood; reviewed by jbellis for CASSANDRA-1511
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sun Sep 26 21:53:50 2010
@@ -94,6 +94,7 @@
saved property, instead of vice versa (CASSANDRA-1447)
* JMX MessagingService pending and completed counts (CASSANDRA-1533)
* fix race condition processing repair responses (CASSANDRA-1511)
+ * make repair blocking (CASSANDRA-1511)
0.7-beta1
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sun Sep 26 21:53:50 2010
@@ -24,6 +24,7 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Objects;
import org.slf4j.Logger;
@@ -103,7 +104,7 @@ public class AntiEntropyService
/**
* A map of repair session ids to a Queue of TreeRequests that have been
performed since the session was started.
*/
- private final ConcurrentMap<String, BlockingQueue<TreeRequest>> sessions;
+ private final ConcurrentMap<String, RepairSession.Callback> sessions;
/**
* Protected constructor. Use AntiEntropyService.instance.
@@ -111,7 +112,7 @@ public class AntiEntropyService
protected AntiEntropyService()
{
requests = new ExpiringMap<String, Map<TreeRequest,
TreePair>>(REQUEST_TIMEOUT);
- sessions = new ConcurrentHashMap<String, BlockingQueue<TreeRequest>>();
+ sessions = new ConcurrentHashMap<String, RepairSession.Callback>();
}
/**
@@ -129,11 +130,7 @@ public class AntiEntropyService
void completedRequest(TreeRequest request)
{
// indicate to the waiting session that this request completed
- BlockingQueue<TreeRequest> session = sessions.get(request.sessionid);
- if (session == null)
- // repair client disconnected: ignore
- return;
- session.offer(request);
+ sessions.get(request.sessionid).completed(request);
}
/**
@@ -429,7 +426,7 @@ public class AntiEntropyService
}
/**
- * Runs on the node that initiated a request to compares two trees, and
launch repairs for disagreeing ranges.
+ * Runs on the node that initiated a request to compare two trees, and
launch repairs for disagreeing ranges.
*/
public static class Differencer implements Runnable
{
@@ -479,24 +476,24 @@ public class AntiEntropyService
// choose a repair method based on the significance of the
difference
float difference = differenceFraction();
+ String format = "Endpoints " + local + " and " + request.endpoint
+ " are %s for " + request.cf;
if (difference == 0.0)
{
- logger.info("Endpoints " + local + " and " + request.endpoint
+ " are consistent for " + request.cf);
+ logger.info(String.format(format, "consistent"));
+ AntiEntropyService.instance.completedRequest(request);
+ return;
}
- else
+
+ // non-0 difference: perform streaming repair
+ logger.info(String.format(format, (difference * 100) + "% out of
sync"));
+ try
{
- try
- {
- performStreamingRepair();
- }
- catch(IOException e)
- {
- throw new RuntimeException(e);
- }
+ performStreamingRepair();
+ }
+ catch(IOException e)
+ {
+ throw new RuntimeException(e);
}
-
- // repair was completed successfully: notify any waiting sessions
- AntiEntropyService.instance.completedRequest(request);
}
/**
@@ -512,8 +509,8 @@ public class AntiEntropyService
}
/**
- * Sends our list of differences to the remote endpoint using the
- * Streaming API.
+ * Starts sending/receiving our list of differences to/from the remote
endpoint: creates a callback
+ * that will be called out of band once the streams complete.
*/
void performStreamingRepair() throws IOException
{
@@ -521,36 +518,46 @@ public class AntiEntropyService
ColumnFamilyStore cfstore =
Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
try
{
- final List<Range> ranges = new ArrayList<Range>(differences);
- final Collection<SSTableReader> sstables =
cfstore.getSSTables();
+ List<Range> ranges = new ArrayList<Range>(differences);
+ Collection<SSTableReader> sstables = cfstore.getSSTables();
+ Callback callback = new Callback();
// send ranges to the remote node
- Future f = StageManager.getStage(Stage.STREAM).submit(new
WrappedRunnable()
- {
- protected void runMayThrow() throws Exception
- {
- StreamOutSession session =
StreamOutSession.create(request.cf.left, request.endpoint, null);
- StreamOut.transferSSTables(session, sstables, ranges);
- }
- });
+ StreamOutSession outsession =
StreamOutSession.create(request.cf.left, request.endpoint, callback);
+ StreamOut.transferSSTables(outsession, sstables, ranges);
// request ranges from the remote node
- // FIXME: no way to block for the 'requestRanges' call to
complete, or to request a
- // particular cf: see CASSANDRA-1189
- StreamIn.requestRanges(request.endpoint, request.cf.left,
ranges);
-
- // wait until streaming has completed
- f.get();
+ StreamIn.requestRanges(request.endpoint, request.cf.left,
ranges, callback);
}
catch(Exception e)
{
throw new IOException("Streaming repair failed.", e);
}
- logger.info("Finished streaming repair for " + request);
}
public String toString()
{
return "#<Differencer " + request + ">";
}
+
+ /**
+ * When a repair is necessary, this callback is created to wait for
the inbound
+ * and outbound streams to complete.
+ */
+ class Callback extends WrappedRunnable
+ {
+ // we expect one callback for the receive, and one for the send
+ private final AtomicInteger outstanding = new AtomicInteger(2);
+
+ protected void runMayThrow() throws Exception
+ {
+ if (outstanding.decrementAndGet() > 0)
+ // waiting on more calls
+ return;
+
+ // all calls finished successfully
+ logger.info("Finished streaming repair for " + request);
+ AntiEntropyService.instance.completedRequest(request);
+ }
+ }
}
/**
@@ -743,18 +750,21 @@ public class AntiEntropyService
/**
* Triggers repairs with all neighbors for the given table and cfs.
Typical lifecycle is: start() then join().
+ * Executed in client threads.
*/
class RepairSession extends Thread
{
private final String tablename;
private final String[] cfnames;
private final SimpleCondition requestsMade;
+ private final ConcurrentHashMap<TreeRequest,Object> requests;
public RepairSession(String tablename, String... cfnames)
{
super("manual-repair-" + UUID.randomUUID());
this.tablename = tablename;
this.cfnames = cfnames;
this.requestsMade = new SimpleCondition();
+ this.requests = new ConcurrentHashMap<TreeRequest,Object>();
}
/**
@@ -769,39 +779,60 @@ public class AntiEntropyService
public void run()
{
// begin a repair session
- BlockingQueue<TreeRequest> completed = new
LinkedBlockingQueue<TreeRequest>();
- AntiEntropyService.this.sessions.put(getName(), completed);
+ Callback callback = new Callback();
+ AntiEntropyService.this.sessions.put(getName(), callback);
try
{
// request that all relevant endpoints generate trees
- Set<TreeRequest> requests = new HashSet<TreeRequest>();
Set<InetAddress> endpoints =
AntiEntropyService.getNeighbors(tablename);
for (String cfname : cfnames)
{
// send requests to remote nodes and record them
for (InetAddress endpoint : endpoints)
-
requests.add(AntiEntropyService.this.request(getName(), endpoint, tablename,
cfname));
+
requests.put(AntiEntropyService.this.request(getName(), endpoint, tablename,
cfname), this);
// send but don't record an outstanding request to the
local node
AntiEntropyService.this.request(getName(),
FBUtilities.getLocalAddress(), tablename, cfname);
}
+ logger.info("Waiting for repair requests: " +
requests.keySet());
requestsMade.signalAll();
- // block until all requests have been returned by
completedRequest calls
- logger.info("Waiting for repair requests to: " + requests);
- while (!requests.isEmpty())
- {
- TreeRequest request = completed.take();
- logger.info("Repair request to " + request + " completed
successfully.");
- requests.remove(request);
- }
+ // block whatever thread started this session until all
requests have been returned:
+ // if this thread dies, the session will still complete in the
background
+ callback.completed.await();
}
catch (InterruptedException e)
{
throw new RuntimeException("Interrupted while waiting for
repair: repair will continue in the background.");
}
- finally
+ }
+
+ /**
+ * Receives notifications of completed requests, and sets a condition
when all requests
+ * triggered by this session have completed.
+ */
+ class Callback
+ {
+ public final SimpleCondition completed = new SimpleCondition();
+ public void completed(TreeRequest request)
{
+ // don't mark any requests completed until all requests have
been made
+ try
+ {
+ blockUntilRunning();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ requests.remove(request);
+ logger.info("{} completed successfully: {} outstanding.",
request, requests.size());
+ if (!requests.isEmpty())
+ return;
+
+ // all requests completed
+ logger.info("Session " + getName() + " completed
successfully.");
AntiEntropyService.this.sessions.remove(getName());
+ completed.signalAll();
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Sun Sep 26 21:53:50 2010
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.utils.Pair;
@@ -80,7 +82,7 @@ public class PendingFile
public String toString()
{
- return getFilename() + "/" + sections;
+ return getFilename() + "/" + StringUtils.join(sections, ",");
}
public static class PendingFileSerializer implements
ICompactSerializer<PendingFile>
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Sep 26 21:53:50 2010
@@ -26,9 +26,9 @@ import java.io.IOError;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.db.Table;
@@ -72,7 +72,8 @@ public class StreamOut
// this is so that this target shows up as a destination while
anticompaction is happening.
StreamOutSession session = StreamOutSession.create(tableName, target,
callback);
- logger.info("Beginning transfer process to {} for ranges {}", target,
StringUtils.join(ranges, ", "));
+ logger.info("Beginning transfer to {}", target);
+ logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
try
{
@@ -120,7 +121,8 @@ public class StreamOut
{
assert ranges.size() > 0;
- logger.info("Beginning transfer process to {} for ranges {}",
session.getHost(), StringUtils.join(ranges, ", "));
+ logger.info("Beginning transfer to {}", session.getHost());
+ logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
try
{
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sun Sep 26 21:53:50 2010
@@ -125,9 +125,6 @@ public class AntiEntropyServiceTest exte
// confirm that the tree was validated
Token min = validator.tree.partitioner().getMinimumToken();
assert null != validator.tree.hash(new Range(min, min));
-
- // wait for queued operations to be flushed
- flushAES();
}
@Test
@@ -174,7 +171,7 @@ public class AntiEntropyServiceTest exte
public void testGetNeighborsPlusOne() throws Throwable
{
// generate rf+1 nodes, and ensure that all nodes are returned
- Set<InetAddress> expected = addTokens(1 + 1 +
DatabaseDescriptor.getReplicationFactor(tablename));
+ Set<InetAddress> expected = addTokens(1 +
DatabaseDescriptor.getReplicationFactor(tablename));
expected.remove(FBUtilities.getLocalAddress());
assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
}
@@ -185,7 +182,7 @@ public class AntiEntropyServiceTest exte
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
// generate rf*2 nodes, and ensure that only neighbors specified by
the ARS are returned
- addTokens(1 + (2 *
DatabaseDescriptor.getReplicationFactor(tablename)));
+ addTokens(2 * DatabaseDescriptor.getReplicationFactor(tablename));
AbstractReplicationStrategy ars =
StorageService.instance.getReplicationStrategy(tablename);
Set<InetAddress> expected = new HashSet<InetAddress>();
for (Range replicaRange :
ars.getAddressRanges().get(FBUtilities.getLocalAddress()))
@@ -230,7 +227,7 @@ public class AntiEntropyServiceTest exte
{
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
Set<InetAddress> endpoints = new HashSet<InetAddress>();
- for (int i = 1; i < max; i++)
+ for (int i = 1; i <= max; i++)
{
InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(),
endpoint);