http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f1b9cce..670aa0b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.service;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
@@ -26,12 +27,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.Bounds;
@@ -39,6 +39,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -46,12 +47,9 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.*;
-import org.apache.cassandra.repair.messages.AnticompactionRequest;
-import org.apache.cassandra.repair.messages.PrepareMessage;
-import org.apache.cassandra.repair.messages.RepairMessage;
-import org.apache.cassandra.repair.messages.SyncComplete;
-import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.RepairSession;
+import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -73,21 +71,10 @@ public class ActiveRepairService
 {
     private static final Logger logger = 
LoggerFactory.getLogger(ActiveRepairService.class);
     // singleton enforcement
-    public static final ActiveRepairService instance = new 
ActiveRepairService();
+    public static final ActiveRepairService instance = new 
ActiveRepairService(FailureDetector.instance, Gossiper.instance);
 
     public static final long UNREPAIRED_SSTABLE = 0;
 
-    private static final ThreadPoolExecutor executor;
-    static
-    {
-        executor = new JMXConfigurableThreadPoolExecutor(4,
-                                                         60,
-                                                         TimeUnit.SECONDS,
-                                                         new 
LinkedBlockingQueue<Runnable>(),
-                                                         new 
NamedThreadFactory("AntiEntropySessions"),
-                                                         "internal");
-    }
-
     public static enum Status
     {
         STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
@@ -96,17 +83,17 @@ public class ActiveRepairService
     /**
      * A map of active coordinator session.
      */
-    private final ConcurrentMap<UUID, RepairSession> sessions;
+    private final ConcurrentMap<UUID, RepairSession> sessions = new 
ConcurrentHashMap<>();
 
-    private final ConcurrentMap<UUID, ParentRepairSession> 
parentRepairSessions;
+    private final ConcurrentMap<UUID, ParentRepairSession> 
parentRepairSessions = new ConcurrentHashMap<>();
 
-    /**
-     * Protected constructor. Use ActiveRepairService.instance.
-     */
-    protected ActiveRepairService()
+    private final IFailureDetector failureDetector;
+    private final Gossiper gossiper;
+
+    public ActiveRepairService(IFailureDetector failureDetector, Gossiper 
gossiper)
     {
-        sessions = new ConcurrentHashMap<>();
-        parentRepairSessions = new ConcurrentHashMap<>();
+        this.failureDetector = failureDetector;
+        this.gossiper = gossiper;
     }
 
     /**
@@ -114,51 +101,52 @@ public class ActiveRepairService
      *
      * @return Future for asynchronous call or null if there is no need to 
repair
      */
-    public RepairFuture submitRepairSession(UUID parentRepairSession, 
Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> 
endpoints, String... cfnames)
+    public RepairSession submitRepairSession(UUID parentRepairSession,
+                                             Range<Token> range,
+                                             String keyspace,
+                                             boolean isSequential,
+                                             Set<InetAddress> endpoints,
+                                             long repairedAt,
+                                             ListeningExecutorService executor,
+                                             String... cfnames)
     {
-        RepairSession session = new RepairSession(parentRepairSession, range, 
keyspace, isSequential, endpoints, cfnames);
-        if (session.endpoints.isEmpty())
+        if (endpoints.isEmpty())
             return null;
-        RepairFuture futureTask = new RepairFuture(session);
-        executor.execute(futureTask);
-        return futureTask;
-    }
 
-    public void addToActiveSessions(RepairSession session)
-    {
+        final RepairSession session = new RepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(), range, keyspace, isSequential, endpoints, repairedAt, 
cfnames);
+
         sessions.put(session.getId(), session);
-        Gossiper.instance.register(session);
-        
FailureDetector.instance.registerFailureDetectionEventListener(session);
-    }
+        // register listeners
+        gossiper.register(session);
+        failureDetector.registerFailureDetectionEventListener(session);
 
-    public void removeFromActiveSessions(RepairSession session)
-    {
-        Gossiper.instance.unregister(session);
-        sessions.remove(session.getId());
+        // unregister listeners at completion
+        session.addListener(new Runnable()
+        {
+            /**
+             * When repair finished, do clean up
+             */
+            public void run()
+            {
+                
failureDetector.unregisterFailureDetectionEventListener(session);
+                gossiper.unregister(session);
+                sessions.remove(session.getId());
+            }
+        }, MoreExecutors.sameThreadExecutor());
+        session.start(executor);
+        return session;
     }
 
     public void terminateSessions()
     {
+        Throwable cause = new IOException("Terminate session is called");
         for (RepairSession session : sessions.values())
         {
-            session.forceShutdown();
+            session.forceShutdown(cause);
         }
         parentRepairSessions.clear();
     }
 
-    // for testing only. Create a session corresponding to a fake request and
-    // add it to the sessions (avoid NPE in tests)
-    RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
-    {
-        Set<InetAddress> neighbours = new HashSet<>();
-        neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, 
desc.range, null, null));
-        RepairSession session = new RepairSession(desc.parentSessionId, 
desc.sessionId, desc.range, desc.keyspace, false, neighbours, new 
String[]{desc.columnFamily});
-        sessions.put(session.getId(), session);
-        RepairFuture futureTask = new RepairFuture(session);
-        executor.execute(futureTask);
-        return futureTask;
-    }
-
     /**
      * Return all of the neighbors with whom we share the provided range.
      *
@@ -191,7 +179,7 @@ public class ActiveRepairService
         Set<InetAddress> neighbors = new 
HashSet<>(replicaSets.get(rangeSuperSet));
         neighbors.remove(FBUtilities.getBroadcastAddress());
 
-        if (dataCenters != null)
+        if (dataCenters != null && !dataCenters.isEmpty())
         {
             TokenMetadata.Topology topology = 
ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
             Set<InetAddress> dcEndpoints = Sets.newHashSet();
@@ -204,7 +192,7 @@ public class ActiveRepairService
             }
             return Sets.intersection(neighbors, dcEndpoints);
         }
-        else if (hosts != null)
+        else if (hosts != null && !hosts.isEmpty())
         {
             Set<InetAddress> specifiedHost = new HashSet<>();
             for (final String host : hosts)
@@ -314,21 +302,18 @@ public class ActiveRepairService
         parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, 
System.currentTimeMillis()));
     }
 
-    public void finishParentSession(UUID parentSession, Set<InetAddress> 
neighbors, boolean doAntiCompaction)
+    public void finishParentSession(UUID parentSession, Set<InetAddress> 
neighbors)
     {
         try
         {
-            if (doAntiCompaction)
+            for (InetAddress neighbor : neighbors)
             {
-                for (InetAddress neighbor : neighbors)
-                {
-                    AnticompactionRequest acr = new 
AnticompactionRequest(parentSession);
-                    MessageOut<RepairMessage> req = acr.createMessage();
-                    MessagingService.instance().sendOneWay(req, neighbor);
-                }
-                List<Future<?>> futures = doAntiCompaction(parentSession);
-                FBUtilities.waitOnFutures(futures);
+                AnticompactionRequest acr = new 
AnticompactionRequest(parentSession);
+                MessageOut<RepairMessage> req = acr.createMessage();
+                MessagingService.instance().sendOneWay(req, neighbor);
             }
+            List<Future<?>> futures = doAntiCompaction(parentSession);
+            FBUtilities.waitOnFutures(futures);
         }
         finally
         {
@@ -407,7 +392,7 @@ public class ActiveRepairService
             this.repairedAt = repairedAt;
         }
 
-        public Collection<SSTableReader> getAndReferenceSSTables(UUID cfId)
+        public synchronized Collection<SSTableReader> 
getAndReferenceSSTables(UUID cfId)
         {
             Set<SSTableReader> sstables = sstableMap.get(cfId);
             Iterator<SSTableReader> sstableIterator = sstables.iterator();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 5b5fa20..eb4c3e2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.Nullable;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.Notification;
@@ -46,17 +47,13 @@ import ch.qos.logback.core.Appender;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Uninterruptibles;
-
+import com.google.common.util.concurrent.*;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.Auth;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
@@ -80,8 +77,10 @@ import org.apache.cassandra.net.AsyncOneResponse;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ResponseVerbHandler;
-import org.apache.cassandra.repair.RepairFuture;
 import org.apache.cassandra.repair.RepairMessageVerbHandler;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.repair.RepairResult;
+import org.apache.cassandra.repair.RepairSession;
 import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
@@ -2497,87 +2496,128 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         sendNotification(jmxNotification);
     }
 
-    public int forceRepairAsync(String keyspace, boolean isSequential, 
Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, 
boolean fullRepair, String... columnFamilies) throws IOException
+    public int repairAsync(String keyspace, Map<String, String> repairSpec)
     {
-        Collection<Range<Token>> ranges;
-        if (primaryRange)
+        RepairOption option = RepairOption.parse(repairSpec, getPartitioner());
+        // if ranges are not specified
+        if (option.getRanges().isEmpty())
         {
-            // when repairing only primary range, neither dataCenters nor 
hosts can be set
-            if (dataCenters == null && hosts == null)
-                ranges = getPrimaryRanges(keyspace);
-            // except dataCenters only contain local DC (i.e. -local)
-            else if (dataCenters != null && dataCenters.size() == 1 && 
dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
-                ranges = getPrimaryRangesWithinDC(keyspace);
+            if (option.isPrimaryRange())
+            {
+                // when repairing only primary range, neither dataCenters nor 
hosts can be set
+                if (option.getDataCenters().isEmpty() && 
option.getHosts().isEmpty())
+                    option.getRanges().addAll(getPrimaryRanges(keyspace));
+                    // except dataCenters only contain local DC (i.e. -local)
+                else if (option.getDataCenters().size() == 1 && 
option.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter()))
+                    
option.getRanges().addAll(getPrimaryRangesWithinDC(keyspace));
+                else
+                    throw new IllegalArgumentException("You need to run 
primary range repair on all nodes in the cluster.");
+            }
             else
-                throw new IllegalArgumentException("You need to run primary 
range repair on all nodes in the cluster.");
-        }
-        else
-        {
-             ranges = getLocalRanges(keyspace);
+            {
+                option.getRanges().addAll(getLocalRanges(keyspace));
+            }
         }
-
-        return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, 
ranges, fullRepair, columnFamilies);
+        return forceRepairAsync(keyspace, option);
     }
 
-    public int forceRepairAsync(String keyspace, boolean isSequential, 
Collection<String> dataCenters, Collection<String> hosts, 
Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
+    public int forceRepairAsync(String keyspace,
+                                boolean isSequential,
+                                Collection<String> dataCenters,
+                                Collection<String> hosts,
+                                boolean primaryRange,
+                                boolean fullRepair,
+                                String... columnFamilies)
     {
-        if (ranges.isEmpty() || 
Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
-            return 0;
-
-        int cmd = nextRepairCommand.incrementAndGet();
-        if (ranges.size() > 0)
+        if (!FBUtilities.isUnix() && isSequential)
         {
-            if (!FBUtilities.isUnix() && isSequential)
-            {
-                logger.warn("Snapshot-based repair is not yet supported on 
Windows.  Reverting to parallel repair.");
-                isSequential = false;
-            }
-            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, 
dataCenters, hosts, fullRepair, columnFamilies)).start();
+            logger.warn("Snapshot-based repair is not yet supported on 
Windows.  Reverting to parallel repair.");
+            isSequential = false;
         }
-        return cmd;
-    }
 
-    public int forceRepairAsync(String keyspace, boolean isSequential, boolean 
isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
-    {
-        Collection<Range<Token>> ranges;
-        if (primaryRange)
+        RepairOption options = new RepairOption(isSequential, primaryRange, 
!fullRepair, 1, Collections.<Range<Token>>emptyList());
+        if (dataCenters != null)
         {
-            ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : 
getPrimaryRanges(keyspace);
+            options.getDataCenters().addAll(dataCenters);
         }
-        else
+        if (hosts != null)
         {
-            ranges = getLocalRanges(keyspace);
+            options.getHosts().addAll(hosts);
         }
-
-        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, 
fullRepair, columnFamilies);
+        if (columnFamilies != null)
+        {
+            for (String columnFamily : columnFamilies)
+            {
+                options.getColumnFamilies().add(columnFamily);
+            }
+        }
+        return forceRepairAsync(keyspace, options);
     }
 
-    public int forceRepairAsync(final String keyspace, final boolean 
isSequential, final boolean isLocal, final Collection<Range<Token>> ranges, 
final boolean fullRepair, final String... columnFamilies)
+    public int forceRepairAsync(String keyspace,
+                                boolean isSequential,
+                                boolean isLocal,
+                                boolean primaryRange,
+                                boolean fullRepair,
+                                String... columnFamilies)
     {
-        if (ranges.isEmpty() || 
Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
-            return 0;
-
-        int cmd = nextRepairCommand.incrementAndGet();
-        new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, 
isLocal, fullRepair, columnFamilies)).start();
-        return cmd;
+        Set<String> dataCenters = null;
+        if (isLocal)
+        {
+            dataCenters = 
Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+        }
+        return forceRepairAsync(keyspace, isSequential, dataCenters, null, 
primaryRange, fullRepair, columnFamilies);
     }
 
-    public int forceRepairRangeAsync(String beginToken, String endToken, 
String keyspaceName, boolean isSequential, Collection<String> dataCenters, 
Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws 
IOException
+    public int forceRepairRangeAsync(String beginToken,
+                                     String endToken,
+                                     String keyspaceName,
+                                     boolean isSequential,
+                                     Collection<String> dataCenters,
+                                     Collection<String> hosts,
+                                     boolean fullRepair,
+                                     String... columnFamilies)
     {
+        if (!FBUtilities.isUnix() && isSequential)
+        {
+            logger.warn("Snapshot-based repair is not yet supported on 
Windows.  Reverting to parallel repair.");
+            isSequential = false;
+        }
         Collection<Range<Token>> repairingRange = 
createRepairRangeFrom(beginToken, endToken);
 
+        RepairOption options = new RepairOption(isSequential, false, 
!fullRepair, 1, repairingRange);
+        options.getDataCenters().addAll(dataCenters);
+        if (hosts != null)
+        {
+            options.getHosts().addAll(hosts);
+        }
+        if (columnFamilies != null)
+        {
+            for (String columnFamily : columnFamilies)
+            {
+                options.getColumnFamilies().add(columnFamily);
+            }
+        }
+
         logger.info("starting user-requested repair of range {} for keyspace 
{} and column families {}",
                            repairingRange, keyspaceName, columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, dataCenters, 
hosts, repairingRange, fullRepair, columnFamilies);
+        return forceRepairAsync(keyspaceName, options);
     }
 
-    public int forceRepairRangeAsync(String beginToken, String endToken, 
String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, 
String... columnFamilies)
+    public int forceRepairRangeAsync(String beginToken,
+                                     String endToken,
+                                     String keyspaceName,
+                                     boolean isSequential,
+                                     boolean isLocal,
+                                     boolean fullRepair,
+                                     String... columnFamilies)
     {
-        Collection<Range<Token>> repairingRange = 
createRepairRangeFrom(beginToken, endToken);
-
-        logger.info("starting user-requested repair of range {} for keyspace 
{} and column families {}",
-                           repairingRange, keyspaceName, columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, isLocal, 
repairingRange, fullRepair, columnFamilies);
+        Set<String> dataCenters = null;
+        if (isLocal)
+        {
+            dataCenters = 
Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+        }
+        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, 
isSequential, dataCenters, null, fullRepair, columnFamilies);
     }
 
     /**
@@ -2619,32 +2659,19 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return repairingRange;
     }
 
-    private FutureTask<Object> createRepairTask(int cmd,
-                                                String keyspace,
-                                                Collection<Range<Token>> 
ranges,
-                                                boolean isSequential,
-                                                boolean isLocal,
-                                                boolean fullRepair,
-                                                String... columnFamilies)
+    public int forceRepairAsync(String keyspace, RepairOption options)
     {
-        Set<String> dataCenters = null;
-        if (isLocal)
-        {
-            dataCenters = 
Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
-        }
-        return createRepairTask(cmd, keyspace, ranges, isSequential, 
dataCenters, null, fullRepair, columnFamilies);
+        if (options.getRanges().isEmpty() || 
Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
+            return 0;
+
+        int cmd = nextRepairCommand.incrementAndGet();
+        new Thread(createRepairTask(cmd, keyspace, options)).start();
+        return cmd;
     }
 
-    private FutureTask<Object> createRepairTask(final int cmd,
-                                                final String keyspace,
-                                                final Collection<Range<Token>> 
ranges,
-                                                final boolean isSequential,
-                                                final Collection<String> 
dataCenters,
-                                                final Collection<String> hosts,
-                                                final boolean fullRepair,
-                                                final String... columnFamilies)
+    private FutureTask<Object> createRepairTask(final int cmd, final String 
keyspace, final RepairOption options)
     {
-        if (dataCenters != null && 
!dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+        if (!options.getDataCenters().isEmpty() && 
options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter()))
         {
             throw new IllegalArgumentException("the local data center must be 
part of the repair");
         }
@@ -2653,11 +2680,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         {
             protected void runMayThrow() throws Exception
             {
-                String message = String.format("Starting repair command #%d, 
repairing %d ranges for keyspace %s (seq=%b, full=%b)", cmd, ranges.size(), 
keyspace, isSequential, fullRepair);
+                final long startTime = System.currentTimeMillis();
+                String message = String.format("Starting repair command #%d, 
repairing keyspace %s with %s", cmd, keyspace, options);
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.STARTED.ordinal()});
 
-                if (isSequential && !fullRepair)
+                if (options.isSequential() && options.isIncremental())
                 {
                     message = "It is not possible to mix sequential repair and 
incremental repairs.";
                     logger.error(message);
@@ -2665,13 +2693,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     return;
                 }
 
-                Set<InetAddress> allNeighbors = new HashSet<>();
+                final Set<InetAddress> allNeighbors = new HashSet<>();
                 Map<Range, Set<InetAddress>> rangeToNeighbors = new 
HashMap<>();
-                for (Range<Token> range : ranges)
+                for (Range<Token> range : options.getRanges())
                 {
                     try
                     {
-                        Set<InetAddress> neighbors = 
ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
+                        Set<InetAddress> neighbors = 
ActiveRepairService.getNeighbors(keyspace, range, options.getDataCenters(), 
options.getHosts());
                         rangeToNeighbors.put(range, neighbors);
                         allNeighbors.addAll(neighbors);
                     }
@@ -2685,6 +2713,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
                 // Validate columnfamilies
                 List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+                String[] columnFamilies = 
options.getColumnFamilies().toArray(new 
String[options.getColumnFamilies().size()]);
                 try
                 {
                     Iterables.addAll(columnFamilyStores, 
getValidColumnFamilies(false, false, keyspace, columnFamilies));
@@ -2695,12 +2724,14 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     return;
                 }
 
-                UUID parentSession = null;
-                if (!fullRepair)
+                final UUID parentSession;
+                long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
+                if (options.isIncremental())
                 {
                     try
                     {
-                        parentSession = 
ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, 
columnFamilyStores);
+                        parentSession = 
ActiveRepairService.instance.prepareForRepair(allNeighbors, 
options.getRanges(), columnFamilyStores);
+                        repairedAt = 
ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
                     }
                     catch (Throwable t)
                     {
@@ -2708,60 +2739,93 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                         return;
                     }
                 }
+                else
+                {
+                    parentSession = null;
+                }
 
-                List<RepairFuture> futures = new ArrayList<>(ranges.size());
+                // Set up RepairJob executor for this repair command.
+                final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(new 
JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
+                                                                               
                                            Integer.MAX_VALUE,
+                                                                               
                                            TimeUnit.SECONDS,
+                                                                               
                                            new LinkedBlockingQueue<Runnable>(),
+                                                                               
                                            new NamedThreadFactory("Repair#" + 
cmd),
+                                                                               
                                            "internal"));
+
+                List<ListenableFuture<?>> futures = new 
ArrayList<>(options.getRanges().size());
                 String[] cfnames = new String[columnFamilyStores.size()];
                 for (int i = 0; i < columnFamilyStores.size(); i++)
                 {
                     cfnames[i] = columnFamilyStores.get(i).name;
                 }
-                for (Range<Token> range : ranges)
+                for (Range<Token> range : options.getRanges())
                 {
-                    RepairFuture future = 
ActiveRepairService.instance.submitRepairSession(parentSession, range, 
keyspace, isSequential, rangeToNeighbors.get(range), cfnames);
-                    if (future == null)
+                    final RepairSession session = 
ActiveRepairService.instance.submitRepairSession(parentSession,
+                                                                      range,
+                                                                      keyspace,
+                                                                      
options.isSequential(),
+                                                                      
rangeToNeighbors.get(range),
+                                                                      
repairedAt,
+                                                                      executor,
+                                                                      cfnames);
+                    if (session == null)
                         continue;
-                    futures.add(future);
-                    // wait for a session to be done with its differencing 
before starting the next one
-                    try
+                    // After repair session completes, notify client its result
+                    Futures.addCallback(session, new 
FutureCallback<List<RepairResult>>()
                     {
-                        future.session.differencingDone.await();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        message = "Interrupted while waiting for the 
differencing of repair session " + future.session + " to be done. Repair may be 
imprecise.";
-                        logger.error(message, e);
-                        sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.SESSION_FAILED.ordinal()});
-                    }
+                        public void onSuccess(List<RepairResult> results)
+                        {
+                            String message = String.format("Repair session %s 
for range %s finished", session.getId(), session.getRange().toString());
+                            logger.info(message);
+                            sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.SESSION_SUCCESS.ordinal()});
+                        }
+
+                        public void onFailure(Throwable t)
+                        {
+                            String message = String.format("Repair session %s 
for range %s failed with error %s", session.getId(), 
session.getRange().toString(), t.getMessage());
+                            logger.error(message, t);
+                            sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.SESSION_FAILED.ordinal()});
+                        }
+                    });
+                    futures.add(session);
                 }
 
-                boolean successful = true;
-                for (RepairFuture future : futures)
+                // After all repair sessions completes(successful or not),
+                // run anticompaction if necessary and send finish notice back 
to client
+                ListenableFuture<?> allSessions = Futures.allAsList(futures);
+                Futures.addCallback(allSessions, new FutureCallback<Object>()
                 {
-                    try
+                    public void onSuccess(@Nullable Object result)
                     {
-                        future.get();
-                        message = String.format("Repair session %s for range 
%s finished", future.session.getId(), future.session.getRange().toString());
-                        logger.info(message);
-                        sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.SESSION_SUCCESS.ordinal()});
+                        if (options.isIncremental())
+                        {
+                            try
+                            {
+                                
ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors);
+                            }
+                            catch (Exception e)
+                            {
+                                logger.error("Error in incremental repair", e);
+                            }
+                        }
+                        repairComplete();
                     }
-                    catch (ExecutionException e)
+
+                    public void onFailure(Throwable t)
                     {
-                        successful = false;
-                        message = String.format("Repair session %s for range 
%s failed with error %s", future.session.getId(), 
future.session.getRange().toString(), e.getCause().getMessage());
-                        logger.error(message, e);
-                        sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.SESSION_FAILED.ordinal()});
+                        repairComplete();
                     }
-                    catch (Exception e)
+
+                    private void repairComplete()
                     {
-                        successful = false;
-                        message = String.format("Repair session %s for range 
%s failed with error %s", future.session.getId(), 
future.session.getRange().toString(), e.getMessage());
-                        logger.error(message, e);
-                        sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.SESSION_FAILED.ordinal()});
+                        String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, 
true, true);
+                        String message = String.format("Repair command #%d 
finished in %s", cmd, duration);
+                        sendNotification("repair", message,
+                                         new int[]{cmd, 
ActiveRepairService.Status.FINISHED.ordinal()});
+                        logger.info(message);
+                        executor.shutdownNow();
                     }
-                }
-                if (!fullRepair)
-                    
ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, 
successful);
-                sendNotification("repair", String.format("Repair command #%d 
finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+                }, MoreExecutors.sameThreadExecutor());
             }
         }, null);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index cc54639..203d5dc 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -270,29 +270,22 @@ public interface StorageServiceMBean extends 
NotificationEmitter
      *   type: "repair"
      *   userObject: int array of length 2, [0]=command number, [1]=ordinal of 
AntiEntropyService.Status
      *
+     * @param keyspace Keyspace name to repair. Should not be null.
+     * @param options repair option.
      * @return Repair command number, or 0 if nothing to repair
      */
