improve concurrency of repair process

patch by yukim; reviewed by krummas for CASSANDRA-6455


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/810c2d5f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/810c2d5f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/810c2d5f

Branch: refs/heads/trunk
Commit: 810c2d5fe64333c0bcfe0b2ed3ea2c8f6aaf89b7
Parents: 5c35f92
Author: Yuki Morishita <[email protected]>
Authored: Thu Oct 9 18:12:36 2014 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Thu Oct 9 18:12:36 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/repair/Differencer.java    | 136 --------
 .../repair/IRepairJobEventListener.java         |  31 --
 .../apache/cassandra/repair/LocalSyncTask.java  |  81 +++++
 .../apache/cassandra/repair/RemoteSyncTask.java |  67 ++++
 .../apache/cassandra/repair/RepairFuture.java   |  31 --
 .../org/apache/cassandra/repair/RepairJob.java  | 235 +++++++-------
 .../repair/RepairMessageVerbHandler.java        |   7 +-
 .../apache/cassandra/repair/RepairResult.java   |  32 ++
 .../apache/cassandra/repair/RepairSession.java  | 263 ++++++---------
 .../cassandra/repair/RequestCoordinator.java    | 128 --------
 .../apache/cassandra/repair/SnapshotTask.java   |   1 +
 .../cassandra/repair/StreamingRepairTask.java   |  59 ++--
 .../org/apache/cassandra/repair/SyncStat.java   |  33 ++
 .../org/apache/cassandra/repair/SyncTask.java   |  83 +++++
 .../apache/cassandra/repair/ValidationTask.java |  71 ++++
 .../cassandra/repair/messages/RepairOption.java | 290 +++++++++++++++++
 .../cassandra/service/ActiveRepairService.java  | 129 ++++----
 .../cassandra/service/StorageService.java       | 320 +++++++++++--------
 .../cassandra/service/StorageServiceMBean.java  |  23 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  11 +-
 .../org/apache/cassandra/tools/NodeProbe.java   | 122 +------
 .../org/apache/cassandra/tools/NodeTool.java    |  39 ++-
 .../apache/cassandra/tools/RepairRunner.java    | 101 ++++++
 .../cassandra/repair/DifferencerTest.java       | 164 ----------
 .../cassandra/repair/LocalSyncTaskTest.java     | 129 ++++++++
 .../cassandra/repair/RepairSessionTest.java     |  71 ++++
 .../repair/messages/RepairOptionTest.java       |  88 +++++
 .../service/ActiveRepairServiceTest.java        | 218 +++++++++++++
 .../service/AntiEntropyServiceCounterTest.java  |  47 ---
 .../service/AntiEntropyServiceStandardTest.java |  47 ---
 .../service/AntiEntropyServiceTestAbstract.java | 282 ----------------
 32 files changed, 1793 insertions(+), 1547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 432299a..6d67e2e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,7 @@
  * Use unsafe mutations for most unit tests (CASSANDRA-6969)
  * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
  * Fail on very large batch sizes (CASSANDRA-8011)
+ * improve concurrency of repair (CASSANDRA-6455)
 
 
 2.1.1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/Differencer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Differencer.java 
