http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 6f7297b..422dbdb 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,6 +45,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.consistent.CoordinatorSession;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.QueryState;
@@ -66,11 +68,15 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
 {
     private static final Logger logger = 
LoggerFactory.getLogger(RepairRunnable.class);
 
-    private StorageService storageService;
+    private final StorageService storageService;
     private final int cmd;
     private final RepairOption options;
     private final String keyspace;
 
+    private final String tag;
+    private final AtomicInteger progress = new AtomicInteger();
+    private final int totalProgress;
+
     private final List<ProgressListener> listeners = new ArrayList<>();
 
     private static final AtomicInteger threadCounter = new AtomicInteger(1);
@@ -81,6 +87,10 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         this.cmd = cmd;
         this.options = options;
         this.keyspace = keyspace;
+
+        this.tag = "repair:" + cmd;
+        // get valid column families, calculate neighbors, validation, prepare 
for repair + number of ranges to repair
+        this.totalProgress = 4 + options.getRanges().size();
     }
 
     @Override
@@ -223,72 +233,35 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             return;
         }
 
-        // Set up RepairJob executor for this repair command.
-        final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(new 
JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
-                                                                               
                                          Integer.MAX_VALUE,
-                                                                               
                                          TimeUnit.SECONDS,
-                                                                               
                                          new LinkedBlockingQueue<Runnable>(),
-                                                                               
                                          new NamedThreadFactory("Repair#" + 
cmd),
-                                                                               
                                          "internal"));
-
-        List<ListenableFuture<RepairSessionResult>> futures = new 
ArrayList<>(options.getRanges().size());
-        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : 
commonRanges)
+        if (options.isIncremental())
         {
-            final RepairSession session = 
ActiveRepairService.instance.submitRepairSession(parentSession,
-                                                              p.right,
-                                                              keyspace,
-                                                              
options.getParallelism(),
-                                                              p.left,
-                                                              repairedAt,
-                                                              
options.isPullRepair(),
-                                                              executor,
-                                                              cfnames);
-            if (session == null)
-                continue;
-            // After repair session completes, notify client its result
-            Futures.addCallback(session, new 
FutureCallback<RepairSessionResult>()
-            {
-                public void onSuccess(RepairSessionResult result)
-                {
-                    /**
-                     * If the success message below is modified, it must also 
be updated on
-                     * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
-                     * for backward-compatibility support.
-                     */
-                    String message = String.format("Repair session %s for 
range %s finished", session.getId(),
-                                                   
session.getRanges().toString());
-                    logger.info(message);
-                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS,
-                                                             
progress.incrementAndGet(),
-                                                             totalProgress,
-                                                             message));
-                }
-
-                public void onFailure(Throwable t)
-                {
-                    /**
-                     * If the failure message below is modified, it must also 
be updated on
-                     * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
-                     * for backward-compatibility support.
-                     */
-                    String message = String.format("Repair session %s for 
range %s failed with error %s",
-                                                   session.getId(), 
session.getRanges().toString(), t.getMessage());
-                    logger.error(message, t);
-                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS,
-                                                             
progress.incrementAndGet(),
-                                                             totalProgress,
-                                                             message));
-                }
-            });
-            futures.add(session);
+            consistentRepair(parentSession, repairedAt, startTime, traceState, 
allNeighbors, commonRanges, cfnames);
         }
+        else
+        {
+            normalRepair(parentSession, startTime, traceState, allNeighbors, 
commonRanges, cfnames);
+        }
+    }
+
+    private void normalRepair(UUID parentSession,
+                              long startTime,
+                              TraceState traceState,
+                              Set<InetAddress> allNeighbors,
+                              List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> commonRanges,
+                              String... cfnames)
+    {
+
+        // Set up RepairJob executor for this repair command.
+        ListeningExecutorService executor = createExecutor();
+
+        // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the 
repairedAt times to be preserved across streamed sstables
+        final ListenableFuture<List<RepairSessionResult>> allSessions = 
submitRepairSessions(parentSession, ActiveRepairService.UNREPAIRED_SSTABLE, 
false, executor, commonRanges, cfnames);
 
         // After all repair sessions completes(successful or not),
         // run anticompaction if necessary and send finish notice back to 
client
         final Collection<Range<Token>> successfulRanges = new ArrayList<>();
         final AtomicBoolean hasFailure = new AtomicBoolean();
-        final ListenableFuture<List<RepairSessionResult>> allSessions = 
Futures.successfulAsList(futures);
-        ListenableFuture anticompactionResult = Futures.transform(allSessions, 
new AsyncFunction<List<RepairSessionResult>, Object>()
+        ListenableFuture repairResult = Futures.transform(allSessions, new 
AsyncFunction<List<RepairSessionResult>, Object>()
         {
             @SuppressWarnings("unchecked")
             public ListenableFuture apply(List<RepairSessionResult> results)
@@ -305,57 +278,188 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                         hasFailure.compareAndSet(false, true);
                     }
                 }
-                return 
ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, 
successfulRanges);
+                return Futures.immediateFuture(null);
             }
         });
