Author: slebresne
Date: Wed Aug 31 16:24:13 2011
New Revision: 1163685
URL: http://svn.apache.org/viewvc?rev=1163685&view=rev
Log:
merge from 0.8
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7:1026516-1163266
/cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1163272,1163677
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Aug 31 16:24:13 2011
@@ -96,6 +96,8 @@
* fix ip address String representation in the ring cache (CASSANDRA-3044)
* fix ring cache compatibility when mixing pre-0.8.4 nodes with post-
in the same cluster (CASSANDRA-3023)
+ * make repair report failure when a node participating dies (instead of
+ hanging forever) (CASSANDRA-2433)
0.8.4
* include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
/cassandra/branches/cassandra-0.7/contrib:1026516-1163266
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1163272,1163677
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1163266
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1163272,1163677
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1163266
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1163272,1163677
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1163266
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1163272,1163677
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1163266
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1163272,1163677
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1163266
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1163272,1163677
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Wed
Aug 31 16:24:13 2011
@@ -180,7 +180,7 @@ public class FailureDetector implements
{
for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
{
- listener.convict(ep);
+ listener.convict(ep, phi);
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Aug 31
16:24:13 2011
@@ -262,7 +262,7 @@ public class Gossiper implements IFailur
*
* param @ endpoint end point that is convicted.
*/
- public void convict(InetAddress endpoint)
+ public void convict(InetAddress endpoint, double phi)
{
EndpointState epState = endpointStateMap.get(endpoint);
if (epState.isAlive())
@@ -735,12 +735,11 @@ public class Gossiper implements IFailur
if (logger.isTraceEnabled())
logger.trace("Adding endpoint state for " + ep);
endpointStateMap.put(ep, epState);
- if (epState.isAlive())
- {
- // the node restarted before we ever marked it down, so we'll
report it as dead briefly so maintenance like resetting the connection pool can
occur
- for (IEndpointStateChangeSubscriber subscriber : subscribers)
- subscriber.onDead(ep, epState);
- }
+
+ // the node restarted: it is up to the subscriber to take whatever
action is necessary
+ for (IEndpointStateChangeSubscriber subscriber : subscribers)
+ subscriber.onRestart(ep, epState);
+
if (epState.getApplicationState(ApplicationState.STATUS) != null &&
!isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
markAlive(ep, epState);
else
Modified:
cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
Wed Aug 31 16:24:13 2011
@@ -47,4 +47,12 @@ public interface IEndpointStateChangeSub
public void onDead(InetAddress endpoint, EndpointState state);
public void onRemove(InetAddress endpoint);
+
+ /**
+ * Called whenever a node is restarted.
+ * Note that when this happens there is no guarantee that the node was
+ * previously marked down. It was only if {@code state.isAlive() ==
false},
+ * as {@code state} is from before the restarted node is marked up.
+ */
+ public void onRestart(InetAddress endpoint, EndpointState state);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
Wed Aug 31 16:24:13 2011
@@ -31,7 +31,7 @@ public interface IFailureDetectionEventL
/**
* Convict the specified endpoint.
* @param ep endpoint to be convicted
+ * @param phi the value of phi with which ep was convicted
*/
- public void convict(InetAddress ep);
-
+ public void convict(InetAddress ep, double phi);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
Wed Aug 31 16:24:13 2011
@@ -73,6 +73,12 @@ public class Ec2MultiRegionSnitch extend
}
@Override
+ public void onRestart(InetAddress endpoint, EndpointState state)
+ {
+ // do nothing
+ }
+
+ @Override
public void onRemove(InetAddress endpoint)
{
// do nothing.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Wed Aug 31 16:24:13 2011
@@ -23,27 +23,26 @@ import java.net.InetAddress;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import com.google.common.base.Objects;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.gms.Gossiper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
@@ -97,19 +96,19 @@ public class AntiEntropyService
// singleton enforcement
public static final AntiEntropyService instance = new AntiEntropyService();
- // timeout for outstanding requests (48 hours)
- public final static long REQUEST_TIMEOUT = 48*60*60*1000;
-
- /**
- * Map of outstanding sessions to requests. Once both trees reach the
rendezvous, the local node
- * will queue a Differencer to compare them.
- *
- * This map is only accessed from Stage.ANTIENTROPY, so it is not
synchronized.
- */
- private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests;
+ private static final ThreadPoolExecutor executor;
+ static
+ {
+ executor = new JMXConfigurableThreadPoolExecutor(4,
+ 60,
+ TimeUnit.SECONDS,
+ new
LinkedBlockingQueue<Runnable>(),
+ new
NamedThreadFactory("AntiEntropySessions"),
+ "internal");
+ }
/**
- * A map of repair session ids to a Queue of TreeRequests that have been
performed since the session was started.
+ * A map of active sessions.
*/
private final ConcurrentMap<String, RepairSession> sessions;
@@ -118,22 +117,24 @@ public class AntiEntropyService
*/
protected AntiEntropyService()
{
- requests = new ExpiringMap<String, Map<TreeRequest,
TreePair>>(REQUEST_TIMEOUT);
sessions = new ConcurrentHashMap<String, RepairSession>();
}
/**
* Requests repairs for the given table and column families, and blocks
until all repairs have been completed.
- * TODO: Should add retries: if nodes go offline before they respond to
the requests, this could block forever.
*/
- public RepairSession getRepairSession(Range range, String tablename,
String... cfnames)
+ public RepairFuture submitRepairSession(Range range, String tablename,
String... cfnames)
{
- return new RepairSession(range, tablename, cfnames);
+ RepairFuture futureTask = new RepairSession(range, tablename,
cfnames).getFuture();
+ executor.execute(futureTask);
+ return futureTask;
}
- RepairSession getArtificialRepairSession(TreeRequest req, String
tablename, String... cfnames)
+ RepairFuture submitArtificialRepairSession(TreeRequest req, String
tablename, String... cfnames)
{
- return new RepairSession(req, tablename, cfnames);
+ RepairFuture futureTask = new RepairSession(req, tablename,
cfnames).getFuture();
+ executor.execute(futureTask);
+ return futureTask;
}
/**
@@ -588,24 +589,25 @@ public class AntiEntropyService
* Triggers repairs with all neighbors for the given table, cfs and range.
* Typical lifecycle is: start() then join(). Executed in client threads.
*/
- class RepairSession extends Thread
+ class RepairSession extends WrappedRunnable implements
IEndpointStateChangeSubscriber, IFailureDetectionEventListener
{
+ private final String sessionName;
private final String tablename;
private final String[] cfnames;
- private final ConcurrentHashMap<TreeRequest,Object> requests = new
ConcurrentHashMap<TreeRequest,Object>();
private final Range range;
- private final Set<InetAddress> endpoints;
+ private volatile Exception exception;
+ private final AtomicBoolean isFailed = new AtomicBoolean(false);
- private CountDownLatch completedLatch;
+ private final Set<InetAddress> endpoints;
final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
+ final Map<String, RepairJob> activeJobs = new
ConcurrentHashMap<String, RepairJob>();
+ private final SimpleCondition completed = new SimpleCondition();
public final Condition differencingDone = new SimpleCondition();
public RepairSession(TreeRequest req, String tablename, String...
cfnames)
{
this(req.sessionid, req.range, tablename, cfnames);
- requests.put(req, this);
- completedLatch = new CountDownLatch(cfnames.length);
AntiEntropyService.instance.sessions.put(getName(), this);
}
@@ -616,7 +618,7 @@ public class AntiEntropyService
private RepairSession(String id, Range range, String tablename,
String[] cfnames)
{
- super(id);
+ this.sessionName = id;
this.tablename = tablename;
this.cfnames = cfnames;
assert cfnames.length > 0 : "Repairing no column families seems
pointless, doesn't it";
@@ -624,8 +626,18 @@ public class AntiEntropyService
this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
}
- @Override
- public void run()
+ public String getName()
+ {
+ return sessionName;
+ }
+
+ RepairFuture getFuture()
+ {
+ return new RepairFuture(this);
+ }
+
+ // we don't care about the return value but care about it throwing
exception
+ public void runMayThrow() throws Exception
{
if (endpoints.isEmpty())
{
@@ -646,20 +658,25 @@ public class AntiEntropyService
}
AntiEntropyService.instance.sessions.put(getName(), this);
+ Gossiper.instance.register(this);
+
FailureDetector.instance.registerFailureDetectionEventListener(this);
try
{
// Create and queue a RepairJob for each column family
for (String cfname : cfnames)
- jobs.offer(new RepairJob(cfname));
-
- // We'll repair once by endpoints and column family
- completedLatch = new CountDownLatch(endpoints.size() *
cfnames.length);
+ {
+ RepairJob job = new RepairJob(cfname);
+ jobs.offer(job);
+ activeJobs.put(cfname, job);
+ }
jobs.peek().sendTreeRequests();
// block whatever thread started this session until all
requests have been returned:
// if this thread dies, the session will still complete in the
background
- completedLatch.await();
+ completed.await();
+ if (exception != null)
+ throw exception;
}
catch (InterruptedException e)
{
@@ -667,6 +684,8 @@ public class AntiEntropyService
}
finally
{
+
FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+ Gossiper.instance.unregister(this);
AntiEntropyService.instance.sessions.remove(getName());
}
}
@@ -674,20 +693,69 @@ public class AntiEntropyService
void completed(InetAddress remote, String cfname)
{
logger.debug("Repair completed for {} on {}", remote, cfname);
- completedLatch.countDown();
+ RepairJob job = activeJobs.get(cfname);
+ if (job.completedSynchronizationJob(remote))
+ {
+ activeJobs.remove(cfname);
+ if (activeJobs.isEmpty())
+ completed.signalAll();
+ }
+ }
+
+ void failedNode(InetAddress remote)
+ {
+ String errorMsg = String.format("Problem during repair session %s,
endpoint %s died", sessionName, remote);
+ logger.error(errorMsg);
+ exception = new IOException(errorMsg);
+ // If a node failed, we stop everything (though there could still
be some activity in the background)
+ jobs.clear();
+ activeJobs.clear();
+ differencingDone.signalAll();
+ completed.signalAll();
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void onChange(InetAddress endpoint, ApplicationState state,
VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!endpoints.contains(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual
because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ // Though unlikely, it is possible to arrive here multiple times
and we
+ // want to avoid printing an error message twice
+ if (!isFailed.compareAndSet(false, true))
+ return;
+
+ failedNode(endpoint);
}
class RepairJob
{
private final String cfname;
- private final AtomicInteger remaining;
- private final Map<InetAddress, MerkleTree> trees;
+ private final Set<InetAddress> requestedEndpoints = new
HashSet<InetAddress>();
+ private final Map<InetAddress, MerkleTree> trees = new
HashMap<InetAddress, MerkleTree>();
+ private final Set<InetAddress> syncJobs = new
HashSet<InetAddress>();
public RepairJob(String cfname)
{
this.cfname = cfname;
- this.remaining = new AtomicInteger(endpoints.size() + 1); //
all neighbor + local host
- this.trees = new ConcurrentHashMap<InetAddress, MerkleTree>();
}
/**
@@ -695,22 +763,24 @@ public class AntiEntropyService
*/
public void sendTreeRequests()
{
- // send requests to remote nodes and record them
- for (InetAddress endpoint : endpoints)
-
requests.put(AntiEntropyService.instance.request(getName(), endpoint, range,
tablename, cfname), RepairSession.this);
- // send but don't record an outstanding request to the local
node
- AntiEntropyService.instance.request(getName(),
FBUtilities.getBroadcastAddress(), range, tablename, cfname);
+ requestedEndpoints.addAll(endpoints);
+ requestedEndpoints.add(FBUtilities.getBroadcastAddress());
+
+ // send requests to all nodes
+ for (InetAddress endpoint : requestedEndpoints)
+ AntiEntropyService.instance.request(getName(), endpoint,
range, tablename, cfname);
}
/**
* Add a new received tree and return the number of remaining tree
to
* be received for the job to be complete.
*/
- public int addTree(TreeRequest request, MerkleTree tree)
+ public synchronized int addTree(TreeRequest request, MerkleTree
tree)
{
assert request.cf.right.equals(cfname);
trees.put(request.endpoint, tree);
- return remaining.decrementAndGet();
+ requestedEndpoints.remove(request.endpoint);
+ return requestedEndpoints.size();
}
/**
@@ -719,7 +789,7 @@ public class AntiEntropyService
*/
public void submitDifferencers()
{
- assert remaining.get() == 0;
+ assert requestedEndpoints.size() == 0;
// Right now, we only difference local host against each
other. CASSANDRA-2610 will fix that.
// In the meantime ugly special casing will work good enough.
@@ -732,9 +802,17 @@ public class AntiEntropyService
continue;
Differencer differencer = new Differencer(cfname,
entry.getKey(), entry.getValue(), localTree);
+ syncJobs.add(entry.getKey());
logger.debug("Queueing comparison " + differencer);
StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
}
+ trees.clear(); // allows gc to do its thing
+ }
+
+ synchronized boolean completedSynchronizationJob(InetAddress
remote)
+ {
+ syncJobs.remove(remote);
+ return syncJobs.isEmpty();
}
}
@@ -841,11 +919,21 @@ public class AntiEntropyService
return;
// all calls finished successfully
- //
completed(remote, cfname);
- logger.info(String.format("Finished streaming repair with
%s for %s: %d oustanding to complete session", remote, range,
completedLatch.getCount()));
+ logger.info(String.format("Finished streaming repair with
%s for %s", remote, range));
}
}
}
}
+
+ public static class RepairFuture extends FutureTask
+ {
+ public final RepairSession session;
+
+ RepairFuture(RepairSession session)
+ {
+ super(session, null);
+ this.session = session;
+ }
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
Wed Aug 31 16:24:13 2011
@@ -62,6 +62,8 @@ public class LoadBroadcaster implements
public void onDead(InetAddress endpoint, EndpointState state) {}
+ public void onRestart(InetAddress endpoint, EndpointState state) {}
+
public void onRemove(InetAddress endpoint) {}
public Map<InetAddress, Double> getLoadInfo()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
Wed Aug 31 16:24:13 2011
@@ -78,6 +78,8 @@ public class MigrationManager implements
public void onDead(InetAddress endpoint, EndpointState state) { }
+ public void onRestart(InetAddress endpoint, EndpointState state) { }
+
public void onRemove(InetAddress endpoint) { }
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Aug 31 16:24:13 2011
@@ -1268,6 +1268,13 @@ public class StorageService implements I
MessagingService.instance().convict(endpoint);
}
+ public void onRestart(InetAddress endpoint, EndpointState state)
+ {
+ // If we have restarted before the node was even marked down, we need
to reset the connection pool
+ if (state.isAlive())
+ onDead(endpoint, state);
+ }
+
/** raw load value */
public double getLoad()
{
@@ -1564,44 +1571,43 @@ public class StorageService implements I
{
return;
}
-
- List<AntiEntropyService.RepairSession> sessions = new
ArrayList<AntiEntropyService.RepairSession>();
+ List<AntiEntropyService.RepairFuture> futures = new
ArrayList<AntiEntropyService.RepairFuture>();
for (Range range : getLocalRanges(tableName))
{
- AntiEntropyService.RepairSession session = forceTableRepair(range,
tableName, columnFamilies);
- sessions.add(session);
+ AntiEntropyService.RepairFuture future = forceTableRepair(range,
tableName, columnFamilies);
+ futures.add(future);
// wait for a session to be done with its differencing before
starting the next one
try
{
- session.differencingDone.await();
+ future.session.differencingDone.await();
}
catch (InterruptedException e)
{
- logger_.error("Interrupted while waiting for the differencing
of repair session " + session + " to be done. Repair may be imprecise.", e);
+ logger_.error("Interrupted while waiting for the differencing
of repair session " + future.session + " to be done. Repair may be imprecise.",
e);
}
}
boolean failedSession = false;
// block until all repair sessions have completed
- for (AntiEntropyService.RepairSession sess : sessions)
+ for (AntiEntropyService.RepairFuture future : futures)
{
try
{
- sess.join();
+ future.get();
}
- catch (InterruptedException e)
+ catch (Exception e)
{
- logger_.error("Repair session " + sess + " failed.", e);
+ logger_.error("Repair session " + future.session + " failed.",
e);
failedSession = true;
}
}
if (failedSession)
- throw new IOException("Some Repair session(s) failed.");
+ throw new IOException("Some repair session(s) failed (see log for
details).");
}
- public AntiEntropyService.RepairSession forceTableRepair(final Range
range, final String tableName, final String... columnFamilies) throws
IOException
+ public AntiEntropyService.RepairFuture forceTableRepair(final Range range,
final String tableName, final String... columnFamilies) throws IOException
{
ArrayList<String> names = new ArrayList<String>();
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName,
columnFamilies))
@@ -1609,9 +1615,7 @@ public class StorageService implements I
names.add(cfStore.getColumnFamilyName());
}
- AntiEntropyService.RepairSession sess =
AntiEntropyService.instance.getRepairSession(range, tableName,
names.toArray(new String[names.size()]));
- sess.start();
- return sess;
+ return AntiEntropyService.instance.submitRepairSession(range,
tableName, names.toArray(new String[names.size()]));
}
/* End of MBean interface methods */
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
Wed Aug 31 16:24:13 2011
@@ -20,9 +20,7 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.*;
import org.apache.cassandra.config.Schema;
import org.junit.After;
@@ -47,6 +45,7 @@ import org.apache.cassandra.utils.Merkle
import static org.apache.cassandra.service.AntiEntropyService.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class AntiEntropyServiceTestAbstract extends CleanupHelper
@@ -162,18 +161,21 @@ public abstract class AntiEntropyService
@Test
public void testManualRepair() throws Throwable
{
- AntiEntropyService.RepairSession sess =
AntiEntropyService.instance.getRepairSession(local_range, tablename, cfname);
- sess.start();
+ RepairFuture sess =
AntiEntropyService.instance.submitRepairSession(local_range, tablename, cfname);
// ensure that the session doesn't end without a response from REMOTE
- sess.join(500);
- assert sess.isAlive();
+ try
+ {
+ sess.get(500, TimeUnit.MILLISECONDS);
+ fail("Repair session should not end without response from REMOTE");
+ }
+ catch (TimeoutException e) {}
// deliver a fake response from REMOTE
- sess.completed(REMOTE, request.cf.right);
+ sess.session.completed(REMOTE, request.cf.right);
// block until the repair has completed
- sess.join();
+ sess.get();
}
@Test
@@ -218,8 +220,8 @@ public abstract class AntiEntropyService
public void testDifferencer() throws Throwable
{
// this next part does some housekeeping so that cleanup in the
differencer doesn't error out.
- AntiEntropyService.RepairSession sess =
AntiEntropyService.instance.getArtificialRepairSession(request, tablename,
cfname);
-
+ AntiEntropyService.RepairFuture sess =
AntiEntropyService.instance.submitArtificialRepairSession(request, tablename,
cfname);
+
// generate a tree
Validator validator = new Validator(request);
validator.prepare(store);
@@ -242,7 +244,7 @@ public abstract class AntiEntropyService
interesting.add(changed);
// difference the trees
- AntiEntropyService.RepairSession.Differencer diff = sess.new
Differencer(cfname, request.endpoint, ltree, rtree);
+ AntiEntropyService.RepairSession.Differencer diff = sess.session.new
Differencer(cfname, request.endpoint, ltree, rtree);
diff.run();
// ensure that the changed range was recorded