b/src/java/org/apache/cassandra/repair/Differencer.java
deleted file mode 100644
index 214d2c9..0000000
--- a/src/java/org/apache/cassandra/repair/Differencer.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Objects;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.SyncComplete;
-import org.apache.cassandra.repair.messages.SyncRequest;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
-
-/**
- * Runs on the node that initiated a request to compare two trees, and launch 
repairs for disagreeing ranges.
- */
-public class Differencer implements Runnable
-{
-    private static Logger logger = LoggerFactory.getLogger(Differencer.class);
-
-    private final RepairJobDesc desc;
-    public final TreeResponse r1;
-    public final TreeResponse r2;
-    public final List<Range<Token>> differences = new ArrayList<>();
-
-    public Differencer(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
-    {
-        this.desc = desc;
-        this.r1 = r1;
-        this.r2 = r2;
-    }
-
-    /**
-     * Compares our trees, and triggers repairs for any ranges that mismatch.
-     */
-    public void run()
-    {
-        // compare trees, and collect differences
-        differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
-
-        // choose a repair method based on the significance of the difference
-        String format = String.format("[repair #%s] Endpoints %s and %s %%s 
for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
-        if (differences.isEmpty())
-        {
-            logger.info(String.format(format, "are consistent"));
-            // send back sync complete message
-            MessagingService.instance().sendOneWay(new SyncComplete(desc, 
r1.endpoint, r2.endpoint, true).createMessage(), FBUtilities.getLocalAddress());
-            return;
-        }
-
-        // non-0 difference: perform streaming repair
-        logger.info(String.format(format, "have " + differences.size() + " 
range(s) out of sync"));
-        performStreamingRepair();
-    }
-
-    /**
-     * Starts sending/receiving our list of differences to/from the remote 
endpoint: creates a callback
-     * that will be called out of band once the streams complete.
-     */
-    void performStreamingRepair()
-    {
-        InetAddress local = FBUtilities.getBroadcastAddress();
-        // We can take anyone of the node as source or destination, however if 
one is localhost, we put at source to avoid a forwarding
-        InetAddress src = r2.endpoint.equals(local) ? r2.endpoint : 
r1.endpoint;
-        InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : 
r2.endpoint;
-
-        SyncRequest request = new SyncRequest(desc, local, src, dst, 
differences);
-        StreamingRepairTask task = new StreamingRepairTask(desc, request);
-        task.run();
-    }
-
-
-    /**
-     * In order to remove completed Differencer, equality is computed only 
from {@code desc} and
-     * endpoint part of two TreeResponses.
-     */
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        Differencer that = (Differencer) o;
-        if (!desc.equals(that.desc)) return false;
-        return minEndpoint().equals(that.minEndpoint()) && 
maxEndpoint().equals(that.maxEndpoint());
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hashCode(desc, minEndpoint(), maxEndpoint());
-    }
-
-    // For equals and hashcode, we don't want to take the endpoint order into 
account.
-    // So we just order endpoint deterministically to simplify this
-    private InetAddress minEndpoint()
-    {
-        return FBUtilities.compareUnsigned(r1.endpoint.getAddress(), 
r2.endpoint.getAddress()) < 0
-             ? r1.endpoint
-             : r2.endpoint;
-    }
-
-    private InetAddress maxEndpoint()
-    {
-        return FBUtilities.compareUnsigned(r1.endpoint.getAddress(), 
r2.endpoint.getAddress()) < 0
-             ? r2.endpoint
-             : r1.endpoint;
-    }
-
-    public String toString()
-    {
-        return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint + ":" + 
desc.columnFamily + "@" + desc.range + ">";
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java 
b/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
deleted file mode 100644
index 778c09d..0000000
--- a/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-/**
- * Implemented by the RepairSession to accept callbacks from sequential 
snapshot creation failure.
- */
-
-public interface IRepairJobEventListener
-{
-    /**
-     * Signal that there was a failure during the snapshot creation process.
-     *
-     */
-    public void failedSnapshot();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
new file mode 100644
index 0000000..38f63ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * LocalSyncTask performs streaming between the local (coordinator) node and a remote 
replica.
+ */
+public class LocalSyncTask extends SyncTask implements StreamEventHandler
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(LocalSyncTask.class);
+
+    private final long repairedAt;
+
+    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, 
long repairedAt)
+    {
+        super(desc, r1, r2);
+        this.repairedAt = repairedAt;
+    }
+
+    /**
+     * Starts sending/receiving our list of differences to/from the remote 
endpoint: creates a callback
+     * that will be called out of band once the streams complete.
+     */
+    protected void startSync(List<Range<Token>> differences)
+    {
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        // We can take any one of the nodes as source or destination; however, if 
one is localhost, we put it at the source to avoid forwarding
+        InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : 
r2.endpoint;
+
+        logger.info(String.format("[repair #%s] Performing streaming repair of 
%d ranges with %s", desc.sessionId, differences.size(), dst));
+        new StreamPlan("Repair", repairedAt, 1).listeners(this)
+                                            .flushBeforeTransfer(true)
+                                            // request ranges from the remote 
node
+                                            .requestRanges(dst, desc.keyspace, 
differences, desc.columnFamily)
+                                            // send ranges to the remote node
+                                            .transferRanges(dst, 
desc.keyspace, differences, desc.columnFamily)
+                                            .execute();
+    }
+
+    public void handleStreamEvent(StreamEvent event) { /* noop */ }
+
+    public void onSuccess(StreamState result)
+    {
+        logger.info(String.format("[repair #%s] Sync complete between %s and 
%s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily));
+        set(stat);
+    }
+
+    public void onFailure(Throwable t)
+    {
+        setException(t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java 
b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
new file mode 100644
index 0000000..ca5c998
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * RemoteSyncTask sends a {@link SyncRequest} to a remote (non-coordinator) node
+ * to repair (stream) data with another replica.
+ *
+ * The task completes when RemoteSyncTask receives SyncComplete from the remote node.
+ */
+public class RemoteSyncTask extends SyncTask
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(RemoteSyncTask.class);
+
+    public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
+    {
+        super(desc, r1, r2);
+    }
+
+    protected void startSync(List<Range<Token>> differences)
+    {
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        SyncRequest request = new SyncRequest(desc, local, r1.endpoint, 
r2.endpoint, differences);
+        logger.info(String.format("[repair #%s] Forwarding streaming repair of 
%d ranges to %s (to be streamed with %s)", desc.sessionId, 
request.ranges.size(), request.src, request.dst));
+        MessagingService.instance().sendOneWay(request.createMessage(), 
request.src);
+    }
+
+    public void syncComplete(boolean success)
+    {
+        if (success)
+        {
+            set(stat);
+        }
+        else
+        {
+            setException(new RepairException(desc, String.format("Sync failed 
between %s and %s", r1.endpoint, r2.endpoint)));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairFuture.java 
b/src/java/org/apache/cassandra/repair/RepairFuture.java
deleted file mode 100644
index 127d873..0000000
--- a/src/java/org/apache/cassandra/repair/RepairFuture.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.concurrent.FutureTask;
-
-public class RepairFuture extends FutureTask<Void>
-{
-    public final RepairSession session;
-
-    public RepairFuture(RepairSession session)
-    {
-        super(session, null);
-        this.session = session;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 8057ed5..b0d17ab 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -18,93 +18,68 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * RepairJob runs repair on given ColumnFamily.
  */
-public class RepairJob
+public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
 {
     private static Logger logger = LoggerFactory.getLogger(RepairJob.class);
 
-    public final RepairJobDesc desc;
+    private final RepairSession session;
+    private final RepairJobDesc desc;
     private final boolean isSequential;
-    // first we send tree requests. this tracks the endpoints remaining to 
hear from
-    private final RequestCoordinator<InetAddress> treeRequests;
-    // tree responses are then tracked here
-    private final List<TreeResponse> trees = new ArrayList<>();
-    // once all responses are received, each tree is compared with each other, 
and differencer tasks
-    // are submitted. the job is done when all differencers are complete.
+    private final long repairedAt;
     private final ListeningExecutorService taskExecutor;
-    private final Condition requestsSent = new SimpleCondition();
-    private int gcBefore = -1;
-
-    private volatile boolean failed = false;
-    /* Count down as sync completes */
-    private AtomicInteger waitForSync;
-
-    private final IRepairJobEventListener listener;
 
     /**
      * Create repair job to run on specific columnfamily
+     *
+     * @param session RepairSession that this RepairJob belongs to
+     * @param columnFamily name of the ColumnFamily to repair
+     * @param isSequential when true, validation runs sequentially among the 
replicas
+     * @param taskExecutor Executor to run various repair tasks
      */
-    public RepairJob(IRepairJobEventListener listener,
-                     UUID parentSessionId,
-                     UUID sessionId,
-                     String keyspace,
+    public RepairJob(RepairSession session,
                      String columnFamily,
-                     Range<Token> range,
                      boolean isSequential,
+                     long repairedAt,
                      ListeningExecutorService taskExecutor)
     {
-        this.listener = listener;
-        this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, 
columnFamily, range);
+        this.session = session;
+        this.desc = new RepairJobDesc(session.parentRepairSession, 
session.getId(), session.keyspace, columnFamily, session.getRange());
         this.isSequential = isSequential;
+        this.repairedAt = repairedAt;
         this.taskExecutor = taskExecutor;
-        this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
-        {
-            public void send(InetAddress endpoint)
-            {
-                ValidationRequest request = new ValidationRequest(desc, 
gcBefore);
-                
MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
-            }
-        };
     }
 
     /**
-     * @return true if this job failed
-     */
-    public boolean isFailed()
-    {
-        return failed;
-    }
-
-    /**
-     * Send merkle tree request to every involved neighbor.
+     * Runs repair job.
+     *
+     * This sets up the necessary tasks and runs them on the given {@code taskExecutor}.
+     * After submitting all tasks, waits until validation with the replicas 
completes.
      */
-    public void sendTreeRequests(Collection<InetAddress> endpoints)
+    public void run()
     {
-        // send requests to all nodes
-        List<InetAddress> allEndpoints = new ArrayList<>(endpoints);
+        List<InetAddress> allEndpoints = new ArrayList<>(session.endpoints);
         allEndpoints.add(FBUtilities.getBroadcastAddress());
 
+        ListenableFuture<List<TreeResponse>> validations;
         if (isSequential)
         {
+            // Request snapshot to all replica
             List<ListenableFuture<InetAddress>> snapshotTasks = new 
ArrayList<>(allEndpoints.size());
             for (InetAddress endpoint : allEndpoints)
             {
@@ -112,102 +87,110 @@ public class RepairJob
                 snapshotTasks.add(snapshotTask);
                 taskExecutor.execute(snapshotTask);
             }
+            // When all snapshot complete, send validation requests
             ListenableFuture<List<InetAddress>> allSnapshotTasks = 
Futures.allAsList(snapshotTasks);
-            // Execute send tree request after all snapshot complete
-            Futures.addCallback(allSnapshotTasks, new 
FutureCallback<List<InetAddress>>()
+            validations = Futures.transform(allSnapshotTasks, new 
AsyncFunction<List<InetAddress>, List<TreeResponse>>()
             {
-                public void onSuccess(List<InetAddress> endpoints)
+                public ListenableFuture<List<TreeResponse>> 
apply(List<InetAddress> endpoints) throws Exception
                 {
-                    sendTreeRequestsInternal(endpoints);
-                }
-
-                public void onFailure(Throwable throwable)
-                {
-                    // TODO need to propagate error to RepairSession
-                    logger.error("Error occurred during snapshot phase", 
throwable);
-                    listener.failedSnapshot();
-                    failed = true;
+                    return sendValidationRequest(endpoints);
                 }
             }, taskExecutor);
         }
         else
         {
-            sendTreeRequestsInternal(allEndpoints);
+            // If not sequential, just send validation request to all replica
+            validations = sendValidationRequest(allEndpoints);
         }
-    }
 
-    private void sendTreeRequestsInternal(Collection<InetAddress> endpoints)
-    {
-        this.gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
-        for (InetAddress endpoint : endpoints)
-            treeRequests.add(endpoint);
+        // When all validations complete, submit sync tasks
+        ListenableFuture<List<SyncStat>> syncResults = 
Futures.transform(validations, new AsyncFunction<List<TreeResponse>, 
List<SyncStat>>()
+        {
+            public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> 
trees) throws Exception
+            {
+                // Unregister from FailureDetector once we've completed 
synchronizing Merkle trees.
+                // After this point, we rely on tcp_keepalive for individual 
sockets to notify us when a connection is down.
+                // See CASSANDRA-3569
+                
FailureDetector.instance.unregisterFailureDetectionEventListener(session);
 
-        logger.info(String.format("[repair #%s] requesting merkle trees for %s 
(to %s)", desc.sessionId, desc.columnFamily, endpoints));
-        treeRequests.start();
-        requestsSent.signalAll();
-    }
+                InetAddress local = FBUtilities.getLocalAddress();
 
-    /**
-     * Add a new received tree and return the number of remaining tree to
-     * be received for the job to be complete.
-     *
-     * Callers may assume exactly one addTree call will result in zero 
remaining endpoints.
-     *
-     * @param endpoint address of the endpoint that sent response
-     * @param tree sent Merkle tree or null if validation failed on endpoint
-     * @return the number of responses waiting to receive
-     */
-    public synchronized int addTree(InetAddress endpoint, MerkleTree tree)
-    {
-        // Wait for all request to have been performed (see #3400)
-        try
-        {
-            requestsSent.await();
-        }
-        catch (InterruptedException e)
+                List<SyncTask> syncTasks = new ArrayList<>();
+                // We need to difference all trees one against another
+                for (int i = 0; i < trees.size() - 1; ++i)
+                {
+                    TreeResponse r1 = trees.get(i);
+                    for (int j = i + 1; j < trees.size(); ++j)
+                    {
+                        TreeResponse r2 = trees.get(j);
+                        SyncTask task;
+                        if (r1.endpoint.equals(local) || 
r2.endpoint.equals(local))
+                        {
+                            task = new LocalSyncTask(desc, r1, r2, repairedAt);
+                        }
+                        else
+                        {
+                            task = new RemoteSyncTask(desc, r1, r2);
+                            // RemoteSyncTask expects SyncComplete message 
sent back.
+                            // Register task to RepairSession to receive 
response.
+                            session.waitForSync(Pair.create(desc, new 
NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
+                        }
+                        syncTasks.add(task);
+                        taskExecutor.submit(task);
+                    }
+                }
+                return Futures.allAsList(syncTasks);
+            }
+        }, taskExecutor);
+
+        // When all sync complete, set the final result
+        Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>()
         {
-            throw new AssertionError("Interrupted while waiting for requests 
to be sent");
-        }
+            public void onSuccess(List<SyncStat> stats)
+            {
+                logger.info(String.format("[repair #%s] %s is fully synced", 
session.getId(), desc.columnFamily));
+                set(new RepairResult(desc, stats));
+            }
 
-        if (tree == null)
-            failed = true;
-        else
-            trees.add(new TreeResponse(endpoint, tree));
-        return treeRequests.completed(endpoint);
+            /**
+             * Snapshot, validation and sync failures are all handled here
+             */
+            public void onFailure(Throwable t)
+            {
+                logger.warn(String.format("[repair #%s] %s sync failed", 
session.getId(), desc.columnFamily));
+                setException(t);
+            }
+        }, taskExecutor);
+
+        // Wait for validation to complete
+        Futures.getUnchecked(validations);
     }
 
     /**
-     * Submit differencers for running.
-     * All tree *must* have been received before this is called.
+     * Creates a {@link ValidationTask} for each endpoint and submits them to the task executor.
+     * If the isSequential flag is true, waits for the previous ValidationTask to complete 
before submitting the next.
+     *
+     * @param endpoints Endpoint addresses to send validation requests to
+     * @return Future that yields every {@link TreeResponse} from the replicas if 
all validations succeed.
      */
-    public void submitDifferencers()
+    private ListenableFuture<List<TreeResponse>> 
sendValidationRequest(Collection<InetAddress> endpoints)
     {
-        assert !failed;
-        List<Differencer> differencers = new ArrayList<>();
-        // We need to difference all trees one against another
-        for (int i = 0; i < trees.size() - 1; ++i)
+        logger.info(String.format("[repair #%s] requesting merkle trees for %s 
(to %s)", desc.sessionId, desc.columnFamily, endpoints));
+        int gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+        List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
+        for (InetAddress endpoint : endpoints)
         {
-            TreeResponse r1 = trees.get(i);
-            for (int j = i + 1; j < trees.size(); ++j)
+            ValidationTask task = new ValidationTask(desc, endpoint, gcBefore);
+            tasks.add(task);
+            session.waitForValidation(Pair.create(desc, endpoint), task);
+            taskExecutor.execute(task);
+            if (isSequential)
             {
-                TreeResponse r2 = trees.get(j);
-                Differencer differencer = new Differencer(desc, r1, r2);
-                differencers.add(differencer);
-                logger.debug("Queueing comparison {}", differencer);
+                // tasks are sequentially sent so wait until current 
validation is done.
+                // NOTE: Wait happens on taskExecutor thread
+                Futures.getUnchecked(task);
             }
         }
-        waitForSync = new AtomicInteger(differencers.size());
-        for (Differencer differencer : differencers)
-            taskExecutor.submit(differencer);
-
-        trees.clear(); // allows gc to do its thing
-    }
-
-    /**
-     * @return true if the given node pair was the last remaining
-     */
-    boolean completedSynchronization()
-    {
-        return waitForSync.decrementAndGet() == 0;
+        return Futures.allAsList(tasks);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 54117a3..04a27af 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -103,7 +103,12 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
             case SYNC_REQUEST:
                 // forwarded sync request
                 SyncRequest request = (SyncRequest) message.payload;
-                StreamingRepairTask task = new StreamingRepairTask(desc, 
request);
+
+                long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
+                if (desc.parentSessionId != null && 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != 
null)
+                    repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
+
+                StreamingRepairTask task = new StreamingRepairTask(desc, 
request, repairedAt);
                 task.run();
                 break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairResult.java 
b/src/java/org/apache/cassandra/repair/RepairResult.java
new file mode 100644
index 0000000..259d5f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairResult.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+
+public class RepairResult
+{
+    public final RepairJobDesc desc;
+    public final List<SyncStat> stats;
+
+    public RepairResult(RepairJobDesc desc, List<SyncStat> stats)
+    {
+        this.desc = desc;
+        this.stats = stats;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 9274342..240a21c 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -21,13 +21,12 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,28 +34,30 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.gms.*;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Coordinates the (active) repair of a token range.
  *
  * A given RepairSession repairs a set of replicas for a given range on a list
  * of column families. For each of the column family to repair, RepairSession
- * creates a RepairJob that handles the repair of that CF.
+ * creates a {@link RepairJob} that handles the repair of that CF.
  *
  * A given RepairJob has the 2 main phases:
- *   1. Validation phase: the job requests merkle trees from each of the 
replica involves
- *      (RepairJob.sendTreeRequests()) and waits until all trees are received 
(in
+ * <ol>
+ *   <li>Validation phase: the job requests merkle trees from each of the 
replicas involved
+ *      ({@link org.apache.cassandra.repair.ValidationTask}) and waits until 
all trees are received (in
  *      validationComplete()).
- *   2. Synchonization phase: once all trees are received, the job compares 
each tree with
- *      all the other using a so-called Differencer (started by 
submitDifferencers()). If
- *      differences there is between 2 trees, the concerned Differencer will 
start a streaming
- *      of the difference between the 2 endpoint concerned 
(Differencer.performStreamingRepair).
- * The job is done once all its Differencer are done (i.e. have either 
computed no differences
+ *   </li>
+ *   <li>Synchronization phase: once all trees are received, the job compares 
each tree with
+ *      all the others using a so-called {@link SyncTask}. If there is 
a difference between 2 trees, the
+ *      concerned SyncTask will start streaming the difference between 
the 2 endpoints concerned.
+ *   </li>
+ * </ol>
+ * The job is done once all its SyncTasks are done (i.e. have either computed 
no differences
  * or the streaming they started is done (syncComplete())).
  *
  * A given session will execute the first phase (validation phase) of each of 
it's job
@@ -71,15 +72,15 @@ import 
org.apache.cassandra.utils.concurrent.SimpleCondition;
  * we still first send a message to each node to flush and snapshot data so 
each merkle tree
  * creation is still done on similar data, even if the actual creation is not
  * done simulatneously). If not sequential, all merkle tree are requested in 
parallel.
- * Similarly, if a job is sequential, it will handle one Differencer at a 
time, but will handle
+ * Similarly, if a job is sequential, it will handle one SyncTask at a time, 
but will handle
  * all of them in parallel otherwise.
  */
-public class RepairSession extends WrappedRunnable implements 
IEndpointStateChangeSubscriber,
-                                                              
IFailureDetectionEventListener,
-                                                              
IRepairJobEventListener
+public class RepairSession extends AbstractFuture<List<RepairResult>> 
implements IEndpointStateChangeSubscriber,
+                                                                               
  IFailureDetectionEventListener
 {
     private static Logger logger = 
LoggerFactory.getLogger(RepairSession.class);
 
+    public final UUID parentRepairSession;
     /** Repair session ID */
     private final UUID id;
     public final String keyspace;
@@ -88,25 +89,18 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
     /** Range to repair */
     public final Range<Token> range;
     public final Set<InetAddress> endpoints;
+    private final long repairedAt;
 
-    private volatile Exception exception;
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
-    private final AtomicBoolean fdUnregistered = new AtomicBoolean(false);
-
-    // First, all RepairJobs are added to this queue,
-    final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>();
 
-    // and after receiving all validation, the job is moved to
-    // this map, keyed by CF name.
-    final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
+    // Each validation task waits for the response from its replica in the validating 
ConcurrentMap (keyed by repair job descriptor and endpoint address)
+    private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, 
ValidationTask> validating = new ConcurrentHashMap<>();
+    // Remote sync tasks wait for their response in the syncingTasks map (keyed by repair job descriptor and node pair)
+    private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> 
syncingTasks = new ConcurrentHashMap<>();
 
     // Tasks(snapshot, validate request, differencing, ...) are run on 
taskExecutor
     private final ListeningExecutorService taskExecutor = 
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new 
NamedThreadFactory("RepairJobTask")));
 
-    private final SimpleCondition completed = new SimpleCondition();
-    public final Condition differencingDone = new SimpleCondition();
-    public final UUID parentRepairSession;
-
     private volatile boolean terminated = false;
 
     /**
@@ -118,21 +112,25 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
      * @param endpoints the data centers that should be part of the repair; 
null for all DCs
      * @param cfnames names of columnfamilies
      */
-    public RepairSession(UUID parentRepairSession, Range<Token> range, String 
keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
+    public RepairSession(UUID parentRepairSession,
+                         UUID id,
+                         Range<Token> range,
+                         String keyspace,
+                         boolean isSequential,
+                         Set<InetAddress> endpoints,
+                         long repairedAt,
+                         String... cfnames)
     {
-        this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, 
isSequential, endpoints, cfnames);
-    }
+        assert cfnames.length > 0 : "Repairing no column families seems 
pointless, doesn't it";
 
-    public RepairSession(UUID parentRepairSession, UUID id, Range<Token> 
range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, 
String[] cfnames)
-    {
         this.parentRepairSession = parentRepairSession;
         this.id = id;
         this.isSequential = isSequential;
         this.keyspace = keyspace;
         this.cfnames = cfnames;
-        assert cfnames.length > 0 : "Repairing no column families seems 
pointless, doesn't it";
         this.range = range;
         this.endpoints = endpoints;
+        this.repairedAt = repairedAt;
     }
 
     public UUID getId()
@@ -145,6 +143,16 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
         return range;
     }
 
+    public void waitForValidation(Pair<RepairJobDesc, InetAddress> key, 
ValidationTask task)
+    {
+        validating.put(key, task);
+    }
+
+    public void waitForSync(Pair<RepairJobDesc, NodePair> key, RemoteSyncTask 
task)
+    {
+        syncingTasks.put(key, task);
+    }
+
     /**
      * Receive merkle tree response or failed response from {@code endpoint} 
for current repair job.
      *
@@ -154,52 +162,15 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
      */
     public void validationComplete(RepairJobDesc desc, InetAddress endpoint, 
MerkleTree tree)
     {
-        RepairJob job = jobs.peek();
-        if (job == null)
+        ValidationTask task = validating.remove(Pair.create(desc, endpoint));
+        if (task == null)
         {
             assert terminated;
             return;
         }
 
-        if (tree == null)
-        {
-            exception = new RepairException(desc, "Validation failed in " + 
endpoint);
-            forceShutdown();
-            return;
-        }
-
         logger.info(String.format("[repair #%s] Received merkle tree for %s 
from %s", getId(), desc.columnFamily, endpoint));
-
-        assert job.desc.equals(desc);
-        if (job.addTree(endpoint, tree) == 0)
-        {
-            logger.debug("All responses received for {}/{}", getId(), 
desc.columnFamily);
-            if (!job.isFailed())
-            {
-                syncingJobs.put(job.desc.columnFamily, job);
-                job.submitDifferencers();
-            }
-
-            // This job is complete, switching to next in line (note that only 
one thread will ever do this)
-            jobs.poll();
-            RepairJob nextJob = jobs.peek();
-            if (nextJob == null)
-            {
-                // Unregister from FailureDetector once we've completed 
synchronizing Merkle trees.
-                // After this point, we rely on tcp_keepalive for individual 
sockets to notify us when a connection is down.
-                // See CASSANDRA-3569
-                if (fdUnregistered.compareAndSet(false, true))
-                    
FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-
-                // We are done with this repair session as far as differencing
-                // is considered. Just inform the session
-                differencingDone.signalAll();
-            }
-            else
-            {
-                nextJob.sendTreeRequests(endpoints);
-            }
-        }
+        task.treeReceived(tree);
     }
 
     /**
@@ -211,38 +182,15 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
      */
     public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean 
success)
     {
-        RepairJob job = syncingJobs.get(desc.columnFamily);
-        if (job == null)
+        RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
+        if (task == null)
         {
             assert terminated;
             return;
         }
 
-        if (!success)
-        {
-            exception = new RepairException(desc, String.format("Sync failed 
between %s and %s", nodes.endpoint1, nodes.endpoint2));
-            forceShutdown();
-            return;
-        }
-
         logger.debug(String.format("[repair #%s] Repair completed between %s 
and %s on %s", getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily));
-
-        if (job.completedSynchronization())
-        {
-            RepairJob completedJob = syncingJobs.remove(job.desc.columnFamily);
-            String remaining = syncingJobs.size() == 0 ? "" : String.format(" 
(%d remaining table to sync for this session)", syncingJobs.size());
-            if (completedJob != null && completedJob.isFailed())
-                logger.warn(String.format("[repair #%s] %s sync failed%s", 
getId(), desc.columnFamily, remaining));
-            else
-                logger.info(String.format("[repair #%s] %s is fully synced%s", 
getId(), desc.columnFamily, remaining));
-
-            if (jobs.isEmpty() && syncingJobs.isEmpty())
-            {
-                taskExecutor.shutdown();
-                // this repair session is completed
-                completed.signalAll();
-            }
-        }
+        task.syncComplete(success);
     }
 
     private String repairedNodes()
@@ -254,15 +202,25 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
         return sb.toString();
     }
 
-    // we don't care about the return value but care about it throwing 
exception
-    public void runMayThrow() throws Exception
+    /**
+     * Start RepairJob on given ColumnFamilies.
+     *
+     * This first validates if all replica are available, and if they are,
+     * creates RepairJobs and submit to run on given executor.
+     *
+     * @param executor Executor to run validation
+     */
+    public void start(ListeningExecutorService executor)
     {
+        if (terminated)
+            return;
+
         logger.info(String.format("[repair #%s] new session: will sync %s on 
range %s for %s.%s", getId(), repairedNodes(), range, keyspace, 
Arrays.toString(cfnames)));
 
         if (endpoints.isEmpty())
         {
-            differencingDone.signalAll();
             logger.info(String.format("[repair #%s] No neighbors to repair 
with on range %s: session completed", getId(), range));
+            set(Lists.<RepairResult>newArrayList());
             return;
         }
 
@@ -272,85 +230,59 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
             if (!FailureDetector.instance.isAlive(endpoint))
             {
                 String message = String.format("Cannot proceed on repair 
because a neighbor (%s) is dead: session failed", endpoint);
-                differencingDone.signalAll();
                 logger.error("[repair #{}] {}", getId(), message);
-                throw new IOException(message);
+                setException(new IOException(message));
+                return;
             }
         }
 
-        ActiveRepairService.instance.addToActiveSessions(this);
-        try
+        // Create and submit RepairJob for each ColumnFamily
+        List<ListenableFuture<RepairResult>> jobs = new 
ArrayList<>(cfnames.length);
+        for (String cfname : cfnames)
         {
-            // Create and queue a RepairJob for each column family
-            for (String cfname : cfnames)
-            {
-                RepairJob job = new RepairJob(this, parentRepairSession, id, 
keyspace, cfname, range, isSequential, taskExecutor);
-                jobs.offer(job);
-            }
-            logger.debug("Sending tree requests to endpoints {}", endpoints);
-            jobs.peek().sendTreeRequests(endpoints);
-
-            // block whatever thread started this session until all requests 
have been returned:
-            // if this thread dies, the session will still complete in the 
background
-            completed.await();
+            RepairJob job = new RepairJob(this, cfname, isSequential, 
repairedAt, taskExecutor);
+            executor.execute(job);
+            jobs.add(job);
+        }
 
-            if (exception == null)
+        // When all RepairJobs are done without error, cleanup and set the 
final result
+        Futures.addCallback(Futures.allAsList(jobs), new 
FutureCallback<List<RepairResult>>()
+        {
+            public void onSuccess(List<RepairResult> results)
             {
+                // this repair session is completed
                 logger.info(String.format("[repair #%s] session completed 
successfully", getId()));
+                set(results);
+                taskExecutor.shutdown();
+                // mark this session as terminated
+                terminate();
             }
-            else
+
+            public void onFailure(Throwable t)
             {
-                logger.error(String.format("[repair #%s] session completed 
with the following error", getId()), exception);
-                throw exception;
+                logger.error("Repair job failed", t);
+                setException(t);
             }
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException("Interrupted while waiting for 
repair.");
-        }
-        finally
-        {
-            // mark this session as terminated
-            terminate();
-
-            ActiveRepairService.instance.removeFromActiveSessions(this);
-
-            // If we've reached here in an exception state without completing 
Merkle Tree sync, we'll still be registered
-            // with the FailureDetector.
-            if (fdUnregistered.compareAndSet(false, true))
-                
FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-        }
+        });
     }
 
     public void terminate()
     {
         terminated = true;
-        jobs.clear();
-        syncingJobs.clear();
+        validating.clear();
+        syncingTasks.clear();
     }
 
     /**
      * clear all RepairJobs and terminate this session.
+     *
+     * @param reason Cause of error for shutdown
      */
-    public void forceShutdown()
+    public void forceShutdown(Throwable reason)
     {
+        setException(reason);
         taskExecutor.shutdownNow();
-        differencingDone.signalAll();
-        completed.signalAll();
-    }
-
-    public void failedSnapshot()
-    {
-        exception = new IOException("Failed during snapshot creation.");
-        forceShutdown();
-    }
-
-    void failedNode(InetAddress remote)
-    {
-        String errorMsg = String.format("Endpoint %s died", remote);
-        exception = new IOException(errorMsg);
-        // If a node failed during Merkle creation, we stop everything (though 
there could still be some activity in the background)
-        forceShutdown();
+        terminate();
     }
 
     public void onJoin(InetAddress endpoint, EndpointState epState) {}
@@ -383,6 +315,9 @@ public class RepairSession extends WrappedRunnable 
implements IEndpointStateChan
         if (!isFailed.compareAndSet(false, true))
             return;
 
-        failedNode(endpoint);
+        Exception exception = new IOException(String.format("Endpoint %s 
died", endpoint));
+        logger.error(String.format("[repair #%s] session completed with the 
following error", getId()), exception);
+        // If a node failed, we stop everything (though there could still be 
some activity in the background)
+        forceShutdown(exception);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RequestCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RequestCoordinator.java 
b/src/java/org/apache/cassandra/repair/RequestCoordinator.java
deleted file mode 100644
index ed089ef..0000000
--- a/src/java/org/apache/cassandra/repair/RequestCoordinator.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-/**
-*/
-public abstract class RequestCoordinator<R>
-{
-    private final Order<R> orderer;
-
-    public RequestCoordinator(boolean isSequential)
-    {
-        this.orderer = isSequential ? new SequentialOrder(this) : new 
ParallelOrder(this);
-    }
-
-    public abstract void send(R request);
-
-    public void add(R request)
-    {
-        orderer.add(request);
-    }
-
-    public void start()
-    {
-        orderer.start();
-    }
-
-    // Returns how many request remains
-    public int completed(R request)
-    {
-        return orderer.completed(request);
-    }
-
-    private static abstract class Order<R>
-    {
-        protected final RequestCoordinator<R> coordinator;
-
-        Order(RequestCoordinator<R> coordinator)
-        {
-            this.coordinator = coordinator;
-        }
-
-        public abstract void add(R request);
-        public abstract void start();
-        public abstract int completed(R request);
-    }
-
-    private static class SequentialOrder<R> extends Order<R>
-    {
-        private final Queue<R> requests = new LinkedList<>();
-
-        SequentialOrder(RequestCoordinator<R> coordinator)
-        {
-            super(coordinator);
-        }
-
-        public void add(R request)
-        {
-            requests.add(request);
-        }
-
-        public void start()
-        {
-            if (requests.isEmpty())
-                return;
-
-            coordinator.send(requests.peek());
-        }
-
-        public int completed(R request)
-        {
-            assert request.equals(requests.peek());
-            requests.poll();
-            int remaining = requests.size();
-            if (remaining != 0)
-                coordinator.send(requests.peek());
-            return remaining;
-        }
-    }
-
-    private static class ParallelOrder<R> extends Order<R>
-    {
-        private final Set<R> requests = new HashSet<>();
-
-        ParallelOrder(RequestCoordinator<R> coordinator)
-        {
-            super(coordinator);
-        }
-
-        public void add(R request)
-        {
-            requests.add(request);
-        }
-
-        public void start()
-        {
-            for (R request : requests)
-                coordinator.send(request);
-        }
-
-        public int completed(R request)
-        {
-            requests.remove(request);
-            return requests.size();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java 
b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 6c3afb1..a87643c 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -74,6 +74,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> 
implements Runnabl
 
         public void onFailure(InetAddress from)
         {
+            //listener.failedSnapshot();
             task.setException(new RuntimeException("Could not create snapshot 
at " + from));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 9af949d..f30eb6f 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -23,59 +23,40 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.repair.messages.SyncRequest;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
 
 /**
- * Task that make two nodes exchange (stream) some ranges (for a given 
table/cf).
- * This handle the case where the local node is neither of the two nodes that
- * must stream their range, and allow to register a callback to be called on
- * completion.
+ * StreamingRepairTask performs data streaming between two remote replicas, 
neither of which is the repair coordinator.
+ * The task sends a {@link SyncComplete} message back to the coordinator upon 
streaming completion.
  */
 public class StreamingRepairTask implements Runnable, StreamEventHandler
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamingRepairTask.class);
 
-    /** Repair session ID that this streaming task belongs */
-    public final RepairJobDesc desc;
-    public final SyncRequest request;
+    private final RepairJobDesc desc;
+    private final SyncRequest request;
+    private final long repairedAt;
 
-    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request)
+    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, long 
repairedAt)
     {
         this.desc = desc;
         this.request = request;
+        this.repairedAt = repairedAt;
     }
 
     public void run()
     {
-        if (request.src.equals(FBUtilities.getBroadcastAddress()))
-            initiateStreaming();
-        else
-            forwardToSource();
-    }
-
-    private void initiateStreaming()
-    {
-        long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
-        if (desc.parentSessionId != null && 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != 
null)
-            repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
-
         logger.info(String.format("[streaming task #%s] Performing streaming 
repair of %d ranges with %s", desc.sessionId, request.ranges.size(), 
request.dst));
-        StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
-                                    .flushBeforeTransfer(true)
-                                    // request ranges from the remote node
-                                    .requestRanges(request.dst, desc.keyspace, 
request.ranges, desc.columnFamily)
-                                    // send ranges to the remote node
-                                    .transferRanges(request.dst, 
desc.keyspace, request.ranges, desc.columnFamily)
-                                    .execute();
-        op.addEventListener(this);
-    }
-
-    private void forwardToSource()
-    {
-        logger.info(String.format("[repair #%s] Forwarding streaming repair of 
%d ranges to %s (to be streamed with %s)", desc.sessionId, 
request.ranges.size(), request.src, request.dst));
-        MessagingService.instance().sendOneWay(request.createMessage(), 
request.src);
+        new StreamPlan("Repair", repairedAt, 1).listeners(this)
+                                            .flushBeforeTransfer(true)
+                                            // request ranges from the remote 
node
+                                            .requestRanges(request.dst, 
desc.keyspace, request.ranges, desc.columnFamily)
+                                            // send ranges to the remote node
+                                            .transferRanges(request.dst, 
desc.keyspace, request.ranges, desc.columnFamily)
+                                            .execute();
     }
 
     public void handleStreamEvent(StreamEvent event)
@@ -85,7 +66,7 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
     }
 
     /**
-     * If we succeeded on both stream in and out, reply back to the initiator.
+     * If we succeeded on both stream in and out, reply back to coordinator
      */
     public void onSuccess(StreamState state)
     {
@@ -94,7 +75,7 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
     }
 
     /**
-     * If we failed on either stream in or out, reply fail to the initiator.
+     * If we failed on either stream in or out, reply fail to coordinator
      */
     public void onFailure(Throwable t)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/SyncStat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java 
b/src/java/org/apache/cassandra/repair/SyncStat.java
new file mode 100644
index 0000000..5721a20
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SyncStat.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+/**
+ * Statistics about synchronizing two replicas.
+ * <p>
+ * Plain value holder: both fields are final and are assigned exactly once,
+ * by the constructor.
+ */
+public class SyncStat
+{
+    /** The pair of nodes these statistics describe. */
+    public final NodePair nodes;
+    /** Number of differences recorded for {@link #nodes}. */
+    public final long numberOfDifferences;
+
+    /**
+     * @param nodes               the pair of nodes the statistics are about
+     * @param numberOfDifferences number of differences recorded for that pair
+     */
+    public SyncStat(NodePair nodes, long numberOfDifferences)
+    {
+        this.nodes = nodes;
+        this.numberOfDifferences = numberOfDifferences;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java 
b/src/java/org/apache/cassandra/repair/SyncTask.java
new file mode 100644
index 0000000..3ce5532
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * SyncTask will calculate the difference of MerkleTree between two nodes
+ * and perform necessary operation to repair replica.
+ * <p>
+ * The task is itself a future of {@link SyncStat}. {@link #run()} completes it directly
+ * when the two trees are identical, and fails it when the comparison or the hand-off to
+ * {@link #startSync(List)} throws. When differences exist, {@code run()} does not complete
+ * the future itself; that is left to the {@code startSync} implementation.
+ */
+public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(SyncTask.class);
+
+    protected final RepairJobDesc desc;
+    protected final TreeResponse r1;
+    protected final TreeResponse r2;
+
+    // assigned by run(), exposed through getCurrentStat(); stays null until run() has compared the trees
+    protected volatile SyncStat stat;
+
+    /**
+     * @param desc description of the repair job this task belongs to
+     * @param r1   tree response (endpoint and tree) of the first node
+     * @param r2   tree response (endpoint and tree) of the second node
+     */
+    public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
+    {
+        this.desc = desc;
+        this.r1 = r1;
+        this.r2 = r2;
+    }
+
+    /**
+     * Compares trees, and triggers repairs for any ranges that mismatch.
+     * <p>
+     * Any unchecked failure is recorded on this future before being rethrown, so that
+     * whoever waits on the task observes the failure instead of waiting forever.
+     */
+    public void run()
+    {
+        try
+        {
+            // compare trees, and collect differences
+            List<Range<Token>> differences = new ArrayList<>();
+            differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
+
+            stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size());
+
+            // choose a repair method based on the significance of the difference
+            if (differences.isEmpty())
+            {
+                logger.info(describe("are consistent"));
+                set(stat);
+                return;
+            }
+
+            // non-0 difference: perform streaming repair
+            logger.info(describe("have " + differences.size() + " range(s) out of sync"));
+            startSync(differences);
+        }
+        catch (RuntimeException | Error e)
+        {
+            // a no-op if the future has already been completed
+            setException(e);
+            throw e;
+        }
+    }
+
+    /**
+     * @return the statistics computed by {@link #run()}, or null if the trees have not been compared yet
+     */
+    public SyncStat getCurrentStat()
+    {
+        return stat;
+    }
+
+    /**
+     * Starts repairing the given ranges. Called by {@link #run()} only when at least one range differs.
+     *
+     * @param differences ranges on which the two trees disagree
+     */
+    protected abstract void startSync(List<Range<Token>> differences);
+
+    /**
+     * Builds the log line for this pair of endpoints in a single formatting pass, so that a '%'
+     * inside an endpoint or column family name is never interpreted as a format specifier.
+     */
+    private String describe(String status)
+    {
+        return String.format("[repair #%s] Endpoints %s and %s %s for %s",
+                             desc.sessionId, r1.endpoint, r2.endpoint, status, desc.columnFamily);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/ValidationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java 
b/src/java/org/apache/cassandra/repair/ValidationTask.java
new file mode 100644
index 0000000..a52ec4f
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * Future-backed task that asks a single replica for validation.
+ * {@link #run()} sends a {@link ValidationRequest} to the replica; the task completes
+ * when {@link #treeReceived(MerkleTree)} is invoked with the replica's answer.
+ */
+public class ValidationTask extends AbstractFuture<TreeResponse> implements Runnable
+{
+    private final RepairJobDesc desc;
+    private final InetAddress endpoint;
+    private final int gcBefore;
+
+    /**
+     * @param desc     description of the repair job the validation belongs to
+     * @param endpoint replica the request is sent to
+     * @param gcBefore value passed through to the {@link ValidationRequest}
+     */
+    public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int gcBefore)
+    {
+        this.desc = desc;
+        this.endpoint = endpoint;
+        this.gcBefore = gcBefore;
+    }
+
+    /**
+     * Builds the ValidationRequest and sends it one-way to the replica.
+     */
+    public void run()
+    {
+        MessagingService.instance().sendOneWay(new ValidationRequest(desc, gcBefore).createMessage(), endpoint);
+    }
+
+    /**
+     * Completes this task with the replica's answer.
+     *
+     * @param tree MerkleTree that is sent from replica. Null if validation failed on replica node,
+     *             in which case the task fails with a {@link RepairException}.
+     */
+    public void treeReceived(MerkleTree tree)
+    {
+        if (tree != null)
+        {
+            set(new TreeResponse(endpoint, tree));
+            return;
+        }
+        setException(new RepairException(desc, "Validation failed in " + endpoint));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java 
b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
new file mode 100644
index 0000000..ca02365
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Repair options.
+ * <p>
+ * Instances are normally built from a map of strings through
+ * {@link #parse(Map, IPartitioner)}. The collection getters return the live internal
+ * collections, which is how {@code parse} fills them in after construction.
+ */
+public class RepairOption
+{
+    public static final String SEQUENTIAL_KEY = "sequential";
+    public static final String PRIMARY_RANGE_KEY = "primaryRange";
+    public static final String INCREMENTAL_KEY = "incremental";
+    public static final String JOB_THREADS_KEY = "jobThreads";
+    public static final String RANGES_KEY = "ranges";
+    public static final String COLUMNFAMILIES_KEY = "columnFamilies";
+    public static final String DATACENTERS_KEY = "dataCenters";
+    public static final String HOSTS_KEY = "hosts";
+
+    // we don't want to push nodes too much for repair
+    public static final int MAX_JOB_THREADS = 4;
+
+    private static final Logger logger = LoggerFactory.getLogger(RepairOption.class);
+
+    /**
+     * Construct RepairOption object from given map of Strings.
+     * <p>
+     * Available options are:
+     *
+     * <table>
+     *     <thead>
+     *         <tr>
+     *             <th>key</th>
+     *             <th>value</th>
+     *             <th>default (when key not given)</th>
+     *         </tr>
+     *     </thead>
+     *     <tbody>
+     *         <tr>
+     *             <td>sequential</td>
+     *             <td>"true" if perform sequential repair.</td>
+     *             <td>true</td>
+     *         </tr>
+     *         <tr>
+     *             <td>primaryRange</td>
+     *             <td>"true" if perform repair only on primary range.</td>
+     *             <td>false</td>
+     *         </tr>
+     *         <tr>
+     *             <td>incremental</td>
+     *             <td>"true" if perform incremental repair.</td>
+     *             <td>false</td>
+     *         </tr>
+     *         <tr>
+     *             <td>jobThreads</td>
+     *             <td>Number of threads to use to run repair job.
+     *             A value that is not an integer is logged and replaced by the default.</td>
+     *             <td>1</td>
+     *         </tr>
+     *         <tr>
+     *             <td>ranges</td>
+     *             <td>Ranges to repair. A range is expressed as &lt;start token&gt;:&lt;end token&gt;
+     *             and multiple ranges can be given as comma separated ranges(e.g. aaa:bbb,ccc:ddd).
+     *             An entry without a ':' is logged and skipped.</td>
+     *             <td></td>
+     *         </tr>
+     *         <tr>
+     *             <td>columnFamilies</td>
+     *             <td>Specify names of ColumnFamilies to repair.
+     *             Multiple ColumnFamilies can be given as comma separated values(e.g. cf1,cf2,cf3).</td>
+     *             <td></td>
+     *         </tr>
+     *         <tr>
+     *             <td>dataCenters</td>
+     *             <td>Specify names of data centers who participate in this repair.
+     *             Multiple data centers can be given as comma separated values(e.g. dc1,dc2,dc3).</td>
+     *             <td></td>
+     *         </tr>
+     *         <tr>
+     *             <td>hosts</td>
+     *             <td>Specify names of hosts who participate in this repair.
+     *             Multiple hosts can be given as comma separated values(e.g. cass1,cass2).</td>
+     *             <td></td>
+     *         </tr>
+     *     </tbody>
+     * </table>
+     *
+     * @param options options to parse
+     * @param partitioner partitioner is used to construct token ranges
+     * @return RepairOption object
+     * @throws IllegalArgumentException if sequential and incremental are both requested, if jobThreads
+     *         exceeds {@link #MAX_JOB_THREADS}, or if a primary range repair is restricted to
+     *         specific data centers or hosts
+     */
+    public static RepairOption parse(Map<String, String> options, IPartitioner partitioner)
+    {
+        boolean sequential = !options.containsKey(SEQUENTIAL_KEY) || Boolean.parseBoolean(options.get(SEQUENTIAL_KEY));
+        boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
+        boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
+
+        int jobThreads = parseJobThreads(options.get(JOB_THREADS_KEY));
+        Set<Range<Token>> ranges = parseRanges(options.get(RANGES_KEY), partitioner);
+
+        RepairOption option = new RepairOption(sequential, primaryRange, incremental, jobThreads, ranges);
+
+        // data centers
+        Collection<String> dataCenters = splitCommaSeparated(options.get(DATACENTERS_KEY));
+        option.getDataCenters().addAll(dataCenters);
+
+        // hosts
+        Collection<String> hosts = splitCommaSeparated(options.get(HOSTS_KEY));
+        option.getHosts().addAll(hosts);
+
+        // columnfamilies
+        Collection<String> columnFamilies = splitCommaSeparated(options.get(COLUMNFAMILIES_KEY));
+        option.getColumnFamilies().addAll(columnFamilies);
+
+        // validate options
+        if (jobThreads > MAX_JOB_THREADS)
+        {
+            throw new IllegalArgumentException("Too many job threads. Max is " + MAX_JOB_THREADS);
+        }
+        if (primaryRange && (!dataCenters.isEmpty() || !hosts.isEmpty()))
+        {
+            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+        }
+
+        return option;
+    }
+
+    /**
+     * Reads the job thread count, falling back to 1 when the value is absent or not an integer.
+     */
+    private static int parseJobThreads(String value)
+    {
+        if (value == null)
+        {
+            return 1;
+        }
+        try
+        {
+            return Integer.parseInt(value);
+        }
+        catch (NumberFormatException e)
+        {
+            // keep the lenient fallback, but make the ignored input visible to the operator
+            logger.warn("Ignoring invalid value '{}' for option {}; using 1 job thread", value, JOB_THREADS_KEY);
+            return 1;
+        }
+    }
+
+    /**
+     * Turns a comma separated list of start:end pairs into token ranges; null yields an empty set.
+     */
+    private static Set<Range<Token>> parseRanges(String rangesStr, IPartitioner partitioner)
+    {
+        Set<Range<Token>> ranges = new HashSet<>();
+        if (rangesStr == null)
+        {
+            return ranges;
+        }
+        StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
+        while (tokenizer.hasMoreTokens())
+        {
+            String rangeSpec = tokenizer.nextToken();
+            String[] rangeStr = rangeSpec.split(":", 2);
+            if (rangeStr.length < 2)
+            {
+                // still skipped as before, but no longer silently
+                logger.warn("Ignoring malformed range '{}': expected <start token>:<end token>", rangeSpec);
+                continue;
+            }
+            Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim());
+            Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim());
+            ranges.add(new Range<>(parsedBeginToken, parsedEndToken));
+        }
+        return ranges;
+    }
+
+    /**
+     * Splits a comma separated value into a set of trimmed entries; null yields an empty set.
+     */
+    private static Collection<String> splitCommaSeparated(String values)
+    {
+        Collection<String> result = new HashSet<>();
+        if (values != null)
+        {
+            StringTokenizer tokenizer = new StringTokenizer(values, ",");
+            while (tokenizer.hasMoreTokens())
+            {
+                result.add(tokenizer.nextToken().trim());
+            }
+        }
+        return result;
+    }
+
+    private final boolean sequential;
+    private final boolean primaryRange;
+    private final boolean incremental;
+    private final int jobThreads;
+
+    private final Collection<String> columnFamilies = new HashSet<>();
+    private final Collection<String> dataCenters = new HashSet<>();
+    private final Collection<String> hosts = new HashSet<>();
+    private final Collection<Range<Token>> ranges = new HashSet<>();
+
+    /**
+     * @param sequential   whether sequential repair is requested; forced to false (with a warning)
+     *                     when {@code FBUtilities.isUnix()} returns false
+     * @param primaryRange whether to repair only the primary range
+     * @param incremental  whether incremental repair is requested
+     * @param jobThreads   number of threads to use to run repair jobs
+     * @param ranges       ranges to repair; copied into this object
+     * @throws IllegalArgumentException if both sequential and incremental are requested
+     */
+    public RepairOption(boolean sequential, boolean primaryRange, boolean incremental, int jobThreads, Collection<Range<Token>> ranges)
+    {
+        if (sequential && incremental)
+        {
+            String message = "It is not possible to mix sequential repair and incremental repairs.";
+            logger.error(message);
+            throw new IllegalArgumentException(message);
+        }
+
+        if (!FBUtilities.isUnix() && sequential)
+        {
+            logger.warn("Snapshot-based repair is not yet supported on Windows.  Reverting to parallel repair.");
+            this.sequential = false;
+        }
+        else
+        {
+            this.sequential = sequential;
+        }
+        this.primaryRange = primaryRange;
+        this.incremental = incremental;
+        this.jobThreads = jobThreads;
+        this.ranges.addAll(ranges);
+    }
+
+    /** @return true if sequential repair is in effect */
+    public boolean isSequential()
+    {
+        return sequential;
+    }
+
+    /** @return true if only the primary range is repaired */
+    public boolean isPrimaryRange()
+    {
+        return primaryRange;
+    }
+
+    /** @return true if incremental repair was requested */
+    public boolean isIncremental()
+    {
+        return incremental;
+    }
+
+    /** @return number of threads to use to run repair jobs */
+    public int getJobThreads()
+    {
+        return jobThreads;
+    }
+
+    /** @return the live, mutable collection of column family names to repair */
+    public Collection<String> getColumnFamilies()
+    {
+        return columnFamilies;
+    }
+
+    /** @return the live, mutable collection of ranges to repair */
+    public Collection<Range<Token>> getRanges()
+    {
+        return ranges;
+    }
+
+    /** @return the live, mutable collection of data center names */
+    public Collection<String> getDataCenters()
+    {
+        return dataCenters;
+    }
+
+    /** @return the live, mutable collection of host names */
+    public Collection<String> getHosts()
+    {
+        return hosts;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "repair options (" +
+                       "sequential: " + sequential +
+                       ", primary range: " + primaryRange +
+                       ", incremental: " + incremental +
+                       ", job threads: " + jobThreads +
+                       ", ColumnFamilies: " + columnFamilies +
+                       ", dataCenters: " + dataCenters +
+                       ", hosts: " + hosts +
+                       ", # of ranges: " + ranges.size() +
+                       ')';
+    }
+}

Reply via email to