-        Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
+        Futures.addCallback(repairResult, new 
RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, 
hasFailure, executor));
+    }
+
+    private void consistentRepair(UUID parentSession,
+                                  long repairedAt,
+                                  long startTime,
+                                  TraceState traceState,
+                                  Set<InetAddress> allNeighbors,
+                                  List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> commonRanges,
+                                  String... cfnames)
+    {
+        // the local node also needs to be included in the set of
+        // participants, since coordinator sessions aren't persisted
+        Set<InetAddress> allParticipants = new HashSet<>(allNeighbors);
+        allParticipants.add(FBUtilities.getBroadcastAddress());
+
+        CoordinatorSession coordinatorSession = 
ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession,
 allParticipants);
+        ListeningExecutorService executor = createExecutor();
+        AtomicBoolean hasFailure = new AtomicBoolean(false);
+        ListenableFuture repairResult = coordinatorSession.execute(executor,
+                                                                   () -> 
submitRepairSessions(parentSession, repairedAt, true, executor, commonRanges, 
cfnames),
+                                                                   hasFailure);
+        Collection<Range<Token>> ranges = new HashSet<>();
+        for (Collection<Range<Token>> range : 
Iterables.transform(commonRanges, cr -> cr.right))
+        {
+            ranges.addAll(range);
+        }
+        Futures.addCallback(repairResult, new 
RepairCompleteCallback(parentSession, ranges, startTime, traceState, 
hasFailure, executor));
+    }
+
+    private ListenableFuture<List<RepairSessionResult>> 
submitRepairSessions(UUID parentSession,
+                                                                             
long repairedAt,
+                                                                             
boolean isConsistent,
+                                                                             
ListeningExecutorService executor,
+                                                                             
List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges,
+                                                                             
String... cfnames)
+    {
+        List<ListenableFuture<RepairSessionResult>> futures = new 
ArrayList<>(options.getRanges().size());
+        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : 
commonRanges)
+        {
+            RepairSession session = 
ActiveRepairService.instance.submitRepairSession(parentSession,
+                                                                               
      p.right,
+                                                                               
      keyspace,
+                                                                               
      options.getParallelism(),
+                                                                               
      p.left,
+                                                                               
      repairedAt,
+                                                                               
      isConsistent,
+                                                                               
      options.isPullRepair(),
+                                                                               
      executor,
+                                                                               
      cfnames);
+            if (session == null)
+                continue;
+            Futures.addCallback(session, new RepairSessionCallback(session));
+            futures.add(session);
+        }
+        return Futures.successfulAsList(futures);
+    }
+
+    private ListeningExecutorService createExecutor()
+    {
+        return MoreExecutors.listeningDecorator(new 
JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
+                                                                               
       Integer.MAX_VALUE,
+                                                                               
       TimeUnit.SECONDS,
+                                                                               
       new LinkedBlockingQueue<>(),
+                                                                               
       new NamedThreadFactory("Repair#" + cmd),
+                                                                               
       "internal"));
+    }
+
+    private class RepairSessionCallback implements 
FutureCallback<RepairSessionResult>
+    {
+        private final RepairSession session;
+
+        public RepairSessionCallback(RepairSession session)
+        {
+            this.session = session;
+        }
+
+        public void onSuccess(RepairSessionResult result)
+        {
+            /**
+             * If the success message below is modified, it must also be 
updated on
+             * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+             * for backward-compatibility support.
+             */
+            String message = String.format("Repair session %s for range %s 
finished", session.getId(),
+                                           session.getRanges().toString());
+            logger.info(message);
+            fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS,
+                                                     
progress.incrementAndGet(),
+                                                     totalProgress,
+                                                     message));
+        }
+
+        public void onFailure(Throwable t)
+        {
+            /**
+             * If the failure message below is modified, it must also be 
updated on
+             * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+             * for backward-compatibility support.
+             */
+            String message = String.format("Repair session %s for range %s 
failed with error %s",
+                                           session.getId(), 
session.getRanges().toString(), t.getMessage());
+            logger.error(message, t);
+            fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR,
+                                                     
progress.incrementAndGet(),
+                                                     totalProgress,
+                                                     message));
+        }
+    }
+
+    private class RepairCompleteCallback implements FutureCallback<Object>
+    {
+        final UUID parentSession;
+        final Collection<Range<Token>> successfulRanges;
+        final long startTime;
+        final TraceState traceState;
+        final AtomicBoolean hasFailure;
+        final ExecutorService executor;
+
+        public RepairCompleteCallback(UUID parentSession,
+                                      Collection<Range<Token>> 
successfulRanges,
+                                      long startTime,
+                                      TraceState traceState,
+                                      AtomicBoolean hasFailure,
+                                      ExecutorService executor)
         {
-            public void onSuccess(Object result)
+            this.parentSession = parentSession;
+            this.successfulRanges = successfulRanges;
+            this.startTime = startTime;
+            this.traceState = traceState;
+            this.hasFailure = hasFailure;
+            this.executor = executor;
+        }
+
+        public void onSuccess(Object result)
+        {
+            SystemDistributedKeyspace.successfulParentRepair(parentSession, 
successfulRanges);
+            if (hasFailure.get())
             {
-                
SystemDistributedKeyspace.successfulParentRepair(parentSession, 
successfulRanges);
-                if (hasFailure.get())
-                {
-                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
-                                                             "Some repair 
failed"));
-                }
-                else
-                {
-                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
-                                                             "Repair completed 
successfully"));
-                }
-                repairComplete();
+                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
+                                                         "Some repair 
failed"));
             }
-
-            public void onFailure(Throwable t)
+            else
             {
-                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, 
t.getMessage()));
-                SystemDistributedKeyspace.failParentRepair(parentSession, t);
-                repairComplete();
+                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
+                                                         "Repair completed 
successfully"));
             }