+    public int repairAsync(String keyspace, Map<String, String> options);
+
+    @Deprecated
     public int forceRepairAsync(String keyspace, boolean isSequential, 
Collection<String> dataCenters, Collection<String> hosts,  boolean 
primaryRange, boolean repairedAt, String... columnFamilies) throws IOException;
 
-    /**
-     * Same as forceRepairAsync, but handles a specified range
-     */
+    @Deprecated
     public int forceRepairRangeAsync(String beginToken, String endToken, 
String keyspaceName, boolean isSequential, Collection<String> dataCenters, 
Collection<String> hosts, boolean repairedAt, String... columnFamilies) throws 
IOException;
 
-    /**
-     * Invoke repair asynchronously.
-     * You can track repair progress by subscribing JMX notification sent from 
this StorageServiceMBean.
-     * Notification format is:
-     *   type: "repair"
-     *   userObject: int array of length 2, [0]=command number, [1]=ordinal of 
AntiEntropyService.Status
-     *
-     * @return Repair command number, or 0 if nothing to repair
-     */
+    @Deprecated
     public int forceRepairAsync(String keyspace, boolean isSequential, boolean 
isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies);
 
-    /**
-     * Same as forceRepairAsync, but handles a specified range
-     */
+    @Deprecated
     public int forceRepairRangeAsync(String beginToken, String endToken, 
String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, 
String... columnFamilies);
 
     public void forceTerminateAllRepairSessions();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java 
