Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/service/ActiveRepairService.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0cfeab60
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0cfeab60
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0cfeab60
Branch: refs/heads/trunk
Commit: 0cfeab60a44bf80cdd60a7887012f33db3fc57ab
Parents: 6e9aec3 8c003a2
Author: Yuki Morishita <[email protected]>
Authored: Mon Feb 2 16:56:46 2015 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Mon Feb 2 16:56:46 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/service/ActiveRepairService.java | 3 +++
2 files changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cfeab60/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cfeab60/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index fa9be8a,15e7641..1882a7b
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -109,40 -120,29 +109,43 @@@ public class ActiveRepairServic
*
* @return Future for asynchronous call or null if there is no need to repair
*/
- public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String... cfnames)
+ public RepairSession submitRepairSession(UUID parentRepairSession,
+ Range<Token> range,
+ String keyspace,
+ RepairParallelism parallelismDegree,
+ Set<InetAddress> endpoints,
+ long repairedAt,
+ ListeningExecutorService executor,
+ String... cfnames)
{
- if (cfnames.length == 0)
+ if (endpoints.isEmpty())
return null;
- RepairSession session = new RepairSession(parentRepairSession, range, keyspace, parallelismDegree, endpoints, cfnames);
- if (session.endpoints.isEmpty())
+
++ if (cfnames.length == 0)
+ return null;
- RepairFuture futureTask = new RepairFuture(session);
- executor.execute(futureTask);
- return futureTask;
- }
+
- public void addToActiveSessions(RepairSession session)
- {
+ final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, cfnames);
+
sessions.put(session.getId(), session);
- Gossiper.instance.register(session);
- FailureDetector.instance.registerFailureDetectionEventListener(session);
- }
+ // register listeners
+ gossiper.register(session);
+ failureDetector.registerFailureDetectionEventListener(session);
- public void removeFromActiveSessions(RepairSession session)
- {
- Gossiper.instance.unregister(session);
- sessions.remove(session.getId());
+ // unregister listeners at completion
+ session.addListener(new Runnable()
+ {
+ /**
+ * When repair finished, do clean up
+ */
+ public void run()
+ {
+ failureDetector.unregisterFailureDetectionEventListener(session);
+ gossiper.unregister(session);
+ sessions.remove(session.getId());
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ session.start(executor);
+ return session;
}
public synchronized void terminateSessions()