+            repairComplete();
+        }
 
-            private void repairComplete()
+        public void onFailure(Throwable t)
+        {
+            fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, 
progress.get(), totalProgress, t.getMessage()));
+            SystemDistributedKeyspace.failParentRepair(parentSession, t);
+            repairComplete();
+        }
+
+        private void repairComplete()
+        {
+            String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+                                                                      true, 
true);
+            String message = String.format("Repair command #%d finished in 
%s", cmd, duration);
+            fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, 
message));
+            logger.info(message);
+            if (options.isTraced() && traceState != null)
             {
-                String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
-                                                                          
true, true);
-                String message = String.format("Repair command #%d finished in 
%s", cmd, duration);
-                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, 
message));
-                logger.info(message);
-                if (options.isTraced() && traceState != null)
-                {
-                    for (ProgressListener listener : listeners)
-                        traceState.removeProgressListener(listener);
-                    // Because DebuggableThreadPoolExecutor#afterExecute and 
this callback
-                    // run in a nondeterministic order (within the same 
thread), the
-                    // TraceState may have been nulled out at this point. The 
TraceState
-                    // should be traceState, so just set it without bothering 
to check if it
-                    // actually was nulled out.
-                    Tracing.instance.set(traceState);
-                    Tracing.traceRepair(message);
-                    Tracing.instance.stopSession();
-                }
-                executor.shutdownNow();
+                for (ProgressListener listener : listeners)
+                    traceState.removeProgressListener(listener);
+                // Because DebuggableThreadPoolExecutor#afterExecute and this 
callback
+                // run in a nondeterministic order (within the same thread), 
the
+                // TraceState may have been nulled out at this point. The 
TraceState
+                // should be traceState, so just set it without bothering to 
check if it
+                // actually was nulled out.
+                Tracing.instance.set(traceState);
+                Tracing.traceRepair(message);
+                Tracing.instance.stopSession();
             }
-        });
+            executor.shutdownNow();
+        }
     }
 
     private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> neighborRangeList, Range<Token> range, 
Set<InetAddress> neighbors)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 00340a1..43a9bfb 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -91,6 +91,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
     public final Collection<Range<Token>> ranges;
     public final Set<InetAddress> endpoints;
     public final long repairedAt;
+    public final boolean isConsistent;
 
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
@@ -124,6 +125,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
                          RepairParallelism parallelismDegree,
                          Set<InetAddress> endpoints,
                          long repairedAt,
+                         boolean isConsistent,
                          boolean pullRepair,
                          String... cfnames)
     {
@@ -137,6 +139,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         this.ranges = ranges;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
+        this.isConsistent = isConsistent;
         this.pullRepair = pullRepair;
     }
 
@@ -256,7 +259,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         List<ListenableFuture<RepairResult>> jobs = new 
ArrayList<>(cfnames.length);
         for (String cfname : cfnames)
         {
-            RepairJob job = new RepairJob(this, cfname);
+            RepairJob job = new RepairJob(this, cfname, isConsistent);
             executor.execute(job);
             jobs.add(job);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index f5b2b1d..f24a79a 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -43,12 +43,14 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
     private final RepairJobDesc desc;
     private final SyncRequest request;
     private final long repairedAt;
+    private final boolean isConsistent;
 
-    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, long 
repairedAt)
+    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, long 
repairedAt, boolean isConsistent)
     {
         this.desc = desc;
         this.request = request;
         this.repairedAt = repairedAt;
+        this.isConsistent = isConsistent;
     }
 
     public void run()
@@ -62,7 +64,7 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
             ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
             isIncremental = prs.isIncremental;
         }
-        new StreamPlan("Repair", repairedAt, 1, false, isIncremental, 
false).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, 
isConsistent ? desc.parentSessionId : null).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote 
node
                                             .requestRanges(dest, preferred, 
desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index fc7aab4..6ebd756 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -173,7 +173,8 @@ public final class SystemDistributedKeyspace
         PrintWriter pw = new PrintWriter(sw);
         t.printStackTrace(pw);
         String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, 
parent_id.toString());
-        processSilent(fmtQuery, t.getMessage(), sw.toString());
+        String message = t.getMessage();
+        processSilent(fmtQuery, message != null ? message : "", sw.toString());
     }
 
     public static void successfulParentRepair(UUID parent_id, 
Collection<Range<Token>> successfulRanges)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java 
b/src/java/org/apache/cassandra/repair/Validator.java
index e20995e..e8e3621 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -59,6 +59,7 @@ public class Validator implements Runnable
     public final InetAddress initiator;
     public final int gcBefore;
     private final boolean evenTreeDistribution;
+    public final boolean isConsistent;
 
     // null when all rows with the min token have been consumed
     private long validated;
