Author: slebresne Date: Thu Nov 3 14:03:01 2011 New Revision: 1197123 URL: http://svn.apache.org/viewvc?rev=1197123&view=rev Log: Add JMX call to remove failed repair sessions patch by yukim; reviewed by slebresne for CASSANDRA-3316
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1197123&r1=1197122&r2=1197123&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Nov 3 14:03:01 2011 @@ -8,6 +8,7 @@ * fix DecimalType bytebuffer marshalling (CASSANDRA-3421) * fix bug that caused first column in per row indexes to be ignored (CASSANDRA-3441) + * add JMX call to clean (failed) repair sessions (CASSANDRA-3316) Merged from 0.8: * acquire compactionlock during truncate (CASSANDRA-3399) * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415) Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1197123&r1=1197122&r2=1197123&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Nov 3 14:03:01 2011 @@ -125,6 +125,14 @@ public class AntiEntropyService return futureTask; } + public void terminateSessions() + { + for (RepairSession session : sessions.values()) + { + session.forceShutdown(); + } + } + // for testing only. 
Create a session corresponding to a fake request and // add it to the sessions (avoid NPE in tests) RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String... cfnames) @@ -172,6 +180,9 @@ public class AntiEntropyService return; } + if (session.terminated()) + return; + logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", session.getName(), request.cf.right, request.endpoint)); RepairSession.RepairJob job = session.jobs.peek(); @@ -598,6 +609,8 @@ public class AntiEntropyService private final SimpleCondition completed = new SimpleCondition(); public final Condition differencingDone = new SimpleCondition(); + private volatile boolean terminated = false; + public RepairSession(TreeRequest req, String tablename, String... cfnames) { this(req.sessionid, req.range, tablename, cfnames); @@ -697,12 +710,36 @@ public class AntiEntropyService { FailureDetector.instance.unregisterFailureDetectionEventListener(this); Gossiper.instance.unregister(this); + // mark this session as terminated + terminated = true; AntiEntropyService.instance.sessions.remove(getName()); } } + /** + * @return whether this session is terminated + */ + public boolean terminated() + { + return terminated; + } + + /** + * clear all RepairJobs and terminate this session.
+ */ + public void forceShutdown() + { + jobs.clear(); + activeJobs.clear(); + differencingDone.signalAll(); + completed.signalAll(); + } + void completed(Differencer differencer) { + if (terminated) + return; + logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s", getName(), differencer.r1.endpoint, @@ -724,10 +761,7 @@ public class AntiEntropyService String errorMsg = String.format("Endpoint %s died", remote); exception = new IOException(errorMsg); // If a node failed, we stop everything (though there could still be some activity in the background) - jobs.clear(); - activeJobs.clear(); - differencingDone.signalAll(); - completed.signalAll(); + forceShutdown(); } public void onJoin(InetAddress endpoint, EndpointState epState) {} Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java?rev=1197123&r1=1197122&r2=1197123&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java Thu Nov 3 14:03:01 2011 @@ -1757,6 +1757,10 @@ public class StorageService implements I return AntiEntropyService.instance.submitRepairSession(range, tableName, names.toArray(new String[names.size()])); } + public void forceTerminateAllRepairSessions() { + AntiEntropyService.instance.terminateSessions(); + } + /* End of MBean interface methods */ /** Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1197123&r1=1197122&r2=1197123&view=diff 
============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Nov 3 14:03:01 2011 @@ -226,6 +226,8 @@ public interface StorageServiceMBean */ public void forceTableRepairPrimaryRange(String tableName, String... columnFamilies) throws IOException; + public void forceTerminateAllRepairSessions(); + /** * transfer this node's data to other machines and remove it from service. */