http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
new file mode 100644
index 0000000..d10b9c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.StatusRequest;
+import org.apache.cassandra.repair.messages.StatusResponse;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+
+/**
+ * Manages all consistent repair sessions a node is participating in.
+ * <p/>
+ * Since sessions need to be loaded, and since we need to handle cases where 
sessions might not exist, most of the logic
+ * around local sessions is implemented in this class, with the LocalSession 
class being treated more like a simple struct,
+ * in contrast with {@link CoordinatorSession}
+ */
+public class LocalSessions
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(LocalSessions.class);
+
+    /**
+     * Amount of time a session can go without any activity before we start 
checking the status of other
+     * participants to see if we've missed a message
+     */
+    static final int CHECK_STATUS_TIMEOUT = 
Integer.getInteger("cassandra.repair_status_check_timeout_seconds",
+                                                               
Ints.checkedCast(TimeUnit.HOURS.toSeconds(1)));
+
+    /**
+     * Amount of time a session can go without any activity before being 
automatically set to FAILED
+     */
+    static final int AUTO_FAIL_TIMEOUT = 
Integer.getInteger("cassandra.repair_fail_timeout_seconds",
+                                                            
Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)));
+
+    /**
+     * Amount of time a completed session is kept around after completion 
before being deleted, this gives
+     * compaction plenty of time to move sstables from successful sessions 
into the repaired bucket
+     */
+    static final int AUTO_DELETE_TIMEOUT = 
Integer.getInteger("cassandra.repair_delete_timeout_seconds",
+                                                              
Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)));
+    /**
+     * How often LocalSessions.cleanup is run
+     */
+    public static final int CLEANUP_INTERVAL = 
Integer.getInteger("cassandra.repair_cleanup_interval_seconds",
+                                                                  
Ints.checkedCast(TimeUnit.MINUTES.toSeconds(10)));
+
+    private static Set<TableId> uuidToTableId(Set<UUID> src)
+    {
+        return ImmutableSet.copyOf(Iterables.transform(src, 
TableId::fromUUID));
+    }
+
+    private static Set<UUID> tableIdToUuid(Set<TableId> src)
+    {
+        return ImmutableSet.copyOf(Iterables.transform(src, TableId::asUUID));
+    }
+
+    private final String keyspace = SchemaConstants.SYSTEM_KEYSPACE_NAME;
+    private final String table = SystemKeyspace.REPAIRS;
+    private boolean started = false;
+    private volatile ImmutableMap<UUID, LocalSession> sessions = 
ImmutableMap.of();
+
+    @VisibleForTesting
+    int getNumSessions()
+    {
+        return sessions.size();
+    }
+
+    @VisibleForTesting
+    protected InetAddress getBroadcastAddress()
+    {
+        return FBUtilities.getBroadcastAddress();
+    }
+
+    @VisibleForTesting
+    protected boolean isAlive(InetAddress address)
+    {
+        return FailureDetector.instance.isAlive(address);
+    }
+
+    @VisibleForTesting
+    protected boolean isNodeInitialized()
+    {
+        return StorageService.instance.isInitialized();
+    }
+
+    public List<Map<String, String>> sessionInfo(boolean all)
+    {
+        Iterable<LocalSession> currentSessions = sessions.values();
+        if (!all)
+        {
+            currentSessions = Iterables.filter(currentSessions, s -> 
!s.isCompleted());
+        }
+        return Lists.newArrayList(Iterables.transform(currentSessions, 
LocalSessionInfo::sessionToMap));
+    }
+
+    /**
+     * hook for operators to cancel sessions, cancelling from a 
non-coordinator is an error, unless
+     * force is set to true. Messages are sent out to other participants, but 
we don't wait for a response
+     */
+    public void cancelSession(UUID sessionID, boolean force)
+    {
+        logger.debug("cancelling session {}", sessionID);
+        LocalSession session = getSession(sessionID);
+        Preconditions.checkArgument(session != null, "Session {} does not 
exist", sessionID);
+        Preconditions.checkArgument(force || 
session.coordinator.equals(getBroadcastAddress()),
+                                    "Cancel session %s from it's coordinator 
(%s) or use --force",
+                                    sessionID, session.coordinator);
+
+        setStateAndSave(session, FAILED);
+        for (InetAddress participant : session.participants)
+        {
+            if (!participant.equals(getBroadcastAddress()))
+                sendMessage(participant, new FailSession(sessionID));
+        }
+    }
+
+    /**
+     * Loads sessions out of the repairs table and sets state to started
+     */
+    public synchronized void start()
+    {
+        Preconditions.checkArgument(!started, "LocalSessions.start can only be 
called once");
+        Preconditions.checkArgument(sessions.isEmpty(), "No sessions should be 
added before start");
+        UntypedResultSet rows = 
QueryProcessor.executeInternalWithPaging(String.format("SELECT * FROM %s.%s", 
keyspace, table), 1000);
+        Map<UUID, LocalSession> loadedSessions = new HashMap<>();
+        for (UntypedResultSet.Row row : rows)
+        {
+            try
+            {
+                LocalSession session = load(row);
+                loadedSessions.put(session.sessionID, session);
+            }
+            catch (IllegalArgumentException | NullPointerException e)
+            {
+                logger.warn("Unable to load malformed repair session {}, 
ignoring", row.has("parent_id") ? row.getUUID("parent_id") : null);
+            }
+        }
+        sessions = ImmutableMap.copyOf(loadedSessions);
+        started = true;
+    }
+
+    public boolean isStarted()
+    {
+        return started;
+    }
+
+    private static boolean shouldCheckStatus(LocalSession session, int now)
+    {
+        return !session.isCompleted() && (now > session.getLastUpdate() + 
CHECK_STATUS_TIMEOUT);
+    }
+
+    private static boolean shouldFail(LocalSession session, int now)
+    {
+        return !session.isCompleted() && (now > session.getLastUpdate() + 
AUTO_FAIL_TIMEOUT);
+    }
+
+    private static boolean shouldDelete(LocalSession session, int now)
+    {
+        return session.isCompleted() && (now > session.getLastUpdate() + 
AUTO_DELETE_TIMEOUT);
+    }
+
+    /**
+     * Auto fails and auto deletes timed out and old sessions
+     * Compaction will clean up the sstables still owned by a deleted session
+     */
+    public void cleanup()
+    {
+        logger.debug("Running LocalSessions.cleanup");
+        if (!isNodeInitialized())
+        {
+            logger.debug("node not initialized, aborting local session 
cleanup");
+            return;
+        }
+        Set<LocalSession> currentSessions = new HashSet<>(sessions.values());
+        for (LocalSession session : currentSessions)
+        {
+            synchronized (session)
+            {
+                int now = FBUtilities.nowInSeconds();
+                if (shouldFail(session, now))
+                {
+                    logger.warn("Auto failing timed out repair session {}", 
session);
+                    failSession(session.sessionID, false);
+                }
+                else if (shouldDelete(session, now))
+                {
+                    logger.warn("Auto deleting repair session {}", session);
+                    deleteSession(session.sessionID);
+                }
+                else if (shouldCheckStatus(session, now))
+                {
+                    sendStatusRequest(session);
+                }
+            }
+        }
+    }
+
+    private static ByteBuffer serializeRange(Range<Token> range)
+    {
+        int size = (int) Token.serializer.serializedSize(range.left, 0);
+        size += (int) Token.serializer.serializedSize(range.right, 0);
+        try (DataOutputBuffer buffer = new DataOutputBuffer(size))
+        {
+            Token.serializer.serialize(range.left, buffer, 0);
+            Token.serializer.serialize(range.right, buffer, 0);
+            return buffer.buffer();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Set<ByteBuffer> serializeRanges(Set<Range<Token>> ranges)
+    {
+        Set<ByteBuffer> buffers = new HashSet<>(ranges.size());
+        ranges.forEach(r -> buffers.add(serializeRange(r)));
+        return buffers;
+    }
+
+    private static Range<Token> deserializeRange(ByteBuffer bb)
+    {
+        try (DataInputBuffer in = new DataInputBuffer(bb, false))
+        {
+            IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+            Token left = Token.serializer.deserialize(in, partitioner, 0);
+            Token right = Token.serializer.deserialize(in, partitioner, 0);
+            return new Range<>(left, right);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Set<Range<Token>> deserializeRanges(Set<ByteBuffer> buffers)
+    {
+        Set<Range<Token>> ranges = new HashSet<>(buffers.size());
+        buffers.forEach(bb -> ranges.add(deserializeRange(bb)));
+        return ranges;
+    }
+
+    /**
+     * Save session state to table
+     */
+    @VisibleForTesting
+    void save(LocalSession session)
+    {
+        String query = "INSERT INTO %s.%s " +
+                       "(parent_id, " +
+                       "started_at, " +
+                       "last_update, " +
+                       "repaired_at, " +
+                       "state, " +
+                       "coordinator, " +
+                       "participants, " +
+                       "ranges, " +
+                       "cfids) " +
+                       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+        QueryProcessor.executeInternal(String.format(query, keyspace, table),
+                                       session.sessionID,
+                                       
Date.from(Instant.ofEpochSecond(session.startedAt)),
+                                       
Date.from(Instant.ofEpochSecond(session.getLastUpdate())),
+                                       
Date.from(Instant.ofEpochMilli(session.repairedAt)),
+                                       session.getState().ordinal(),
+                                       session.coordinator,
+                                       session.participants,
+                                       serializeRanges(session.ranges),
+                                       tableIdToUuid(session.tableIds));
+    }
+
+    private static int dateToSeconds(Date d)
+    {
+        return Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(d.getTime()));
+    }
+
+    private LocalSession load(UntypedResultSet.Row row)
+    {
+        LocalSession.Builder builder = LocalSession.builder();
+        
builder.withState(ConsistentSession.State.valueOf(row.getInt("state")));
+        builder.withSessionID(row.getUUID("parent_id"));
+        builder.withCoordinator(row.getInetAddress("coordinator"));
+        builder.withTableIds(uuidToTableId(row.getSet("cfids", 
UUIDType.instance)));
+        builder.withRepairedAt(row.getTimestamp("repaired_at").getTime());
+        builder.withRanges(deserializeRanges(row.getSet("ranges", 
BytesType.instance)));
+        builder.withParticipants(row.getSet("participants", 
InetAddressType.instance));
+
+        builder.withStartedAt(dateToSeconds(row.getTimestamp("started_at")));
+        builder.withLastUpdate(dateToSeconds(row.getTimestamp("last_update")));
+
+        return buildSession(builder);
+    }
+
+    private void deleteRow(UUID sessionID)
+    {
+        String query = "DELETE FROM %s.%s WHERE parent_id=?";
+        QueryProcessor.executeInternal(String.format(query, keyspace, table), 
sessionID);
+    }
+
+    /**
+     * Loads a session directly from the table. Should be used for testing only
+     */
+    @VisibleForTesting
+    LocalSession loadUnsafe(UUID sessionId)
+    {
+        String query = "SELECT * FROM %s.%s WHERE parent_id=?";
+        UntypedResultSet result = 
QueryProcessor.executeInternal(String.format(query, keyspace, table), 
sessionId);
+        if (result.isEmpty())
+            return null;
+
+        UntypedResultSet.Row row = result.one();
+        return load(row);
+    }
+
+    @VisibleForTesting
+    protected LocalSession buildSession(LocalSession.Builder builder)
+    {
+        return new LocalSession(builder);
+    }
+
+    protected LocalSession getSession(UUID sessionID)
+    {
+        return sessions.get(sessionID);
+    }
+
+    @VisibleForTesting
+    synchronized void putSessionUnsafe(LocalSession session)
+    {
+        putSession(session);
+        save(session);
+    }
+
+    private synchronized void putSession(LocalSession session)
+    {
+        Preconditions.checkArgument(!sessions.containsKey(session.sessionID),
+                                    "LocalSession {} already exists", 
session.sessionID);
+        Preconditions.checkArgument(started, "sessions cannot be added before 
LocalSessions is started");
+        sessions = ImmutableMap.<UUID, LocalSession>builder()
+                               .putAll(sessions)
+                               .put(session.sessionID, session)
+                               .build();
+    }
+
+    private synchronized void removeSession(UUID sessionID)
+    {
+        Preconditions.checkArgument(sessionID != null);
+        Map<UUID, LocalSession> temp = new HashMap<>(sessions);
+        temp.remove(sessionID);
+        sessions = ImmutableMap.copyOf(temp);
+    }
+
+    @VisibleForTesting
+    LocalSession createSessionUnsafe(UUID sessionId, 
ActiveRepairService.ParentRepairSession prs, Set<InetAddress> peers)
+    {
+        LocalSession.Builder builder = LocalSession.builder();
+        builder.withState(ConsistentSession.State.PREPARING);
+        builder.withSessionID(sessionId);
+        builder.withCoordinator(prs.coordinator);
+
+        builder.withTableIds(prs.getTableIds());
+        builder.withRepairedAt(prs.repairedAt);
+        builder.withRanges(prs.getRanges());
+        builder.withParticipants(peers);
+
+        int now = FBUtilities.nowInSeconds();
+        builder.withStartedAt(now);
+        builder.withLastUpdate(now);
+
+        return buildSession(builder);
+    }
+
+    protected ActiveRepairService.ParentRepairSession 
getParentRepairSession(UUID sessionID)
+    {
+        return ActiveRepairService.instance.getParentRepairSession(sessionID);
+    }
+
+    protected void sendMessage(InetAddress destination, RepairMessage message)
+    {
+        logger.debug("sending {} to {}", message, destination);
+        MessageOut<RepairMessage> messageOut = new 
MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, 
RepairMessage.serializer);
+        MessagingService.instance().sendOneWay(messageOut, destination);
+    }
+
+    private void setStateAndSave(LocalSession session, ConsistentSession.State 
state)
+    {
+        synchronized (session)
+        {
+            
Preconditions.checkArgument(session.getState().canTransitionTo(state),
+                                        "Invalid state transition %s -> %s",
+                                        session.getState(), state);
+            logger.debug("Setting LocalSession state from {} -> {} for {}", 
session.getState(), state, session.sessionID);
+            session.setState(state);
+            session.setLastUpdate();
+            save(session);
+        }
+    }
+
+    public void failSession(UUID sessionID)
+    {
+        failSession(sessionID, true);
+    }
+
+    public void failSession(UUID sessionID, boolean sendMessage)
+    {
+        logger.debug("failing session {}", sessionID);
+        LocalSession session = getSession(sessionID);
+        if (session != null)
+        {
+            setStateAndSave(session, FAILED);
+            if (sendMessage)
+            {
+                sendMessage(session.coordinator, new FailSession(sessionID));
+            }
+        }
+    }
+
+    public synchronized void deleteSession(UUID sessionID)
+    {
+        logger.debug("deleting session {}", sessionID);
+        LocalSession session = getSession(sessionID);
+        Preconditions.checkArgument(session.isCompleted(), "Cannot delete 
incomplete sessions");
+
+        deleteRow(sessionID);
+        removeSession(sessionID);
+    }
+
+    @VisibleForTesting
+    ListenableFuture submitPendingAntiCompaction(LocalSession session, 
ExecutorService executor)
+    {
+        PendingAntiCompaction pac = new 
PendingAntiCompaction(session.sessionID, session.ranges, executor);
+        return pac.run();
+    }
+
+    /**
+     * The PrepareConsistentRequest effectively promotes the parent repair 
session to a consistent
+     * incremental session, and begins the 'pending anti compaction' which 
moves all sstable data
+     * that is to be repaired into it's own silo, preventing it from mixing 
with other data.
+     *
+     * No response is sent to the repair coordinator until the pending anti 
compaction has completed
+     * successfully. If the pending anti compaction fails, a failure message 
is sent to the coordinator,
+     * cancelling the session.
+     */
+    public void handlePrepareMessage(InetAddress from, 
PrepareConsistentRequest request)
+    {
+        logger.debug("received {} from {}", request, from);
+        UUID sessionID = request.parentSession;
+        InetAddress coordinator = request.coordinator;
+        Set<InetAddress> peers = request.participants;
+
+        ActiveRepairService.ParentRepairSession parentSession;
+        try
+        {
+            parentSession = getParentRepairSession(sessionID);
+        }
+        catch (Throwable e)
+        {
+            logger.debug("Error retrieving ParentRepairSession for session {}, 
responding with failure", sessionID);
+            sendMessage(coordinator, new FailSession(sessionID));
+            return;
+        }
+
+        LocalSession session = createSessionUnsafe(sessionID, parentSession, 
peers);
+        putSessionUnsafe(session);
+        logger.debug("created local session for {}", sessionID);
+
+        ExecutorService executor = 
Executors.newFixedThreadPool(parentSession.getColumnFamilyStores().size());
+
+        ListenableFuture pendingAntiCompaction = 
submitPendingAntiCompaction(session, executor);
+        Futures.addCallback(pendingAntiCompaction, new FutureCallback()
+        {
+            public void onSuccess(@Nullable Object result)
+            {
+                logger.debug("pending anti-compaction for {} completed", 
sessionID);
+                setStateAndSave(session, PREPARED);
+                sendMessage(coordinator, new 
PrepareConsistentResponse(sessionID, getBroadcastAddress(), true));
+                executor.shutdown();
+            }
+
+            public void onFailure(Throwable t)
+            {
+                logger.debug("pending anti-compaction for {} failed", 
sessionID);
+                failSession(sessionID);
+                executor.shutdown();
+            }
+        });
+    }
+
+    public void maybeSetRepairing(UUID sessionID)
+    {
+        LocalSession session = getSession(sessionID);
+        if (session != null && session.getState() != REPAIRING)
+        {
+            logger.debug("Setting local session {} to REPAIRING", session);
+            setStateAndSave(session, REPAIRING);
+        }
+    }
+
+    public void handleFinalizeProposeMessage(InetAddress from, FinalizePropose 
propose)
+    {
+        logger.debug("received {} from {}", propose, from);
+        UUID sessionID = propose.sessionID;
+        LocalSession session = getSession(sessionID);
+        if (session == null)
+        {
+            logger.debug("No LocalSession found for session {}, responding 
with failure", sessionID);
+            sendMessage(from, new FailSession(sessionID));
+            return;
+        }
+
+        try
+        {
+            setStateAndSave(session, FINALIZE_PROMISED);
+            sendMessage(from, new FinalizePromise(sessionID, 
getBroadcastAddress(), true));
+        }
+        catch (IllegalArgumentException e)
+        {
+            logger.error("error setting session to FINALIZE_PROMISED", e);
+            failSession(sessionID);
+        }
+    }
+
+    /**
+     * Finalizes the repair session, completing it as successful.
+     *
+     * This only changes the state of the session, it doesn't promote the 
siloed sstables to repaired. That will happen
+     * as part of the compaction process, and avoids having to worry about in 
progress compactions interfering with the
+     * promotion.
+     */
+    public void handleFinalizeCommitMessage(InetAddress from, FinalizeCommit 
commit)
+    {
+        logger.debug("received {} from {}", commit, from);
+        UUID sessionID = commit.sessionID;
+        LocalSession session = getSession(sessionID);
+        if (session == null)
+        {
+            logger.warn("Received finalize commit message for unknown session 
{}", sessionID);
+            return;
+        }
+
+        setStateAndSave(session, FINALIZED);
+    }
+
+    public void handleFailSessionMessage(InetAddress from, FailSession msg)
+    {
+        logger.debug("received {} from {}", msg, from);
+        failSession(msg.sessionID, false);
+    }
+
+    public void sendStatusRequest(LocalSession session)
+    {
+        StatusRequest request = new StatusRequest(session.sessionID);
+        for (InetAddress participant : session.participants)
+        {
+            if (!getBroadcastAddress().equals(participant) && 
isAlive(participant))
+            {
+                sendMessage(participant, request);
+            }
+        }
+    }
+
+    public void handleStatusRequest(InetAddress from, StatusRequest request)
+    {
+        logger.debug("received {} from {}", request, from);
+        UUID sessionID = request.sessionID;
+        LocalSession session = getSession(sessionID);
+        if (session == null)
+        {
+            logger.warn("Received status response message for unknown session 
{}", sessionID);
+            sendMessage(from, new StatusResponse(sessionID, FAILED));
+        }
+        else
+        {
+            sendMessage(from, new StatusResponse(sessionID, 
session.getState()));
+        }
+    }
+
+    public void handleStatusResponse(InetAddress from, StatusResponse response)
+    {
+        logger.debug("received {} from {}", response, from);
+        UUID sessionID = response.sessionID;
+        LocalSession session = getSession(sessionID);
+        if (session == null)
+        {
+            logger.warn("Received status response message for unknown session 
{}", sessionID);
+            return;
+        }
+
+        // only change local state if response state is FINALIZED or FAILED, 
since those are
+        // the only statuses that would indicate we've missed a message 
completing the session
+        if (response.state == FINALIZED || response.state == FAILED)
+        {
+            setStateAndSave(session, response.state);
+        }
+        else
+        {
+            logger.debug("{} is not actionable");
+        }
+    }
+
+    /**
+     * determines if a local session exists, and if it's not finalized or 
failed
+     */
+    public boolean isSessionInProgress(UUID sessionID)
+    {
+        LocalSession session = getSession(sessionID);
+        return session != null && session.getState() != FINALIZED && 
session.getState() != FAILED;
+    }
+
+    /**
+     * Returns the repairedAt time for a sessions which is unknown, failed, or 
finalized
+     * calling this for a session which is in progress throws an exception
+     */
+    public long getFinalSessionRepairedAt(UUID sessionID)
+    {
+        LocalSession session = getSession(sessionID);
+        if (session == null || session.getState() == FAILED)
+        {
+            return ActiveRepairService.UNREPAIRED_SSTABLE;
+        }
+        else if (session.getState() == FINALIZED)
+        {
+            return session.repairedAt;
+        }
+        else
+        {
+            throw new IllegalStateException("Cannot get final repaired at 
value for in progress session: " + session);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java 
b/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java
new file mode 100644
index 0000000..5203c41
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Performs an anti compaction on a set of tables and token ranges, isolating 
the unrepaired sstables
+ * for a give token range into a pending repair group so they can't be 
compacted with other sstables
+ * while they are being repaired.
+ */
+public class PendingAntiCompaction
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PendingAntiCompaction.class);
+
+    static class AcquireResult
+    {
+        final ColumnFamilyStore cfs;
+        final Refs<SSTableReader> refs;
+        final LifecycleTransaction txn;
+
+        AcquireResult(ColumnFamilyStore cfs, Refs<SSTableReader> refs, 
LifecycleTransaction txn)
+        {
+            this.cfs = cfs;
+            this.refs = refs;
+            this.txn = txn;
+        }
+
+        void abort()
+        {
+            txn.abort();
+            refs.release();
+        }
+    }
+
+    static class AcquisitionCallable implements Callable<AcquireResult>
+    {
+        private final ColumnFamilyStore cfs;
+        private final Collection<Range<Token>> ranges;
+        private final UUID sessionID;
+
+        public AcquisitionCallable(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, UUID sessionID)
+        {
+            this.cfs = cfs;
+            this.ranges = ranges;
+            this.sessionID = sessionID;
+        }
+
+        private Iterable<SSTableReader> getSSTables()
+        {
+            return Iterables.filter(cfs.getLiveSSTables(), s -> 
!s.isRepaired() && !s.isPendingRepair() && s.intersects(ranges));
+        }
+
+        private AcquireResult acquireTuple()
+        {
+            List<SSTableReader> sstables = Lists.newArrayList(getSSTables());
+            if (sstables.isEmpty())
+                return new AcquireResult(cfs, null, null);
+
+            LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.ANTICOMPACTION);
+            if (txn != null)
+                return new AcquireResult(cfs, Refs.ref(sstables), txn);
+            else
+                return null;
+        }
+
+        public AcquireResult call() throws Exception
+        {
+            logger.debug("acquiring sstables for pending anti compaction on 
session {}", sessionID);
+            AcquireResult refTxn = acquireTuple();
+            if (refTxn != null)
+                return refTxn;
+
+            // try to modify after cancelling running compactions. This will 
attempt to cancel in flight compactions for
+            // up to a minute, after which point, null will be returned
+            return cfs.runWithCompactionsDisabled(this::acquireTuple, false, 
false);
+        }
+    }
+
+    static class AcquisitionCallback implements 
AsyncFunction<List<AcquireResult>, Object>
+    {
+        private final UUID parentRepairSession;
+        private final Collection<Range<Token>> ranges;
+
+        public AcquisitionCallback(UUID parentRepairSession, 
Collection<Range<Token>> ranges)
+        {
+            this.parentRepairSession = parentRepairSession;
+            this.ranges = ranges;
+        }
+
+        ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
+        {
+            return 
CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, 
result.refs, result.txn, parentRepairSession);
+        }
+
+        public ListenableFuture apply(List<AcquireResult> results) throws 
Exception
+        {
+            if (Iterables.any(results, t -> t == null))
+            {
+                // Release all sstables, and report failure back to coordinator
+                for (AcquireResult result : results)
+                {
+                    if (result != null)
+                    {
+                        logger.info("Releasing acquired sstables for {}.{}", 
result.cfs.metadata.keyspace, result.cfs.metadata.name);
+                        result.abort();
+                    }
+                }
+                return Futures.immediateFailedFuture(new 
RuntimeException("unable to acquire sstables"));
+            }
+            else
+            {
+                List<ListenableFuture<?>> pendingAntiCompactions = new 
ArrayList<>(results.size());
+                for (AcquireResult result : results)
+                {
+                    if (result.txn != null)
+                    {
+                        ListenableFuture<?> future = 
submitPendingAntiCompaction(result);
+                        pendingAntiCompactions.add(future);
+                    }
+                }
+
+                return Futures.allAsList(pendingAntiCompactions);
+            }
+        }
+    }
+
+    private final UUID prsId;
+    private final Collection<Range<Token>> ranges;
+    private final ExecutorService executor;
+
+    public PendingAntiCompaction(UUID prsId, Collection<Range<Token>> ranges, 
ExecutorService executor)
+    {
+        this.prsId = prsId;
+        this.ranges = ranges;
+        this.executor = executor;
+    }
+
+    public ListenableFuture run()
+    {
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
+        List<ListenableFutureTask<AcquireResult>> tasks = new ArrayList<>();
+        for (ColumnFamilyStore cfs : prs.getColumnFamilyStores())
+        {
+            cfs.forceBlockingFlush();
+            ListenableFutureTask<AcquireResult> task = 
ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId));
+            executor.submit(task);
+            tasks.add(task);
+        }
+        ListenableFuture<List<AcquireResult>> acquisitionResults = 
Futures.successfulAsList(tasks);
+        ListenableFuture compactionResult = 
Futures.transform(acquisitionResults, new AcquisitionCallback(prsId, ranges));
+        return compactionResult;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java 
b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
deleted file mode 100644
index a29cc87..0000000
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair.messages;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-public class AnticompactionRequest extends RepairMessage
-{
-    public static MessageSerializer serializer = new 
AnticompactionRequestSerializer();
-    public final UUID parentRepairSession;
-    /**
-     * Successfully repaired ranges. Does not contain null.
-     */
-    public final Collection<Range<Token>> successfulRanges;
-
-    public AnticompactionRequest(UUID parentRepairSession, 
Collection<Range<Token>> ranges)
-    {
-        super(Type.ANTICOMPACTION_REQUEST, null);
-        this.parentRepairSession = parentRepairSession;
-        this.successfulRanges = ranges;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (!(o instanceof AnticompactionRequest))
-            return false;
-        AnticompactionRequest other = (AnticompactionRequest)o;
-        return messageType == other.messageType &&
-               parentRepairSession.equals(other.parentRepairSession) &&
-               successfulRanges.equals(other.successfulRanges);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(messageType, parentRepairSession, 
successfulRanges);
-    }
-
-    public static class AnticompactionRequestSerializer implements 
MessageSerializer<AnticompactionRequest>
-    {
-        public void serialize(AnticompactionRequest message, DataOutputPlus 
out, int version) throws IOException
-        {
-            UUIDSerializer.serializer.serialize(message.parentRepairSession, 
out, version);
-            out.writeInt(message.successfulRanges.size());
-            for (Range<Token> r : message.successfulRanges)
-            {
-                MessagingService.validatePartitioner(r);
-                Range.tokenSerializer.serialize(r, out, version);
-            }
-        }
-
-        public AnticompactionRequest deserialize(DataInputPlus in, int 
version) throws IOException
-        {
-            UUID parentRepairSession = 
UUIDSerializer.serializer.deserialize(in, version);
-            int rangeCount = in.readInt();
-            List<Range<Token>> ranges = new ArrayList<>(rangeCount);
-            for (int i = 0; i < rangeCount; i++)
-                ranges.add((Range<Token>) 
Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), 
version));
-            return new AnticompactionRequest(parentRepairSession, ranges);
-        }
-
-        public long serializedSize(AnticompactionRequest message, int version)
-        {
-            long size = 
UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
-            size += Integer.BYTES; // count of items in successfulRanges
-            for (Range<Token> r : message.successfulRanges)
-                size += Range.tokenSerializer.serializedSize(r, version);
-            return size;
-        }
-    }
-
-    @Override
-    public String toString()
-    {
-        return "AnticompactionRequest{" +
-                "parentRepairSession=" + parentRepairSession +
-                "} " + super.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/FailSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/FailSession.java 
b/src/java/org/apache/cassandra/repair/messages/FailSession.java
new file mode 100644
index 0000000..1227cc3
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/FailSession.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class FailSession extends RepairMessage
+{
+    public final UUID sessionID;
+
+    public FailSession(UUID sessionID)
+    {
+        super(Type.FAILED_SESSION, null);
+        assert sessionID != null;
+        this.sessionID = sessionID;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        FailSession that = (FailSession) o;
+
+        return sessionID.equals(that.sessionID);
+    }
+
+    public int hashCode()
+    {
+        return sessionID.hashCode();
+    }
+
+    public static final MessageSerializer serializer = new 
MessageSerializer<FailSession>()
+    {
+        public void serialize(FailSession msg, DataOutputPlus out, int 
version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
+        }
+
+        public FailSession deserialize(DataInputPlus in, int version) throws 
IOException
+        {
+            return new FailSession(UUIDSerializer.serializer.deserialize(in, 
version));
+        }
+
+        public long serializedSize(FailSession msg, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(msg.sessionID, 
version);
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java 
b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
new file mode 100644
index 0000000..a4eb111
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class FinalizeCommit extends RepairMessage
+{
+    public final UUID sessionID;
+
+    public FinalizeCommit(UUID sessionID)
+    {
+        super(Type.FINALIZE_COMMIT, null);
+        assert sessionID != null;
+        this.sessionID = sessionID;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        FinalizeCommit that = (FinalizeCommit) o;
+
+        return sessionID.equals(that.sessionID);
+    }
+
+    public int hashCode()
+    {
+        return sessionID.hashCode();
+    }
+
+    public String toString()
+    {
+        return "FinalizeCommit{" +
+               "sessionID=" + sessionID +
+               '}';
+    }
+
+    public static MessageSerializer serializer = new 
MessageSerializer<FinalizeCommit>()
+    {
+        public void serialize(FinalizeCommit msg, DataOutputPlus out, int 
version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
+        }
+
+        public FinalizeCommit deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            return new 
FinalizeCommit(UUIDSerializer.serializer.deserialize(in, version));
+        }
+
+        public long serializedSize(FinalizeCommit msg, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(msg.sessionID, 
version);
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java 
b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
new file mode 100644
index 0000000..6c28347
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.serializers.InetAddressSerializer;
+import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class FinalizePromise extends RepairMessage
+{
+    public final UUID sessionID;
+    public final InetAddress participant;
+    public final boolean promised;
+
+    public FinalizePromise(UUID sessionID, InetAddress participant, boolean 
promised)
+    {
+        super(Type.FINALIZE_PROMISE, null);
+        assert sessionID != null;
+        assert participant != null;
+        this.sessionID = sessionID;
+        this.participant = participant;
+        this.promised = promised;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        FinalizePromise that = (FinalizePromise) o;
+
+        if (promised != that.promised) return false;
+        if (!sessionID.equals(that.sessionID)) return false;
+        return participant.equals(that.participant);
+    }
+
+    public int hashCode()
+    {
+        int result = sessionID.hashCode();
+        result = 31 * result + participant.hashCode();
+        result = 31 * result + (promised ? 1 : 0);
+        return result;
+    }
+
+    public static MessageSerializer serializer = new 
MessageSerializer<FinalizePromise>()
+    {
+        private TypeSerializer<InetAddress> inetSerializer = 
InetAddressSerializer.instance;
+
+        public void serialize(FinalizePromise msg, DataOutputPlus out, int 
version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
+            
ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(msg.participant), 
out);
+            out.writeBoolean(msg.promised);
+        }
+
+        public FinalizePromise deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            return new 
FinalizePromise(UUIDSerializer.serializer.deserialize(in, version),
+                                       
inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)),
+                                       in.readBoolean());
+        }
+
+        public long serializedSize(FinalizePromise msg, int version)
+        {
+            long size = 
UUIDSerializer.serializer.serializedSize(msg.sessionID, version);
+            size += 
ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(msg.participant));
+            size += TypeSizes.sizeof(msg.promised);
+            return size;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java 
b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
new file mode 100644
index 0000000..c0c49df
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class FinalizePropose extends RepairMessage
+{
+    public final UUID sessionID;
+
+    public FinalizePropose(UUID sessionID)
+    {
+        super(Type.FINALIZE_PROPOSE, null);
+        assert sessionID != null;
+        this.sessionID = sessionID;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        FinalizePropose that = (FinalizePropose) o;
+
+        return sessionID.equals(that.sessionID);
+    }
+
+    public int hashCode()
+    {
+        return sessionID.hashCode();
+    }
+
+    public String toString()
+    {
+        return "FinalizePropose{" +
+               "sessionID=" + sessionID +
+               '}';
+    }
+
+    public static MessageSerializer serializer = new 
MessageSerializer<FinalizePropose>()
+    {
+        public void serialize(FinalizePropose msg, DataOutputPlus out, int 
version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
+        }
+
+        public FinalizePropose deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            return new 
FinalizePropose(UUIDSerializer.serializer.deserialize(in, version));
+        }
+
+        public long serializedSize(FinalizePropose msg, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(msg.sessionID, 
version);
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java 
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
new file mode 100644
index 0000000..57056ef
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.serializers.InetAddressSerializer;
+import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class PrepareConsistentRequest extends RepairMessage
+{
+    public final UUID parentSession;
+    public final InetAddress coordinator;
+    public final Set<InetAddress> participants;
+
+    public PrepareConsistentRequest(UUID parentSession, InetAddress 
coordinator, Set<InetAddress> participants)
+    {
+        super(Type.CONSISTENT_REQUEST, null);
+        assert parentSession != null;
+        assert coordinator != null;
+        assert participants != null && !participants.isEmpty();
+        this.parentSession = parentSession;
+        this.coordinator = coordinator;
+        this.participants = ImmutableSet.copyOf(participants);
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        PrepareConsistentRequest that = (PrepareConsistentRequest) o;
+
+        if (!parentSession.equals(that.parentSession)) return false;
+        if (!coordinator.equals(that.coordinator)) return false;
+        return participants.equals(that.participants);
+    }
+
+    public int hashCode()
+    {
+        int result = parentSession.hashCode();
+        result = 31 * result + coordinator.hashCode();
+        result = 31 * result + participants.hashCode();
+        return result;
+    }
+
+    public String toString()
+    {
+        return "PrepareConsistentRequest{" +
+               "parentSession=" + parentSession +
+               ", coordinator=" + coordinator +
+               ", participants=" + participants +
+               '}';
+    }
+
+    public static MessageSerializer serializer = new 
MessageSerializer<PrepareConsistentRequest>()
+    {
+        private TypeSerializer<InetAddress> inetSerializer = 
InetAddressSerializer.instance;
+
+        public void serialize(PrepareConsistentRequest request, DataOutputPlus 
out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(request.parentSession, out, 
version);
+            
ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(request.coordinator),
 out);
+            out.writeInt(request.participants.size());
+            for (InetAddress peer : request.participants)
+            {
+                
ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(peer), out);
+            }
+        }
+
+        public PrepareConsistentRequest deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            UUID sessionId = UUIDSerializer.serializer.deserialize(in, 
version);
+            InetAddress coordinator = 
inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in));
+            int numPeers = in.readInt();
+            Set<InetAddress> peers = new HashSet<>(numPeers);
+            for (int i = 0; i < numPeers; i++)
+            {
+                InetAddress peer = 
inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in));
+                peers.add(peer);
+            }
+            return new PrepareConsistentRequest(sessionId, coordinator, peers);
+        }
+
+        public long serializedSize(PrepareConsistentRequest request, int 
version)
+        {
+            long size = 
UUIDSerializer.serializer.serializedSize(request.parentSession, version);
+            size += 
ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(request.coordinator));
+            size += TypeSizes.sizeof(request.participants.size());
+            for (InetAddress peer : request.participants)
+            {
+                size += 
ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(peer));
+            }
+            return size;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java 
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
new file mode 100644
index 0000000..cf4410a
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.serializers.InetAddressSerializer;
+import org.apache.cassandra.serializers.TypeSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class PrepareConsistentResponse extends RepairMessage
+{
+    public final UUID parentSession;
+    public final InetAddress participant;
+    public final boolean success;
+
+    public PrepareConsistentResponse(UUID parentSession, InetAddress 
participant, boolean success)
+    {
+        super(Type.CONSISTENT_RESPONSE, null);
+        assert parentSession != null;
+        assert participant != null;
+        this.parentSession = parentSession;
+        this.participant = participant;
+        this.success = success;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        PrepareConsistentResponse that = (PrepareConsistentResponse) o;
+
+        if (success != that.success) return false;
+        if (!parentSession.equals(that.parentSession)) return false;
+        return participant.equals(that.participant);
+    }
+
+    public int hashCode()
+    {
+        int result = parentSession.hashCode();
+        result = 31 * result + participant.hashCode();
+        result = 31 * result + (success ? 1 : 0);
+        return result;
+    }
+
+    public static MessageSerializer serializer = new 
MessageSerializer<PrepareConsistentResponse>()
+    {
+        private TypeSerializer<InetAddress> inetSerializer = 
InetAddressSerializer.instance;
+        public void serialize(PrepareConsistentResponse response, 
DataOutputPlus out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(response.parentSession, out, 
version);
+            
ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(response.participant),
 out);
+            out.writeBoolean(response.success);
+        }
+
+        public PrepareConsistentResponse deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            return new 
PrepareConsistentResponse(UUIDSerializer.serializer.deserialize(in, version),
+                                                 
inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)),
+                                                 in.readBoolean());
+        }
+
+        public long serializedSize(PrepareConsistentResponse response, int 
version)
+        {
+            long size = 
UUIDSerializer.serializer.serializedSize(response.parentSession, version);
+            size += 
ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(response.participant));
+            size += TypeSizes.sizeof(response.success);
+            return size;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java 
b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 55fdb66..3cb913a 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -37,16 +37,24 @@ public abstract class RepairMessage
 
     public static interface MessageSerializer<T extends RepairMessage> extends 
IVersionedSerializer<T> {}
 
-    public static enum Type
+    public enum Type
     {
         VALIDATION_REQUEST(0, ValidationRequest.serializer),
         VALIDATION_COMPLETE(1, ValidationComplete.serializer),
         SYNC_REQUEST(2, SyncRequest.serializer),
         SYNC_COMPLETE(3, SyncComplete.serializer),
-        ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer),
         PREPARE_MESSAGE(5, PrepareMessage.serializer),
         SNAPSHOT(6, SnapshotMessage.serializer),
-        CLEANUP(7, CleanupMessage.serializer);
+        CLEANUP(7, CleanupMessage.serializer),
+
+        CONSISTENT_REQUEST(8, PrepareConsistentRequest.serializer),
+        CONSISTENT_RESPONSE(9, PrepareConsistentResponse.serializer),
+        FINALIZE_PROPOSE(10, FinalizePropose.serializer),
+        FINALIZE_PROMISE(11, FinalizePromise.serializer),
+        FINALIZE_COMMIT(12, FinalizeCommit.serializer),
+        FAILED_SESSION(13, FailSession.serializer),
+        STATUS_REQUEST(14, StatusRequest.serializer),
+        STATUS_RESPONSE(15, StatusResponse.serializer);
 
         private final byte type;
         private final MessageSerializer<RepairMessage> serializer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java 
b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index ced6e43..3b13cd8 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -237,6 +237,11 @@ public class RepairOption
             }
         }
 
+        if (option.isIncremental() && !option.isGlobal())
+        {
+            throw new IllegalArgumentException("Incremental repairs cannot be 
run against a subset of tokens or ranges");
+        }
+
         return option;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/StatusRequest.java 
b/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
new file mode 100644
index 0000000..f6a2b82
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class StatusRequest extends RepairMessage
+{
+    public final UUID sessionID;
+
+    public StatusRequest(UUID sessionID)
+    {
+        super(Type.STATUS_REQUEST, null);
+        this.sessionID = sessionID;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        StatusRequest request = (StatusRequest) o;
+
+        return sessionID.equals(request.sessionID);
+    }
+
+    public int hashCode()
+    {
+        return sessionID.hashCode();
+    }
+
+    public String toString()
+    {
+        return "StatusRequest{" +
+               "sessionID=" + sessionID +
+               '}';
+    }
+
+    public static MessageSerializer serializer = new 
MessageSerializer<StatusRequest>()
+    {
+        public void serialize(StatusRequest msg, DataOutputPlus out, int 
version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
+        }
+
+        public StatusRequest deserialize(DataInputPlus in, int version) throws 
IOException
+        {
+            return new StatusRequest(UUIDSerializer.serializer.deserialize(in, 
version));
+        }
+
+        public long serializedSize(StatusRequest msg, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(msg.sessionID, 
version);
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/StatusResponse.java 
b/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
new file mode 100644
index 0000000..99eb76b
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.repair.consistent.ConsistentSession;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class StatusResponse extends RepairMessage
+{
+    public final UUID sessionID;
+    public final ConsistentSession.State state;
+
+    public StatusResponse(UUID sessionID, ConsistentSession.State state)
+    {
+        super(Type.STATUS_RESPONSE, null);
+        assert sessionID != null;
+        assert state != null;
+        this.sessionID = sessionID;
+        this.state = state;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        StatusResponse that = (StatusResponse) o;
+
+        if (!sessionID.equals(that.sessionID)) return false;
+        return state == that.state;
+    }
+
+    public int hashCode()
+    {
+        int result = sessionID.hashCode();
+        result = 31 * result + state.hashCode();
+        return result;
+    }
+
+    public String toString()
+    {
+        return "StatusResponse{" +
+               "sessionID=" + sessionID +
+               ", state=" + state +
+               '}';
+    }
+
+    public static final MessageSerializer serializer = new 
MessageSerializer<StatusResponse>()
+    {
+        public void serialize(StatusResponse msg, DataOutputPlus out, int 
version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
+            out.writeInt(msg.state.ordinal());
+        }
+
+        public StatusResponse deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            return new 
StatusResponse(UUIDSerializer.serializer.deserialize(in, version),
+                                      
ConsistentSession.State.valueOf(in.readInt()));
+        }
+
+        public long serializedSize(StatusResponse msg, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(msg.sessionID, 
version)
+                   + TypeSizes.sizeof(msg.state.ordinal());
+        }
+    };
+}

Reply via email to