@@ -72,14 +73,20 @@ public class Validator implements Runnable
 
     public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
     {
-        this(desc, initiator, gcBefore, false);
+        this(desc, initiator, gcBefore, false, false);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean evenTreeDistribution)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean isConsistent)
+    {
+        this(desc, initiator, gcBefore, false, isConsistent);
+    }
+
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean evenTreeDistribution, boolean isConsistent)
     {
         this.desc = desc;
         this.initiator = initiator;
         this.gcBefore = gcBefore;
+        this.isConsistent = isConsistent;
         validated = 0;
         range = null;
         ranges = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java 
b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
new file mode 100644
index 0000000..9b1fec9
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.repair.messages.PrepareMessage;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.repair.messages.StatusRequest;
+import org.apache.cassandra.repair.messages.StatusResponse;
+import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.tools.nodetool.RepairAdmin;
+
+/**
+ * Base class for consistent Local and Coordinator sessions
+ *
+ * <p/>
+ * There are 4 stages to a consistent incremental repair.
+ *
+ * <h1>Repair prepare</h1>
+ *  First, the normal {@link ActiveRepairService#prepareForRepair(UUID, 
InetAddress, Set, RepairOption, List)} stuff
+ *  happens, which sends out {@link PrepareMessage} and creates a {@link 
ActiveRepairService.ParentRepairSession}
+ *  on the coordinator and each of the neighbors.
+ *
+ * <h1>Consistent prepare</h1>
+ *  The consistent prepare step promotes the parent repair session to a 
consistent session, and isolates the sstables
+ *  being repaired other sstables. First, the coordinator sends a {@link 
PrepareConsistentRequest} message to each repair
+ *  participant (including itself). When received, the node creates a {@link 
LocalSession} instance, sets it's state to
+ *  {@code PREPARING}, persists it, and begins a {@link PendingAntiCompaction} 
task. When the pending anti compaction
+ *  completes, the session state is set to {@code PREPARED}, and a {@link 
PrepareConsistentResponse} is sent to the
+ *  coordinator indicating success or failure. If the pending anti-compaction 
fails, the local session state is set
+ *  to {@code FAILED}.
+ *  <p/>
+ *  (see {@link LocalSessions#handlePrepareMessage(InetAddress, 
PrepareConsistentRequest)}
+ *  <p/>
+ *  Once the coordinator recieves positive {@code PrepareConsistentResponse} 
messages from all the participants, the
+ *  coordinator begins the normal repair process.
+ *  <p/>
+ *  (see {@link CoordinatorSession#handlePrepareResponse(InetAddress, boolean)}
+ *
+ * <h1>Repair</h1>
+ *  The coordinator runs the normal data repair process against the sstables 
segregated in the previous step. When a
+ *  node recieves a {@link ValidationRequest}, it sets it's local session 
state to {@code REPAIRING}.
+ *  <p/>
+ *
+ *  If all of the RepairSessions complete successfully, the coordinator begins 
the {@code Finalization} process. Otherwise,
+ *  it begins the {@code Failure} process.
+ *
+ * <h1>Finalization</h1>
+ *  The finalization step finishes the session and promotes the sstables to 
repaired. The coordinator begins by sending
+ *  {@link FinalizePropose} messages to each of the participants. Each 
participant will set it's state to {@code FINALIZE_PROMISED}
+ *  and respond with a {@link FinalizePromise} message. Once the coordinator 
has received promise messages from all participants,
+ *  it will send a {@link FinalizeCommit} message to all of them, ending the 
coordinator session. When a node receives the
+ *  {@code FinalizeCommit} message, it will set it's sessions state to {@code 
FINALIZED}, completing the {@code LocalSession}.
+ *  <p/>
+ *
+ *  For the sake of simplicity, finalization does not immediately mark pending 
repair sstables repaired because of potential
+ *  conflicts with in progress compactions. The sstables will be marked 
repaired as part of the normal compaction process.
+ *  <p/>
+ *
+ *  On the coordinator side, see {@link 
CoordinatorSession#finalizePropose(Executor)}, {@link 
CoordinatorSession#handleFinalizePromise(InetAddress, boolean)},
+ *  & {@link CoordinatorSession#finalizeCommit(Executor)}
+ *  <p/>
+ *
+ *  On the local session side, see {@link 
LocalSessions#handleFinalizeProposeMessage(InetAddress, FinalizePropose)}
+ *  & {@link LocalSessions#handleFinalizeCommitMessage(InetAddress, 
FinalizeCommit)}
+ *
+ * <h1>Failure</h1>
+ *  If there are any failures or problems during the process above, the 
session will be failed. When a session is failed,
+ *  the coordinator will send {@link FailSession} messages to each of the 
participants. In some cases (basically those not
+ *  including Validation and Sync) errors are reported back to the coordinator 
by the local session, at which point, it
+ *  will send {@code FailSession} messages out.
+ *  <p/>
+ *  Just as with finalization, sstables aren't immediately moved back to 
unrepaired, but will be demoted as part of the
+ *  normal compaction process.
+ *
+ *  <p/>
+ *  See {@link LocalSessions#failSession(UUID, boolean)} and {@link 
CoordinatorSession#fail()}
+ *
+ * <h1>Failure Recovery & Session Cleanup</h1>
+ *  There are a few scenarios where sessions can get stuck. If a node fails 
mid session, or it misses a {@code FailSession}
+ *  or {@code FinalizeCommit} message, it will never finish. To address this, 
there is a cleanup task that runs every
+ *  10 minutes that attempts to complete idle sessions.
+ *
+ *  <p/>
+ *  If a session is not completed (not {@code FINALIZED} or {@code FAILED}) 
and there's been no activity on the session for
+ *  over an hour, the cleanup task will attempt to finish the session by 
learning the session state of the other participants.
+ *  To do this, it sends a {@link StatusRequest} message to the other session 
participants. The participants respond with a
+ *  {@link StatusResponse} message, notifying the sender of their state. If 
the sender receives a {@code FAILED} response
+ *  from any of the participants, it fails the session locally. If it receives 
a {@code FINALIZED} response from any of the
+ *  participants, it will set it's state to {@code FINALIZED} as well. Since 
the coordinator won't finalize sessions until
+ *  it's received {@code FinalizePromise} messages from <i>all</i> 
participants, this is safe.
+ *
+ *
+ *  <p/>
+ *  If a session is not completed, and hasn't had any activity for over a day, 
the session is auto-failed.
+ *
+ *  <p/>
+ *  Once a session has been completed for over 2 days, it's deleted.
+ *
+ *  <p/>
+ *  Operators can also manually fail sessions with {@code nodetool 
repair_admin --cancel}
+ *
+ *  <p/>
+ *  See {@link LocalSessions#cleanup()} and {@link RepairAdmin}
+ *
+ */
+public abstract class ConsistentSession
+{
+    /**
+     * The possible states of a {@code ConsistentSession}. The typical 
progression is {@link State#PREPARING}, {@link State#PREPARED},
+     * {@link State#REPAIRING}, {@link State#FINALIZE_PROMISED}, and {@link 
State#FINALIZED}. With the exception of {@code FINALIZED},
+     * any state can be transitions to {@link State#FAILED}.
+     */
+    public enum State
+    {
+        PREPARING(0),
+        PREPARED(1),
+        REPAIRING(2),
+        FINALIZE_PROMISED(3),
+        FINALIZED(4),
+        FAILED(5);
+
+        State(int expectedOrdinal)
+        {
+            assert ordinal() == expectedOrdinal;
+        }
+
+        private static final Map<State, Set<State>> transitions = new 
EnumMap<State, Set<State>>(State.class) {{
+            put(PREPARING, ImmutableSet.of(PREPARED, FAILED));
+            put(PREPARED, ImmutableSet.of(REPAIRING, FAILED));
+            put(REPAIRING, ImmutableSet.of(FINALIZE_PROMISED, FAILED));
+            put(FINALIZE_PROMISED, ImmutableSet.of(FINALIZED, FAILED));
+            put(FINALIZED, ImmutableSet.of());
+            put(FAILED, ImmutableSet.of(FAILED));
+        }};
+
+        public boolean canTransitionTo(State state)
+        {
+            return transitions.get(this).contains(state);
+        }
+
+        public static State valueOf(int ordinal)
+        {
+            return values()[ordinal];
+        }
+    }
+
+    private volatile State state;
+    public final UUID sessionID;
+    public final InetAddress coordinator;
+    public final ImmutableSet<TableId> tableIds;
+    public final long repairedAt;
+    public final ImmutableSet<Range<Token>> ranges;
+    public final ImmutableSet<InetAddress> participants;
+
+    ConsistentSession(AbstractBuilder builder)
+    {
+        builder.validate();
+        this.state = builder.state;
+        this.sessionID = builder.sessionID;
+        this.coordinator = builder.coordinator;
+        this.tableIds = ImmutableSet.copyOf(builder.ids);
+        this.repairedAt = builder.repairedAt;
+        this.ranges = ImmutableSet.copyOf(builder.ranges);
+        this.participants = ImmutableSet.copyOf(builder.participants);
+    }
+
+    public State getState()
+    {
+        return state;
+    }
+
+    public void setState(State state)
+    {
+        this.state = state;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ConsistentSession that = (ConsistentSession) o;
+
+        if (repairedAt != that.repairedAt) return false;
+        if (state != that.state) return false;
+        if (!sessionID.equals(that.sessionID)) return false;
+        if (!coordinator.equals(that.coordinator)) return false;
+        if (!tableIds.equals(that.tableIds)) return false;
+        if (!ranges.equals(that.ranges)) return false;
+        return participants.equals(that.participants);
+    }
+
+    public int hashCode()
+    {
+        int result = state.hashCode();
+        result = 31 * result + sessionID.hashCode();
+        result = 31 * result + coordinator.hashCode();
+        result = 31 * result + tableIds.hashCode();
+        result = 31 * result + (int) (repairedAt ^ (repairedAt >>> 32));
+        result = 31 * result + ranges.hashCode();
+        result = 31 * result + participants.hashCode();
+        return result;
+    }
+
+    public String toString()
+    {
+        return "ConsistentSession{" +
+               "state=" + state +
+               ", sessionID=" + sessionID +
+               ", coordinator=" + coordinator +
+               ", tableIds=" + tableIds +
+               ", repairedAt=" + repairedAt +
+               ", ranges=" + ranges +
+               ", participants=" + participants +
+               '}';
+    }
+
+    abstract static class AbstractBuilder
+    {
+        private State state;
+        private UUID sessionID;
+        private InetAddress coordinator;
+        private Set<TableId> ids;
+        private long repairedAt;
+        private Collection<Range<Token>> ranges;
+        private Set<InetAddress> participants;
+
+        void withState(State state)
+        {
+            this.state = state;
+        }
+
+        void withSessionID(UUID sessionID)
+        {
+            this.sessionID = sessionID;
+        }
+
+        void withCoordinator(InetAddress coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        void withUUIDTableIds(Iterable<UUID> ids)
+        {
+            this.ids = ImmutableSet.copyOf(Iterables.transform(ids, 
TableId::fromUUID));
+        }
+
+        void withTableIds(Set<TableId> ids)
+        {
+            this.ids = ids;
+        }
+
+        void withRepairedAt(long repairedAt)
+        {
+            this.repairedAt = repairedAt;
+        }
+
+        void withRanges(Collection<Range<Token>> ranges)
+        {
+            this.ranges = ranges;
+        }
+
+        void withParticipants(Set<InetAddress> peers)
+        {
+            this.participants = peers;
+        }
+
+        void validate()
+        {
+            Preconditions.checkArgument(state != null);
+            Preconditions.checkArgument(sessionID != null);
+            Preconditions.checkArgument(coordinator != null);
+            Preconditions.checkArgument(ids != null);
+            Preconditions.checkArgument(!ids.isEmpty());
+            Preconditions.checkArgument(repairedAt > 0);
+            Preconditions.checkArgument(ranges != null);
+            Preconditions.checkArgument(!ranges.isEmpty());
+            Preconditions.checkArgument(participants != null);
+            Preconditions.checkArgument(!participants.isEmpty());
+            Preconditions.checkArgument(participants.contains(coordinator));
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java 
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
new file mode 100644
index 0000000..ef3eacd
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.RepairSessionResult;
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.service.ActiveRepairService;
+
+/**
+ * Coordinator side logic and state of a consistent repair session. Like 
{@link ActiveRepairService.ParentRepairSession},
+ * there is only one {@code CoordinatorSession} per user repair command, 
regardless of the number of tables and token
+ * ranges involved.
+ */
+public class CoordinatorSession extends ConsistentSession
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorSession.class);
+
+    private final Map<InetAddress, State> participantStates = new HashMap<>();
+    private final SettableFuture<Boolean> prepareFuture = 
SettableFuture.create();
+    private final SettableFuture<Boolean> finalizeProposeFuture = 
SettableFuture.create();
+
+
+    public CoordinatorSession(Builder builder)
+    {
+        super(builder);
+        for (InetAddress participant : participants)
+        {
+            participantStates.put(participant, State.PREPARING);
+        }
+    }
+
+    public static class Builder extends AbstractBuilder
+    {
+        public CoordinatorSession build()
+        {
+            validate();
+            return new CoordinatorSession(this);
+        }
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public void setState(State state)
+    {
+        logger.debug("Setting coordinator state to {} for repair {}", state, 
sessionID);
+        super.setState(state);
+    }
+
+    public synchronized void setParticipantState(InetAddress participant, 
State state)
+    {
+        logger.debug("Setting participant {} to state {} for repair {}", 
participant, state, sessionID);
+        Preconditions.checkArgument(participantStates.containsKey(participant),
+                                    "Session %s doesn't include %s",
+                                    sessionID, participant);
+        
Preconditions.checkArgument(participantStates.get(participant).canTransitionTo(state),
+                                    "Invalid state transition %s -> %s",
+                                    participantStates.get(participant), state);
+        participantStates.put(participant, state);
+
+        // update coordinator state if all participants are at the value being 
set
+        if (Iterables.all(participantStates.values(), s -> s == state))
+        {
+            setState(state);
+        }
+    }
+
+    synchronized void setAll(State state)
+    {
+        for (InetAddress participant : participants)
+        {
+            setParticipantState(participant, state);
+        }
+    }
+
+    synchronized boolean allStates(State state)
+    {
+        return getState() == state && 
Iterables.all(participantStates.values(), v -> v == state);
+    }
+
+    synchronized boolean hasFailed()
+    {
+        return getState() == State.FAILED || 
Iterables.any(participantStates.values(), v -> v == State.FAILED);
+    }
+
+    protected void sendMessage(InetAddress destination, RepairMessage message)
+    {
+        MessageOut<RepairMessage> messageOut = new 
MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, 
RepairMessage.serializer);
+        MessagingService.instance().sendOneWay(messageOut, destination);
+    }
+
+    public ListenableFuture<Boolean> prepare(Executor executor)
+    {
+        Preconditions.checkArgument(allStates(State.PREPARING));
+
+        logger.debug("Sending PrepareConsistentRequest message to {}", 
participants);
+        PrepareConsistentRequest message = new 
PrepareConsistentRequest(sessionID, coordinator, participants);
+        for (final InetAddress participant : participants)
+        {
+            executor.execute(() -> sendMessage(participant, message));
+        }
+        return prepareFuture;
+    }
+
+    public synchronized void handlePrepareResponse(InetAddress participant, 
boolean success)
+    {
+        if (getState() == State.FAILED)
+        {
+            logger.debug("Consistent repair {} has failed, ignoring prepare 
response from {}", sessionID, participant);
+        }
+        else if (!success)
+        {
+            logger.debug("Failed prepare response received from {} for session 
{}", participant, sessionID);
+            fail();
+            prepareFuture.set(false);
+        }
+        else
+        {
+            logger.debug("Successful prepare response received from {} for 
session {}", participant, sessionID);
+            setParticipantState(participant, State.PREPARED);
+            if (getState() == State.PREPARED)
+            {
+                prepareFuture.set(true);
+            }
+        }
+    }
+
+    public synchronized void setRepairing()
+    {
+        setAll(State.REPAIRING);
+    }
+
+    public synchronized ListenableFuture<Boolean> finalizePropose(Executor 
executor)
+    {
+        Preconditions.checkArgument(allStates(State.REPAIRING));
+        logger.debug("Sending FinalizePropose message to {}", participants);
+        FinalizePropose message = new FinalizePropose(sessionID);
+        for (final InetAddress participant : participants)
+        {
+            executor.execute(() -> sendMessage(participant, message));
+        }
+        return finalizeProposeFuture;
+    }
+
+    public synchronized void handleFinalizePromise(InetAddress participant, 
boolean success)
+    {
+        if (getState() == State.FAILED)
+        {
+            logger.debug("Consistent repair {} has failed, ignoring finalize 
promise from {}", sessionID, participant);
+        }
+        else if (!success)
+        {
+            logger.debug("Failed finalize promise received from {} for session 
{}", participant, sessionID);
+            fail();
+            finalizeProposeFuture.set(false);
+        }
+        else
+        {
+            logger.debug("Successful finalize promise received from {} for 
session {}", participant, sessionID);
+            setParticipantState(participant, State.FINALIZE_PROMISED);
+            if (getState() == State.FINALIZE_PROMISED)
+            {
+                finalizeProposeFuture.set(true);
+            }
+        }
+    }
+
+    public synchronized void finalizeCommit(Executor executor)
+    {
+        Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
+        logger.debug("Sending FinalizeCommit message to {}", participants);
+        FinalizeCommit message = new FinalizeCommit(sessionID);
+        for (final InetAddress participant : participants)
+        {
+            executor.execute(() -> sendMessage(participant, message));
+        }
+        setAll(State.FINALIZED);
+    }
+
+    public void fail()
+    {
+        fail(MoreExecutors.directExecutor());
+    }
+
+    public synchronized void fail(Executor executor)
+    {
+        logger.debug("Failing session {}", sessionID);
+        FailSession message = new FailSession(sessionID);
+        for (final InetAddress participant : participants)
+        {
+            if (participantStates.get(participant) != State.FAILED)
+            {
+                executor.execute(() -> sendMessage(participant, message));
+            }
+        }
+        setAll(State.FAILED);
+    }
+
+    /**
+     * Runs the asynchronous consistent repair session. Actual repair sessions 
are scheduled via a submitter to make unit testing easier
+     */
+    public ListenableFuture execute(Executor executor, 
Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSubmitter, 
AtomicBoolean hasFailure)
+    {
+        logger.debug("Executing consistent repair {}", sessionID);
+
+        ListenableFuture<Boolean> prepareResult = prepare(executor);
+
+        // run repair sessions normally
+        ListenableFuture<List<RepairSessionResult>> repairSessionResults = 
Futures.transform(prepareResult, new AsyncFunction<Boolean, 
List<RepairSessionResult>>()
+        {
+            public ListenableFuture<List<RepairSessionResult>> apply(Boolean 
success) throws Exception
+            {
+                if (success)
+                {
+                    setRepairing();
+                    return sessionSubmitter.get();
+                }
+                else
+                {
+                    return Futures.immediateFuture(null);
+                }
+
+            }
+        });
+
+        // mark propose finalization
+        ListenableFuture<Boolean> proposeFuture = 
Futures.transform(repairSessionResults, new 
AsyncFunction<List<RepairSessionResult>, Boolean>()
+        {
+            public ListenableFuture<Boolean> apply(List<RepairSessionResult> 
results) throws Exception
+            {
+                if (results == null || results.isEmpty() || 
Iterables.any(results, r -> r == null))
+                {
+                    return Futures.immediateFailedFuture(new 
RuntimeException());
+                }
+                else
+                {
+                    return finalizePropose(executor);
+                }
+            }
+        });
+
+        // commit repaired data
+        Futures.addCallback(proposeFuture, new FutureCallback<Boolean>()
+        {
+            public void onSuccess(@Nullable Boolean result)
+            {
+                if (result != null && result)
+                {
+                    finalizeCommit(executor);
+                }
+                else
+                {
+                    hasFailure.set(true);
+                    fail(executor);
+                }
+            }
+
+            public void onFailure(Throwable t)
+            {
+                hasFailure.set(true);
+                fail(executor);
+            }
+        });
+
+        return proposeFuture;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
new file mode 100644
index 0000000..211e0c1
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.service.ActiveRepairService;
+
+/**
+ * Container for all consistent repair sessions a node is coordinating
+ */
+public class CoordinatorSessions
+{
+    private final Map<UUID, CoordinatorSession> sessions = new HashMap<>();
+
+    protected CoordinatorSession buildSession(CoordinatorSession.Builder 
builder)
+    {
+        return new CoordinatorSession(builder);
+    }
+
+    public synchronized CoordinatorSession registerSession(UUID sessionId, 
Set<InetAddress> participants)
+    {
+        Preconditions.checkArgument(!sessions.containsKey(sessionId), "A 
coordinator already exists for session %s", sessionId);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionId);
+        CoordinatorSession.Builder builder = CoordinatorSession.builder();
+        builder.withState(ConsistentSession.State.PREPARING);
+        builder.withSessionID(sessionId);
+        builder.withCoordinator(prs.coordinator);
+
+        builder.withTableIds(prs.getTableIds());
+        builder.withRepairedAt(prs.repairedAt);
+        builder.withRanges(prs.getRanges());
+        builder.withParticipants(participants);
+        CoordinatorSession session = buildSession(builder);
+        sessions.put(session.sessionID, session);
+        return session;
+    }
+
+    public synchronized CoordinatorSession getSession(UUID sessionId)
+    {
+        return sessions.get(sessionId);
+    }
+
+    public void handlePrepareResponse(PrepareConsistentResponse msg)
+    {
+        CoordinatorSession session = getSession(msg.parentSession);
+        if (session != null)
+        {
+            session.handlePrepareResponse(msg.participant, msg.success);
+        }
+    }
+
+    public void handleFinalizePromiseMessage(FinalizePromise msg)
+    {
+        CoordinatorSession session = getSession(msg.sessionID);
+        if (session != null)
+        {
+            session.handleFinalizePromise(msg.participant, msg.promised);
+        }
+    }
+
+    public void handleFailSessionMessage(FailSession msg)
+    {
+        CoordinatorSession session = getSession(msg.sessionID);
+        if (session != null)
+        {
+            session.fail();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSession.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
new file mode 100644
index 0000000..9d8b4bd
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Basically just a record of a local session. All of the local session logic 
is implemented in {@link LocalSessions}
+ */
+public class LocalSession extends ConsistentSession
+{
+    public final int startedAt;
+    private volatile int lastUpdate;
+
+    public LocalSession(Builder builder)
+    {
+        super(builder);
+        this.startedAt = builder.startedAt;
+        this.lastUpdate = builder.lastUpdate;
+    }
+
+    public boolean isCompleted()
+    {
+        State s = getState();
+        return s == State.FINALIZED || s == State.FAILED;
+    }
+
+    public int getStartedAt()
+    {
+        return startedAt;
+    }
+
+    public int getLastUpdate()
+    {
+        return lastUpdate;
+    }
+
+    public void setLastUpdate()
+    {
+        lastUpdate = FBUtilities.nowInSeconds();
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        LocalSession session = (LocalSession) o;
+
+        if (startedAt != session.startedAt) return false;
+        return lastUpdate == session.lastUpdate;
+    }
+
+    public int hashCode()
+    {
+        int result = super.hashCode();
+        result = 31 * result + startedAt;
+        result = 31 * result + lastUpdate;
+        return result;
+    }
+
+    public String toString()
+    {
+        return "LocalSession{" +
+               "state=" + getState() +
+               ", sessionID=" + sessionID +
+               ", coordinator=" + coordinator +
+               ", tableIds=" + tableIds +
+               ", repairedAt=" + repairedAt +
+               ", ranges=" + ranges +
+               ", participants=" + participants +
+               ", startedAt=" + startedAt +
+               ", lastUpdate=" + lastUpdate +
+               '}';
+    }
+
+    public static class Builder extends AbstractBuilder
+    {
+        private int startedAt;
+        private int lastUpdate;
+
+        public void withStartedAt(int startedAt)
+        {
+            this.startedAt = startedAt;
+        }
+
+        public void withLastUpdate(int lastUpdate)
+        {
+            this.lastUpdate = lastUpdate;
+        }
+
+        void validate()
+        {
+            super.validate();
+            Preconditions.checkArgument(startedAt > 0);
+            Preconditions.checkArgument(lastUpdate > 0);
+        }
+
+        public LocalSession build()
+        {
+            validate();
+            return new LocalSession(this);
+        }
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
new file mode 100644
index 0000000..903aeb5
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * helper for JMX management functions
+ */
+public class LocalSessionInfo
+{
+    public static final String SESSION_ID = "SESSION_ID";
+    public static final String STATE = "STATE";
+    public static final String STARTED = "STARTED";
+    public static final String LAST_UPDATE = "LAST_UPDATE";
+    public static final String COORDINATOR = "COORDINATOR";
+    public static final String PARTICIPANTS = "PARTICIPANTS";
+    public static final String TABLES = "TABLES";
+
+
+    private LocalSessionInfo() {}
+
+    private static String tableString(TableId id)
+    {
+        TableMetadata meta = Schema.instance.getTableMetadata(id);
+        return meta != null ? meta.keyspace + '.' + meta.name : "<null>";
+    }
+
+    static Map<String, String> sessionToMap(LocalSession session)
+    {
+        Map<String, String> m = new HashMap<>();
+        m.put(SESSION_ID, session.sessionID.toString());
+        m.put(STATE, session.getState().toString());
+        m.put(STARTED, Integer.toString(session.getStartedAt()));
+        m.put(LAST_UPDATE, Integer.toString(session.getLastUpdate()));
+        m.put(COORDINATOR, session.coordinator.toString());
+        m.put(PARTICIPANTS, 
Joiner.on(',').join(Iterables.transform(session.participants, 
InetAddress::toString)));
+        m.put(TABLES, 
Joiner.on(',').join(Iterables.transform(session.tableIds, 
LocalSessionInfo::tableString)));
+
+        return m;
+    }
+}

Reply via email to