b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 33da3d1..f26e439 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -21,16 +21,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -38,9 +37,7 @@ import org.apache.cassandra.utils.Pair;
  */
 public class StreamReceiveTask extends StreamTask
 {
-    private static final ThreadPoolExecutor executor = 
DebuggableThreadPoolExecutor.createWithMaximumPoolSize("StreamReceiveTask",
-                                                                               
                               FBUtilities.getAvailableProcessors(),
-                                                                               
                               60, TimeUnit.SECONDS);
+    private static final ExecutorService executor = 
Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask"));
 
     // number of files to receive
     private final int totalFiles;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 70788fd..8793e92 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -43,6 +43,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
+import com.google.common.util.concurrent.AbstractFuture;
 import com.yammer.metrics.reporting.JmxReporter;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -249,43 +250,15 @@ public class NodeProbe implements AutoCloseable
         ssProxy.forceKeyspaceFlush(keyspaceName, columnFamilies);
     }
 
-    public void forceRepairAsync(final PrintStream out, final String 
keyspaceName, boolean isSequential, Collection<String> dataCenters, 
Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... 
columnFamilies) throws IOException
+    public void repairAsync(final PrintStream out, final String keyspace, 
Map<String, String> options) throws IOException
     {
-        RepairRunner runner = new RepairRunner(out, keyspaceName, 
columnFamilies);
+        RepairRunner runner = new RepairRunner(out, ssProxy, keyspace, 
options);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, 
hosts, primaryRange, fullRepair))
-                failed = true;
-        }
-        catch (Exception e)
-        {
-            throw new IOException(e) ;
-        }
-        finally
-        {
-            try
-            {
-                ssProxy.removeNotificationListener(runner);
-                jmxc.removeConnectionNotificationListener(runner);
-            }
-            catch (Throwable t)
-            {
-                JVMStabilityInspector.inspectThrowable(t);
-                out.println("Exception occurred during clean-up. " + t);
-            }
-        }
-    }
-
-    public void forceRepairRangeAsync(final PrintStream out, final String 
keyspaceName, boolean isSequential, Collection<String> dataCenters, 
Collection<String> hosts, final String startToken, final String endToken, 
boolean fullRepair, String... columnFamilies) throws IOException
-    {
-        RepairRunner runner = new RepairRunner(out, keyspaceName, 
columnFamilies);
-        try
-        {
-            jmxc.addConnectionNotificationListener(runner, null, null);
-            ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairRangeAndWait(ssProxy,  isSequential, 
dataCenters, hosts, startToken, endToken, fullRepair))
+            runner.run();
+            if (!runner.get())
                 failed = true;
         }
         catch (Exception e)
@@ -1273,88 +1246,3 @@ class ThreadPoolProxyMBeanIterator implements 
Iterator<Map.Entry<String, JMXEnab
         throw new UnsupportedOperationException();
     }
 }
-
-class RepairRunner implements NotificationListener
-{
-    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss,SSS");
-    private final Condition condition = new SimpleCondition();
-    private final PrintStream out;
-    private final String keyspace;
-    private final String[] columnFamilies;
-    private int cmd;
-    private volatile boolean success = true;
-    private volatile Exception error = null;
-
-    RepairRunner(PrintStream out, String keyspace, String... columnFamilies)
-    {
-        this.out = out;
-        this.keyspace = keyspace;
-        this.columnFamilies = columnFamilies;
-    }
-
-    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean 
isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean 
primaryRangeOnly, boolean fullRepair) throws Exception
-    {
-        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, 
hosts, primaryRangeOnly, fullRepair, columnFamilies);
-        waitForRepair();
-        return success;
-    }
-
-    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean 
isSequential, Collection<String> dataCenters, Collection<String> hosts, String 
startToken, String endToken, boolean fullRepair) throws Exception
-    {
-        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, 
isSequential, dataCenters, hosts, fullRepair, columnFamilies);
-        waitForRepair();
-        return success;
-    }
-
-    private void waitForRepair() throws Exception
-    {
-        if (cmd > 0)
-        {
-            condition.await();
-        }
-        else
-        {
-            String message = String.format("[%s] Nothing to repair for 
keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
-            out.println(message);
-        }
-        if (error != null)
-        {
-            throw error;
-        }
-    }
-
-    public void handleNotification(Notification notification, Object handback)
-    {
-        if ("repair".equals(notification.getType()))
-        {
-            int[] status = (int[]) notification.getUserData();
-            assert status.length == 2;
-            if (cmd == status[0])
-            {
-                String message = String.format("[%s] %s", 
format.format(notification.getTimeStamp()), notification.getMessage());
-                out.println(message);
-                // repair status is int array with [0] = cmd number, [1] = 
status
-                if (status[1] == 
ActiveRepairService.Status.SESSION_FAILED.ordinal())
-                    success = false;
-                else if (status[1] == 
ActiveRepairService.Status.FINISHED.ordinal())
-                    condition.signalAll();
-            }
-        }
-        else if 
(JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType()))
-        {
-            String message = String.format("[%s] Lost notification. You should 
check server log for repair status of keyspace %s",
-                                           
format.format(notification.getTimeStamp()),
-                                           keyspace);
-            out.println(message);
-        }
-        else if 
(JMXConnectionNotification.FAILED.equals(notification.getType())
-                 || 
JMXConnectionNotification.CLOSED.equals(notification.getType()))
-        {
-            String message = String.format("JMX connection closed. You should 
check server log for repair status of keyspace %s"
-                                           + "(Subsequent keyspaces are not 
going to be repaired).",
-                                           keyspace);
-            error = new IOException(message);
-            condition.signalAll();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index 80b0b8f..18536bf 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Maps;
 import com.yammer.metrics.reporting.JmxReporter;
 
 import io.airlift.command.*;
+import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.Schema;
@@ -48,6 +49,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.SessionInfo;
@@ -1677,6 +1679,11 @@ public class NodeTool
         @Option(title = "full", name = {"-full", "--full"}, description = "Use 
-full to issue a full repair.")
         private boolean fullRepair = false;
 
+        @Option(title = "job_threads", name = {"-j", "--job-threads"}, 
description = "Number of threads to run repair jobs. " +
+                                                                               
      "Usually this means number of CFs to repair concurrently. " +
+                                                                               
      "WARNING: increasing this puts more load on repairing nodes, so be 
careful. (default: 1, max: 4)")
+        private int numJobThreads = 1;
+
         @Override
         public void execute(NodeProbe probe)
         {
@@ -1688,20 +1695,28 @@ public class NodeTool
 
             for (String keyspace : keyspaces)
             {
+                Map<String, String> options = new HashMap<>();
+                options.put(RepairOption.SEQUENTIAL_KEY, 
Boolean.toString(sequential));
+                options.put(RepairOption.PRIMARY_RANGE_KEY, 
Boolean.toString(primaryRange));
+                options.put(RepairOption.INCREMENTAL_KEY, 
Boolean.toString(!fullRepair));
+                options.put(RepairOption.JOB_THREADS_KEY, 
Integer.toString(numJobThreads));
+                options.put(RepairOption.COLUMNFAMILIES_KEY, 
StringUtils.join(cfnames, ","));
+                if (!startToken.isEmpty() || !endToken.isEmpty())
+                {
+                    options.put(RepairOption.RANGES_KEY, startToken + ":" + 
endToken);
+                }
+                if (localDC)
+                {
+                    options.put(RepairOption.DATACENTERS_KEY, 
StringUtils.join(newArrayList(probe.getDataCenter()), ","));
+                }
+                else
+                {
+                    options.put(RepairOption.DATACENTERS_KEY, 
StringUtils.join(specificDataCenters, ","));
+                }
+                options.put(RepairOption.HOSTS_KEY, 
StringUtils.join(specificHosts, ","));
                 try
                 {
-                    Collection<String> dataCenters = null;
-                    Collection<String> hosts = null;
-                    if (!specificDataCenters.isEmpty())
-                        dataCenters = newArrayList(specificDataCenters);
-                    else if (localDC)
-                        dataCenters = newArrayList(probe.getDataCenter());
-                    else if(!specificHosts.isEmpty())
-                        hosts = newArrayList(specificHosts);
-                    if (!startToken.isEmpty() || !endToken.isEmpty())
-                        probe.forceRepairRangeAsync(System.out, keyspace, 
sequential, dataCenters,hosts, startToken, endToken, fullRepair);
-                    else
-                        probe.forceRepairAsync(System.out, keyspace, 
sequential, dataCenters, hosts, primaryRange, fullRepair, cfnames);
+                    probe.repairAsync(System.out, keyspace, options);
                 } catch (Exception e)
                 {
                     throw new RuntimeException("Error occurred during repair", 
e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/tools/RepairRunner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java 
b/src/java/org/apache/cassandra/tools/RepairRunner.java
new file mode 100644
index 0000000..1898bb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/RepairRunner.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.remote.JMXConnectionNotification;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageServiceMBean;
+
+public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, 
NotificationListener
+{
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss,SSS");
+
+    private final PrintStream out;
+    private final StorageServiceMBean ssProxy;
+    private final String keyspace;
+    private final Map<String, String> options;
+
+    private volatile int cmd;
+    private volatile boolean success;
+
+    public RepairRunner(PrintStream out, StorageServiceMBean ssProxy, String 
keyspace, Map<String, String> options)
+    {
+        this.out = out;
+        this.ssProxy = ssProxy;
+        this.keyspace = keyspace;
+        this.options = options;
+    }
+
+    public void run()
+    {
+        cmd = ssProxy.repairAsync(keyspace, options);
+        if (cmd <= 0)
+        {
+            String message = String.format("[%s] Nothing to repair for 
keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+            out.println(message);
+            set(true);
+        }
+    }
+
+    public void handleNotification(Notification notification, Object handback)
+    {
+        if ("repair".equals(notification.getType()))
+        {
+            int[] status = (int[]) notification.getUserData();
+            assert status.length == 2;
+            if (cmd == status[0])
+            {
+                String message = String.format("[%s] %s", 
format.format(notification.getTimeStamp()), notification.getMessage());
+                out.println(message);
+                // repair status is int array with [0] = cmd number, [1] = 
status
+                if (status[1] == 
ActiveRepairService.Status.SESSION_FAILED.ordinal())
+                {
+                    success = false;
+                }
+                else if (status[1] == 
ActiveRepairService.Status.FINISHED.ordinal())
+                {
+                    set(success);
+                }
+            }
+        }
+        else if 
(JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType()))
+        {
+            String message = String.format("[%s] Lost notification. You should 
check server log for repair status of keyspace %s",
+                                           
format.format(notification.getTimeStamp()),
+                                           keyspace);
+            out.println(message);
+        }
+        else if 
(JMXConnectionNotification.FAILED.equals(notification.getType())
+                 || 
JMXConnectionNotification.CLOSED.equals(notification.getType()))
+        {
+            String message = String.format("JMX connection closed. You should 
check server log for repair status of keyspace %s"
+                                           + "(Subsequent keyspaces are not 
going to be repaired).",
+                                           keyspace);
+            setException(new IOException(message));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/repair/DifferencerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java 
b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
deleted file mode 100644
index e1ff26e..0000000
--- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.sink.IMessageSink;
-import org.apache.cassandra.sink.SinkManager;
-import org.apache.cassandra.repair.messages.RepairMessage;
-import org.apache.cassandra.repair.messages.SyncComplete;
-import org.apache.cassandra.utils.MerkleTree;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class DifferencerTest
-{
-    private static final IPartitioner partirioner = new Murmur3Partitioner();
-    public static final String KEYSPACE1 = "DifferencerTest";
-    public static final String CF_STANDARD = "Standard1";
-
-    @BeforeClass
-    public static void defineSchema() throws Exception
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    SimpleStrategy.class,
-                                    KSMetaData.optsWithRF(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD));
-    }
-
-    @After
-    public void tearDown()
-    {
-        SinkManager.clear();
-    }
-
-    /**
-     * When there is no difference between two, Differencer should respond 
SYNC_COMPLETE
-     */
-    @Test
-    public void testNoDifference() throws Throwable
-    {
-        final InetAddress ep1 = InetAddress.getByName("127.0.0.1");
-        final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
-
-        SinkManager.add(new IMessageSink()
-        {
-            @SuppressWarnings("unchecked")
-            public MessageOut handleMessage(MessageOut message, int id, 
InetAddress to)
-            {
-                if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                {
-                    RepairMessage m = (RepairMessage) message.payload;
-                    assertEquals(RepairMessage.Type.SYNC_COMPLETE, 
m.messageType);
-                    // we should see SYNC_COMPLETE
-                    assertEquals(new NodePair(ep1, ep2), 
((SyncComplete)m).nodes);
-                }
-                return null;
-            }
-
-            public MessageIn handleMessage(MessageIn message, int id, 
InetAddress to)
-            {
-                return null;
-            }
-        });
-        Range<Token> range = new Range<>(partirioner.getMinimumToken(), 
partirioner.getRandomToken());
-        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), KEYSPACE1, "Standard1", range);
-
-        MerkleTree tree1 = createInitialTree(desc);
-        MerkleTree tree2 = createInitialTree(desc);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine 
here
-        TreeResponse r1 = new TreeResponse(ep1, tree1);
-        TreeResponse r2 = new TreeResponse(ep2, tree2);
-        Differencer diff = new Differencer(desc, r1, r2);
-        diff.run();
-
-        assertTrue(diff.differences.isEmpty());
-    }
-
-    @Test
-    public void testDifference() throws Throwable
-    {
-        Range<Token> range = new Range<>(partirioner.getMinimumToken(), 
partirioner.getRandomToken());
-        UUID parentRepairSession = UUID.randomUUID();
-        Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
Arrays.asList(cfs), Arrays.asList(range));
-
-        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, 
UUID.randomUUID(), KEYSPACE1, "Standard1", range);
-
-        MerkleTree tree1 = createInitialTree(desc);
-        MerkleTree tree2 = createInitialTree(desc);
-
-        // change a range in one of the trees
-        Token token = partirioner.midpoint(range.left, range.right);
-        tree1.invalidate(token);
-        MerkleTree.TreeRange changed = tree1.get(token);
-        changed.hash("non-empty hash!".getBytes());
-
-        Set<Range<Token>> interesting = new HashSet<>();
-        interesting.add(changed);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine 
here
-        TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), 
tree1);
-        TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), 
tree2);
-        Differencer diff = new Differencer(desc, r1, r2);
-        diff.run();
-
-        // ensure that the changed range was recorded
-        assertEquals("Wrong differing ranges", interesting, new 
HashSet<>(diff.differences));
-    }
-
-    private MerkleTree createInitialTree(RepairJobDesc desc)
-    {
-        MerkleTree tree = new MerkleTree(partirioner, desc.range, 
MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15));
-        tree.init();
-        for (MerkleTree.TreeRange r : tree.invalids())
-        {
-            r.ensureHashInitialised();
-        }
-        return tree;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
new file mode 100644
index 0000000..b3d333a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.MerkleTree;
+
+import static org.junit.Assert.assertEquals;
+
+public class LocalSyncTaskTest extends SchemaLoader
+{
+    private static final IPartitioner partirioner = new Murmur3Partitioner();
+    public static final String KEYSPACE1 = "DifferencerTest";
+    public static final String CF_STANDARD = "Standard1";
+
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD));
+    }
+
+    /**
+     * When there is no difference between two, LocalSyncTask should return 
stats with 0 difference.
+     */
+    @Test
+    public void testNoDifference() throws Throwable
+    {
+        final InetAddress ep1 = InetAddress.getByName("127.0.0.1");
+        final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
+
+        Range<Token> range = new Range<>(partirioner.getMinimumToken(), 
partirioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), KEYSPACE1, "Standard1", range);
+
+        MerkleTree tree1 = createInitialTree(desc);
+        MerkleTree tree2 = createInitialTree(desc);
+
+        // difference the trees
+        // note: we reuse the same endpoint which is bogus in theory but fine 
here
+        TreeResponse r1 = new TreeResponse(ep1, tree1);
+        TreeResponse r2 = new TreeResponse(ep2, tree2);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE);
+        task.run();
+
+        assertEquals(0, task.get().numberOfDifferences);
+    }
+
+    @Test
+    public void testDifference() throws Throwable
+    {
+        Range<Token> range = new Range<>(partirioner.getMinimumToken(), 
partirioner.getRandomToken());
+        UUID parentRepairSession = UUID.randomUUID();
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+
+        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
Arrays.asList(cfs), Arrays.asList(range));
+
+        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, 
UUID.randomUUID(), KEYSPACE1, "Standard1", range);
+
+        MerkleTree tree1 = createInitialTree(desc);
+        MerkleTree tree2 = createInitialTree(desc);
+
+        // change a range in one of the trees
+        Token token = partirioner.midpoint(range.left, range.right);
+        tree1.invalidate(token);
+        MerkleTree.TreeRange changed = tree1.get(token);
+        changed.hash("non-empty hash!".getBytes());
+
+        Set<Range<Token>> interesting = new HashSet<>();
+        interesting.add(changed);
+
+        // difference the trees
+        // note: we reuse the same endpoint which is bogus in theory but fine 
here
+        TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), 
tree1);
+        TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), 
tree2);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE);
+        task.run();
+
+        // ensure that the changed range was recorded
+        assertEquals("Wrong differing ranges", interesting.size(), 
task.getCurrentStat().numberOfDifferences);
+    }
+
+    private MerkleTree createInitialTree(RepairJobDesc desc)
+    {
+        MerkleTree tree = new MerkleTree(partirioner, desc.range, 
MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15));
+        tree.init();
+        for (MerkleTree.TreeRange r : tree.invalids())
+        {
+            r.ensureHashInitialised();
+        }
+        return tree;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java 
b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
new file mode 100644
index 0000000..9811fcc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class RepairSessionTest
+{
+    @Test
+    public void testConviction() throws Exception
+    {
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+        Gossiper.instance.initializeNodeUnsafe(remote, UUID.randomUUID(), 1);
+
+        // Set up RepairSession
+        UUID parentSessionId = UUIDGen.getTimeUUID();
+        UUID sessionId = UUID.randomUUID();
+        IPartitioner p = new Murmur3Partitioner();
+        Range<Token> repairRange = new 
Range<>(p.getToken(ByteBufferUtil.bytes(0)), 
p.getToken(ByteBufferUtil.bytes(100)), p);
+        Set<InetAddress> endpoints = Sets.newHashSet(remote);
+        RepairSession session = new RepairSession(parentSessionId, sessionId, 
repairRange, "Keyspace1", true, endpoints, 
ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1");
+
+        // perform convict
+        session.convict(remote, Double.MAX_VALUE);
+
+        // RepairSession should throw ExecutorException with the cause of 
IOException when getting its value
+        try
+        {
+            session.get();
+            fail();
+        }
+        catch (ExecutionException ex)
+        {
+            assertEquals(IOException.class, ex.getCause().getClass());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java 
b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
new file mode 100644
index 0000000..59ad8a3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import static org.junit.Assert.*;
+
+public class RepairOptionTest
+{
+    @Test
+    public void testParseOptions()
+    {
+        IPartitioner partitioner = new Murmur3Partitioner();
+        Token.TokenFactory tokenFactory = partitioner.getTokenFactory();
+
+        // parse with empty options
+        RepairOption option = RepairOption.parse(new HashMap<String, 
String>(), partitioner);
+        assertTrue(option.isSequential());
+        assertFalse(option.isPrimaryRange());
+        assertFalse(option.isIncremental());
+
+        // parse everything
+        Map<String, String> options = new HashMap<>();
+        options.put(RepairOption.SEQUENTIAL_KEY, "false");
+        options.put(RepairOption.PRIMARY_RANGE_KEY, "false");
+        options.put(RepairOption.INCREMENTAL_KEY, "true");
+        options.put(RepairOption.RANGES_KEY, "0:10,11:20,21:30");
+        options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3");
+        options.put(RepairOption.DATACENTERS_KEY, "dc1,dc2,dc3");
+        options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
+
+        option = RepairOption.parse(options, partitioner);
+        assertFalse(option.isSequential());
+        assertFalse(option.isPrimaryRange());
+        assertTrue(option.isIncremental());
+
+        Set<Range<Token>> expectedRanges = new HashSet<>(3);
+        expectedRanges.add(new Range<>(tokenFactory.fromString("0"), 
tokenFactory.fromString("10")));
+        expectedRanges.add(new Range<>(tokenFactory.fromString("11"), 
tokenFactory.fromString("20")));
+        expectedRanges.add(new Range<>(tokenFactory.fromString("21"), 
tokenFactory.fromString("30")));
+        assertEquals(expectedRanges, option.getRanges());
+
+        Set<String> expectedCFs = new HashSet<>(3);
+        expectedCFs.add("cf1");
+        expectedCFs.add("cf2");
+        expectedCFs.add("cf3");
+        assertEquals(expectedCFs, option.getColumnFamilies());
+
+        Set<String> expectedDCs = new HashSet<>(3);
+        expectedDCs.add("dc1");
+        expectedDCs.add("dc2");
+        expectedDCs.add("dc3");
+        assertEquals(expectedDCs, option.getDataCenters());
+
+        Set<String> expectedHosts = new HashSet<>(3);
+        expectedHosts.add("127.0.0.1");
+        expectedHosts.add("127.0.0.2");
+        expectedHosts.add("127.0.0.3");
+        assertEquals(expectedHosts, option.getHosts());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java 
b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
new file mode 100644
index 0000000..dab45f9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -0,0 +1,218 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import com.google.common.collect.Sets;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class ActiveRepairServiceTest
+{
+    public static final String KEYSPACE5 = "Keyspace5";
+    public static final String CF_STANDRAD1 = "Standard1";
+    public static final String CF_COUNTER = "Counter1";
+
+    public String cfname;
+    public ColumnFamilyStore store;
+    public InetAddress LOCAL, REMOTE;
+
+    private boolean initialized;
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE5,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(2),
+                                    SchemaLoader.standardCFMD(KEYSPACE5, 
CF_COUNTER),
+                                    SchemaLoader.standardCFMD(KEYSPACE5, 
CF_STANDRAD1));
+    }
+
+    @Before
+    public void prepare() throws Exception
+    {
+        if (!initialized)
+        {
+            SchemaLoader.startGossiper();
+            initialized = true;
+
+            LOCAL = FBUtilities.getBroadcastAddress();
+            // generate a fake endpoint for which we can spoof 
receiving/sending trees
+            REMOTE = InetAddress.getByName("127.0.0.2");
+        }
+
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.clearUnsafe();
+        
StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
+        
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), 
REMOTE);
+        assert tmd.isMember(REMOTE);
+    }
+
+    @Test
+    public void testGetNeighborsPlusOne() throws Throwable
+    {
+        // generate rf+1 nodes, and ensure that all nodes are returned
+        Set<InetAddress> expected = addTokens(1 + 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        expected.remove(FBUtilities.getBroadcastAddress());
+        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Set<InetAddress> neighbors = new HashSet<>();
+        for (Range<Token> range : ranges)
+        {
+            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
range, null, null));
+        }
+        assertEquals(expected, neighbors);
+    }
+
+    @Test
+    public void testGetNeighborsTimesTwo() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        // generate rf*2 nodes, and ensure that only neighbors specified by 
the ARS are returned
+        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
+        Set<InetAddress> expected = new HashSet<>();
+        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+        {
+            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+        }
+        expected.remove(FBUtilities.getBroadcastAddress());
+        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Set<InetAddress> neighbors = new HashSet<>();
+        for (Range<Token> range : ranges)
+        {
+            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
range, null, null));
+        }
+        assertEquals(expected, neighbors);
+    }
+
+    @Test
+    public void testGetNeighborsPlusOneInLocalDC() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        // generate rf+1 nodes, and ensure that all nodes are returned
+        Set<InetAddress> expected = addTokens(1 + 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        expected.remove(FBUtilities.getBroadcastAddress());
+        // remove remote endpoints
+        TokenMetadata.Topology topology = 
tmd.cloneOnlyTokenMap().getTopology();
+        HashSet<InetAddress> localEndpoints = 
Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+        expected = Sets.intersection(expected, localEndpoints);
+
+        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Set<InetAddress> neighbors = new HashSet<>();
+        for (Range<Token> range : ranges)
+        {
+            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+        }
+        assertEquals(expected, neighbors);
+    }
+
+    @Test
+    public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        // generate rf*2 nodes, and ensure that only neighbors specified by 
the ARS are returned
+        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
+        Set<InetAddress> expected = new HashSet<>();
+        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+        {
+            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+        }
+        expected.remove(FBUtilities.getBroadcastAddress());
+        // remove remote endpoints
+        TokenMetadata.Topology topology = 
tmd.cloneOnlyTokenMap().getTopology();
+        HashSet<InetAddress> localEndpoints = 
Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+        expected = Sets.intersection(expected, localEndpoints);
+
+        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
+        Set<InetAddress> neighbors = new HashSet<>();
+        for (Range<Token> range : ranges)
+        {
+            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+        }
+        assertEquals(expected, neighbors);
+    }
+
+    @Test
+    public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        // generate rf*2 nodes, and ensure that only neighbors specified by 
the hosts are returned
+        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
+        List<InetAddress> expected = new ArrayList<>();
+        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+        {
+            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+        }
+
+        expected.remove(FBUtilities.getBroadcastAddress());
+        Collection<String> hosts = 
Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
+
+       assertEquals(expected.get(0), 
ActiveRepairService.getNeighbors(KEYSPACE5,
+                                                                      
StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(),
+                                                                      null, 
hosts).iterator().next());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws 
Throwable
+    {
+        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
+        //Dont give local endpoint
+        Collection<String> hosts = Arrays.asList("127.0.0.3");
+        ActiveRepairService.getNeighbors(KEYSPACE5, 
StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), null, 
hosts);
+    }
+
+    Set<InetAddress> addTokens(int max) throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        Set<InetAddress> endpoints = new HashSet<>();
+        for (int i = 1; i <= max; i++)
+        {
+            InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
+            
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), 
endpoint);
+            endpoints.add(endpoint);
+        }
+        return endpoints;
+    }
+}

Reply via email to