Repository: cassandra
Updated Branches:
  refs/heads/trunk 1757e1330 -> 98d74ed99


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
new file mode 100644
index 0000000..a85e01b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -0,0 +1,885 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.StatusRequest;
+import org.apache.cassandra.repair.messages.StatusResponse;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+
+public class LocalSessionTest extends AbstractConsistentSessionTest
+{
+
+    static LocalSession.Builder createBuilder()
+    {
+        LocalSession.Builder builder = LocalSession.builder();
+        builder.withState(PREPARING);
+        builder.withSessionID(UUIDGen.getTimeUUID());
+        builder.withCoordinator(COORDINATOR);
+        builder.withUUIDTableIds(Sets.newHashSet(UUIDGen.getTimeUUID(), 
UUIDGen.getTimeUUID()));
+        builder.withRepairedAt(System.currentTimeMillis());
+        builder.withRanges(Sets.newHashSet(RANGE1, RANGE2, RANGE3));
+        builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3));
+
+        int now = FBUtilities.nowInSeconds();
+        builder.withStartedAt(now);
+        builder.withLastUpdate(now);
+
+        return builder;
+    }
+
+    static LocalSession createSession()
+    {
+        return createBuilder().build();
+    }
+
+    /**
+     * Applies {@code consumer} to a builder produced by {@link #createBuilder()},
+     * builds the session, and asserts that an {@link IllegalArgumentException}
+     * is thrown along the way.
+     *
+     * @param consumer mutation expected to make the builder invalid
+     */
+    private static void assertValidationFailure(Consumer<LocalSession.Builder> consumer)
+    {
+        try
+        {
+            LocalSession.Builder builder = createBuilder();
+            consumer.accept(builder);
+            builder.build();
+            // the failure message names the exception type this helper actually catches
+            Assert.fail("Expected IllegalArgumentException");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // expected
+        }
+    }
+
+    private static void assertNoMessagesSent(InstrumentedLocalSessions 
sessions, InetAddress to)
+    {
+        Assert.assertNull(sessions.sentMessages.get(to));
+    }
+
+    private static void assertMessagesSent(InstrumentedLocalSessions sessions, 
InetAddress to, RepairMessage... expected)
+    {
+        Assert.assertEquals(Lists.newArrayList(expected), 
sessions.sentMessages.get(to));
+    }
+
+    static class InstrumentedLocalSessions extends LocalSessions
+    {
+        Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>();
+        /**
+         * Records the message in {@link #sentMessages} under its destination so
+         * tests can assert on what was handed to each endpoint. Nothing is
+         * delegated to the superclass.
+         *
+         * @param destination endpoint the message is addressed to
+         * @param message     message to record
+         */
+        protected void sendMessage(InetAddress destination, RepairMessage message)
+        {
+            // computeIfAbsent creates the per-destination list on first use
+            sentMessages.computeIfAbsent(destination, k -> new ArrayList<>()).add(message);
+        }
+
+        SettableFuture<Object> pendingAntiCompactionFuture = null;
+        boolean submitPendingAntiCompactionCalled = false;
+        ListenableFuture submitPendingAntiCompaction(LocalSession session, 
ExecutorService executor)
+        {
+            submitPendingAntiCompactionCalled = true;
+            if (pendingAntiCompactionFuture != null)
+            {
+                return pendingAntiCompactionFuture;
+            }
+            else
+            {
+                return super.submitPendingAntiCompaction(session, executor);
+            }
+        }
+
+        boolean failSessionCalled = false;
+        public void failSession(UUID sessionID, boolean sendMessage)
+        {
+            failSessionCalled = true;
+            super.failSession(sessionID, sendMessage);
+        }
+
+        public LocalSession prepareForTest(UUID sessionID)
+        {
+            pendingAntiCompactionFuture = SettableFuture.create();
+            handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+            pendingAntiCompactionFuture.set(new Object());
+            sentMessages.clear();
+            return getSession(sessionID);
+        }
+
+        protected InetAddress getBroadcastAddress()
+        {
+            return PARTICIPANT1;
+        }
+
+        protected boolean isAlive(InetAddress address)
+        {
+            return true;
+        }
+
+        protected boolean isNodeInitialized()
+        {
+            return true;
+        }
+    }
+
+    private static TableMetadata cfm;
+    private static ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+        cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, 
v INT)", "localsessiontest").build();
+        SchemaLoader.createKeyspace("localsessiontest", 
KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    @Before
+    public void setup()
+    {
+        // clear out any data from previous test runs
+        ColumnFamilyStore repairCfs = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.REPAIRS);
+        repairCfs.truncateBlocking();
+    }
+
+    private static UUID registerSession()
+    {
+        return registerSession(cfs);
+    }
+
+    @Test
+    public void validation()
+    {
+        assertValidationFailure(b -> b.withState(null));
+        assertValidationFailure(b -> b.withSessionID(null));
+        assertValidationFailure(b -> b.withCoordinator(null));
+        assertValidationFailure(b -> b.withTableIds(null));
+        assertValidationFailure(b -> b.withTableIds(new HashSet<>()));
+        assertValidationFailure(b -> b.withRepairedAt(0));
+        assertValidationFailure(b -> b.withRepairedAt(-1));
+        assertValidationFailure(b -> b.withRanges(null));
+        assertValidationFailure(b -> b.withRanges(new HashSet<>()));
+        assertValidationFailure(b -> b.withParticipants(null));
+        assertValidationFailure(b -> b.withParticipants(new HashSet<>()));
+        assertValidationFailure(b -> b.withStartedAt(0));
+        assertValidationFailure(b -> b.withLastUpdate(0));
+    }
+
+    /**
+     * Test that sessions are loaded and saved properly
+     */
+    @Test
+    public void persistence()
+    {
+        LocalSessions sessions = new LocalSessions();
+        LocalSession expected = createSession();
+        sessions.save(expected);
+        LocalSession actual = sessions.loadUnsafe(expected.sessionID);
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void prepareSuccessCase()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        // replacing future so we can inspect state before and after anti 
compaction callback
+        sessions.pendingAntiCompactionFuture = SettableFuture.create();
+        Assert.assertFalse(sessions.submitPendingAntiCompactionCalled);
+        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        Assert.assertTrue(sessions.submitPendingAntiCompactionCalled);
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+
+        // anti compaction hasn't finished yet, so state in memory and on disk 
should be PREPARING
+        LocalSession session = sessions.getSession(sessionID);
+        Assert.assertNotNull(session);
+        Assert.assertEquals(PREPARING, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+
+        // anti compaction has now finished, so state in memory and on disk 
should be PREPARED
+        sessions.pendingAntiCompactionFuture.set(new Object());
+        session = sessions.getSession(sessionID);
+        Assert.assertNotNull(session);
+        Assert.assertEquals(PREPARED, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+
+        // ...and we should have sent a success message back to the coordinator
+        assertMessagesSent(sessions, COORDINATOR, new 
PrepareConsistentResponse(sessionID, PARTICIPANT1, true));
+    }
+
+    /**
+     * If anti compaction fails, we should fail the session locally,
+     * and send a failure message back to the coordinator
+     */
+    @Test
+    public void prepareAntiCompactFailure()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        // replacing future so we can inspect state before and after anti 
compaction callback
+        sessions.pendingAntiCompactionFuture = SettableFuture.create();
+        Assert.assertFalse(sessions.submitPendingAntiCompactionCalled);
+        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        Assert.assertTrue(sessions.submitPendingAntiCompactionCalled);
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+
+        // anti compaction hasn't finished yet, so state in memory and on disk 
should be PREPARING
+        LocalSession session = sessions.getSession(sessionID);
+        Assert.assertNotNull(session);
+        Assert.assertEquals(PREPARING, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+
+        // anti compaction has now failed, so state in memory and on disk should be FAILED
+        sessions.pendingAntiCompactionFuture.setException(new 
RuntimeException());
+        session = sessions.getSession(sessionID);
+        Assert.assertNotNull(session);
+        Assert.assertEquals(FAILED, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+
+        // ...and we should have sent a failure message back to the coordinator
+        assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID));
+
+    }
+
+    /**
+     * If a ParentRepairSession wasn't previously created, we shouldn't
+     * create a session locally, but we should send a failure message to
+     * the coordinator.
+     */
+    @Test
+    public void prepareWithNonExistantParentSession()
+    {
+        UUID sessionID = UUIDGen.getTimeUUID();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        Assert.assertNull(sessions.getSession(sessionID));
+        assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID));
+    }
+
+    @Test
+    public void maybeSetRepairing()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        LocalSession session = sessions.prepareForTest(sessionID);
+        Assert.assertEquals(PREPARED, session.getState());
+
+        sessions.sentMessages.clear();
+        sessions.maybeSetRepairing(sessionID);
+        Assert.assertEquals(REPAIRING, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+    }
+
+    /**
+     * Multiple calls to maybeSetRepairing shouldn't cause any problems
+     */
+    @Test
+    public void maybeSetRepairingDuplicates()
+    {
+
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        LocalSession session = sessions.prepareForTest(sessionID);
+        Assert.assertEquals(PREPARED, session.getState());
+
+        // initial set
+        sessions.sentMessages.clear();
+        sessions.maybeSetRepairing(sessionID);
+        Assert.assertEquals(REPAIRING, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+
+        // repeated call 1
+        sessions.maybeSetRepairing(sessionID);
+        Assert.assertEquals(REPAIRING, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+
+        // repeated call 2
+        sessions.maybeSetRepairing(sessionID);
+        Assert.assertEquals(REPAIRING, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+    }
+
+    /**
+     * We shouldn't fail if we don't have a session for the given session id
+     */
+    @Test
+    public void maybeSetRepairingNonExistantSession()
+    {
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        UUID fakeID = UUIDGen.getTimeUUID();
+        sessions.maybeSetRepairing(fakeID);
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+    }
+
+    /**
+     * In the success case, session state should be set to FINALIZE_PROMISED 
and
+     * persisted, and a FinalizePromise message should be sent back to the 
coordinator
+     */
+    @Test
+    public void finalizeProposeSuccessCase()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        // create session and move to repairing
+        LocalSession session = sessions.prepareForTest(sessionID);
+        sessions.maybeSetRepairing(sessionID);
+
+        // sanity check: the session must be REPAIRING before finalization is proposed
+        Assert.assertEquals(REPAIRING, session.getState());
+
+        // should send a promised message to coordinator and set session state 
accordingly
+        sessions.sentMessages.clear();
+        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(sessionID));
+        Assert.assertEquals(FINALIZE_PROMISED, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+        assertMessagesSent(sessions, COORDINATOR, new 
FinalizePromise(sessionID, PARTICIPANT1, true));
+    }
+
+    /**
+     * Trying to propose finalization when the session isn't in the repairing
+     * state should fail the session and send a failure message to the proposer
+     */
+    @Test
+    public void finalizeProposeInvalidStateFailure()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        LocalSession session = sessions.prepareForTest(sessionID);
+        Assert.assertEquals(PREPARED, session.getState());
+
+        // should fail the session and send a failure message to the 
coordinator
+        sessions.sentMessages.clear();
+        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(sessionID));
+        Assert.assertEquals(FAILED, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+        assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID));
+    }
+
+    @Test
+    public void finalizeProposeNonExistantSessionFailure()
+    {
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        UUID fakeID = UUIDGen.getTimeUUID();
+        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(fakeID));
+        Assert.assertNull(sessions.getSession(fakeID));
+        assertMessagesSent(sessions, COORDINATOR, new FailSession(fakeID));
+    }
+
+    /**
+     * Session state should be set to finalized, sstables should be promoted
+     * to repaired. No messages should be sent to the coordinator
+     */
+    @Test
+    public void finalizeCommitSuccessCase()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        // create session and move to FINALIZE_PROMISED
+        sessions.prepareForTest(sessionID);
+        sessions.maybeSetRepairing(sessionID);
+        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(sessionID));
+
+        sessions.sentMessages.clear();
+        LocalSession session = sessions.getSession(sessionID);
+        sessions.handleFinalizeCommitMessage(PARTICIPANT1, new 
FinalizeCommit(sessionID));
+
+        Assert.assertEquals(FINALIZED, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+    }
+
+    @Test
+    public void finalizeCommitNonExistantSession()
+    {
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        UUID fakeID = UUIDGen.getTimeUUID();
+        sessions.handleFinalizeCommitMessage(PARTICIPANT1, new 
FinalizeCommit(fakeID));
+        Assert.assertNull(sessions.getSession(fakeID));
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+    }
+
+    @Test
+    public void failSession()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        LocalSession session = sessions.prepareForTest(sessionID);
+        Assert.assertEquals(PREPARED, session.getState());
+        sessions.sentMessages.clear();
+
+        // fail session
+        sessions.failSession(sessionID);
+        Assert.assertEquals(FAILED, session.getState());
+        assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID));
+    }
+
+    /**
+     * Session should be failed, but no messages should be sent
+     */
+    @Test
+    public void handleFailMessage()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        LocalSession session = sessions.prepareForTest(sessionID);
+        Assert.assertEquals(PREPARED, session.getState());
+        sessions.sentMessages.clear();
+
+        sessions.handleFailSessionMessage(PARTICIPANT1, new 
FailSession(sessionID));
+        Assert.assertEquals(FAILED, session.getState());
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+    }
+
+    @Test
+    public void sendStatusRequest() throws Exception
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        LocalSession session = sessions.prepareForTest(sessionID);
+
+        sessions.sentMessages.clear();
+        sessions.sendStatusRequest(session);
+
+        assertNoMessagesSent(sessions, PARTICIPANT1);
+        StatusRequest expected = new StatusRequest(sessionID);
+        assertMessagesSent(sessions, PARTICIPANT2, expected);
+        assertMessagesSent(sessions, PARTICIPANT3, expected);
+    }
+
+    @Test
+    public void handleStatusRequest() throws Exception
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        LocalSession session = sessions.prepareForTest(sessionID);
+        Assert.assertEquals(PREPARED, session.getState());
+
+        sessions.sentMessages.clear();
+        sessions.handleStatusRequest(PARTICIPANT2, new 
StatusRequest(sessionID));
+        assertNoMessagesSent(sessions, PARTICIPANT1);
+        assertMessagesSent(sessions, PARTICIPANT2, new 
StatusResponse(sessionID, PREPARED));
+        assertNoMessagesSent(sessions, PARTICIPANT3);
+    }
+
+    @Test
+    public void handleStatusRequestNoSession() throws Exception
+    {
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        sessions.sentMessages.clear();
+        UUID sessionID = UUIDGen.getTimeUUID();
+        sessions.handleStatusRequest(PARTICIPANT2, new 
StatusRequest(sessionID));
+        assertNoMessagesSent(sessions, PARTICIPANT1);
+        assertMessagesSent(sessions, PARTICIPANT2, new 
StatusResponse(sessionID, FAILED));
+        assertNoMessagesSent(sessions, PARTICIPANT3);
+    }
+
+    @Test
+    public void handleStatusResponseFinalized() throws Exception
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        LocalSession session = sessions.prepareForTest(sessionID);
+        session.setState(FINALIZE_PROMISED);
+
+        sessions.handleStatusResponse(PARTICIPANT1, new 
StatusResponse(sessionID, FINALIZED));
+        Assert.assertEquals(FINALIZED, session.getState());
+    }
+
+    @Test
+    public void handleStatusResponseFailed() throws Exception
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        LocalSession session = sessions.prepareForTest(sessionID);
+        session.setState(FINALIZE_PROMISED);
+
+        sessions.handleStatusResponse(PARTICIPANT1, new 
StatusResponse(sessionID, FAILED));
+        Assert.assertEquals(FAILED, session.getState());
+    }
+
+    @Test
+    public void handleStatusResponseNoop() throws Exception
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        LocalSession session = sessions.prepareForTest(sessionID);
+        session.setState(REPAIRING);
+
+        sessions.handleStatusResponse(PARTICIPANT1, new 
StatusResponse(sessionID, FINALIZE_PROMISED));
+        Assert.assertEquals(REPAIRING, session.getState());
+    }
+
+    @Test
+    public void handleStatusResponseNoSession() throws Exception
+    {
+        UUID sessionID = UUIDGen.getTimeUUID();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        sessions.handleStatusResponse(PARTICIPANT1, new 
StatusResponse(sessionID, FINALIZE_PROMISED));
+        Assert.assertNull(sessions.getSession(sessionID));
+    }
+
+    /**
+     * Check all states (except failed)
+     */
+    @Test
+    public void isSessionInProgress()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        sessions.pendingAntiCompactionFuture = SettableFuture.create();  // 
prevent moving to prepared
+        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+
+        LocalSession session = sessions.getSession(sessionID);
+        Assert.assertNotNull(session);
+        Assert.assertEquals(PREPARING, session.getState());
+        Assert.assertTrue(sessions.isSessionInProgress(sessionID));
+
+        session.setState(PREPARED);
+        Assert.assertTrue(sessions.isSessionInProgress(sessionID));
+
+        session.setState(REPAIRING);
+        Assert.assertTrue(sessions.isSessionInProgress(sessionID));
+
+        session.setState(FINALIZE_PROMISED);
+        Assert.assertTrue(sessions.isSessionInProgress(sessionID));
+
+        session.setState(FINALIZED);
+        Assert.assertFalse(sessions.isSessionInProgress(sessionID));
+    }
+
+    @Test
+    public void isSessionInProgressFailed()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        sessions.pendingAntiCompactionFuture = SettableFuture.create();
+        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        sessions.pendingAntiCompactionFuture.set(new Object());
+
+        Assert.assertTrue(sessions.isSessionInProgress(sessionID));
+        sessions.failSession(sessionID);
+        Assert.assertFalse(sessions.isSessionInProgress(sessionID));
+    }
+
+    @Test
+    public void isSessionInProgressNonExistantSession()
+    {
+        UUID fakeID = UUIDGen.getTimeUUID();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        Assert.assertFalse(sessions.isSessionInProgress(fakeID));
+    }
+
+    @Test
+    public void finalRepairedAtFinalized()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        sessions.prepareForTest(sessionID);
+        sessions.maybeSetRepairing(sessionID);
+        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(sessionID));
+        sessions.handleFinalizeCommitMessage(PARTICIPANT1, new 
FinalizeCommit(sessionID));
+
+        LocalSession session = sessions.getSession(sessionID);
+        Assert.assertTrue(session.repairedAt != 
ActiveRepairService.UNREPAIRED_SSTABLE);
+        Assert.assertEquals(session.repairedAt, 
sessions.getFinalSessionRepairedAt(sessionID));
+    }
+
+    @Test
+    public void finalRepairedAtFailed()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        sessions.prepareForTest(sessionID);
+        sessions.failSession(sessionID);
+
+        LocalSession session = sessions.getSession(sessionID);
+        Assert.assertTrue(session.repairedAt != 
ActiveRepairService.UNREPAIRED_SSTABLE);
+        long repairedAt = sessions.getFinalSessionRepairedAt(sessionID);
+        Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, 
repairedAt);
+    }
+
+    @Test
+    public void finalRepairedAtNoSession()
+    {
+        UUID fakeID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        long repairedAt = sessions.getFinalSessionRepairedAt(fakeID);
+        Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, 
repairedAt);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void finalRepairedAtInProgress()
+    {
+        UUID sessionID = registerSession();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        sessions.prepareForTest(sessionID);
+
+        sessions.getFinalSessionRepairedAt(sessionID);
+    }
+
+    /**
+     * Startup happy path
+     */
+    @Test
+    public void startup() throws Exception
+    {
+        InstrumentedLocalSessions initialSessions = new 
InstrumentedLocalSessions();
+        initialSessions.start();
+        Assert.assertEquals(0, initialSessions.getNumSessions());
+        UUID id1 = registerSession();
+        UUID id2 = registerSession();
+
+        initialSessions.prepareForTest(id1);
+        initialSessions.prepareForTest(id2);
+        Assert.assertEquals(2, initialSessions.getNumSessions());
+        LocalSession session1 = initialSessions.getSession(id1);
+        LocalSession session2 = initialSessions.getSession(id2);
+
+
+        // subsequent startups should load persisted sessions
+        InstrumentedLocalSessions nextSessions = new 
InstrumentedLocalSessions();
+        Assert.assertEquals(0, nextSessions.getNumSessions());
+        nextSessions.start();
+        Assert.assertEquals(2, nextSessions.getNumSessions());
+
+        Assert.assertEquals(session1, nextSessions.getSession(id1));
+        Assert.assertEquals(session2, nextSessions.getSession(id2));
+    }
+
+    /**
+     * If LocalSessions.start is called more than
+     * once, an exception should be thrown
+     */
+    @Test (expected = IllegalArgumentException.class)
+    public void multipleStartupFailure() throws Exception
+    {
+        InstrumentedLocalSessions initialSessions = new 
InstrumentedLocalSessions();
+        initialSessions.start();
+        initialSessions.start();
+    }
+
+    /**
+     * If there are problems with the rows we're reading out of the repair table,
+     * we should do the best we can to repair them, but not refuse to start up.
+     */
+    @Test
+    public void loadCorruptRow() throws Exception
+    {
+        LocalSessions sessions = new LocalSessions();
+        LocalSession session = createSession();
+        sessions.save(session);
+
+        sessions = new LocalSessions();
+        sessions.start();
+        Assert.assertNotNull(sessions.getSession(session.sessionID));
+
+        QueryProcessor.instance.executeInternal("DELETE participants FROM 
system.repairs WHERE parent_id=?", session.sessionID);
+
+        sessions = new LocalSessions();
+        sessions.start();
+        Assert.assertNull(sessions.getSession(session.sessionID));
+    }
+
+    private static LocalSession sessionWithTime(int started, int updated)
+    {
+        LocalSession.Builder builder = createBuilder();
+        builder.withStartedAt(started);
+        builder.withLastUpdate(updated);
+        return builder.build();
+    }
+
+    /**
+     * Sessions that shouldn't be failed or deleted are left alone
+     */
+    @Test
+    public void cleanupNoOp() throws Exception
+    {
+        LocalSessions sessions = new LocalSessions();
+        sessions.start();
+
+        int time = FBUtilities.nowInSeconds() - 
LocalSessions.AUTO_FAIL_TIMEOUT + 60;
+        LocalSession session = sessionWithTime(time - 1, time);
+
+        sessions.putSessionUnsafe(session);
+        Assert.assertNotNull(sessions.getSession(session.sessionID));
+
+        sessions.cleanup();
+
+        Assert.assertNotNull(sessions.getSession(session.sessionID));
+    }
+
+    /**
+     * Sessions past the auto fail cutoff should be failed
+     */
+    @Test
+    public void cleanupFail() throws Exception
+    {
+        LocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        int time = FBUtilities.nowInSeconds() - 
LocalSessions.AUTO_FAIL_TIMEOUT - 1;
+        LocalSession session = sessionWithTime(time - 1, time);
+        session.setState(REPAIRING);
+
+        sessions.putSessionUnsafe(session);
+        Assert.assertNotNull(sessions.getSession(session.sessionID));
+
+        sessions.cleanup();
+
+        Assert.assertNotNull(sessions.getSession(session.sessionID));
+        Assert.assertEquals(FAILED, session.getState());
+        Assert.assertEquals(session, sessions.loadUnsafe(session.sessionID));
+    }
+
+    /**
+     * Sessions past the auto delete cutoff should be deleted
+     */
+    @Test
+    public void cleanupDelete() throws Exception
+    {
+        LocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+
+        // NOTE(review): the cutoff below is derived from AUTO_FAIL_TIMEOUT although this
+        // test is about deletion -- presumably LocalSessions has a separate delete
+        // timeout that should be used here; confirm.
+        int time = FBUtilities.nowInSeconds() - 
LocalSessions.AUTO_FAIL_TIMEOUT - 1;
+        LocalSession failed = sessionWithTime(time - 1, time);
+        failed.setState(FAILED);
+
+        LocalSession finalized = sessionWithTime(time - 1, time);
+        finalized.setState(FINALIZED);
+
+        sessions.putSessionUnsafe(failed);
+        sessions.putSessionUnsafe(finalized);
+        Assert.assertNotNull(sessions.getSession(failed.sessionID));
+        Assert.assertNotNull(sessions.getSession(finalized.sessionID));
+
+        sessions.cleanup();
+
+        // both sessions should be gone from the in-memory view...
+        Assert.assertNull(sessions.getSession(failed.sessionID));
+        Assert.assertNull(sessions.getSession(finalized.sessionID));
+
+        // ...and loadUnsafe should no longer return them either
+        Assert.assertNull(sessions.loadUnsafe(failed.sessionID));
+        Assert.assertNull(sessions.loadUnsafe(finalized.sessionID));
+    }
+
+    /**
+     * Sessions should start checking the status of their participants if
+     * there hasn't been activity for the CHECK_STATUS_TIMEOUT period
+     */
+    @Test
+    public void cleanupStatusRequest() throws Exception
+    {
+        AtomicReference<LocalSession> checkedSession = new AtomicReference<>();
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions() {
+            public void sendStatusRequest(LocalSession session)
+            {
+                Assert.assertTrue(checkedSession.compareAndSet(null, session));
+            }
+        };
+        sessions.start();
+
+        int time = FBUtilities.nowInSeconds() - 
LocalSessions.CHECK_STATUS_TIMEOUT - 1;
+        LocalSession session = sessionWithTime(time - 1, time);
+        session.setState(REPAIRING);
+
+        sessions.putSessionUnsafe(session);
+        Assert.assertNotNull(sessions.getSession(session.sessionID));
+
+        sessions.cleanup();
+
+        Assert.assertEquals(session, checkedSession.get());
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
 
b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
new file mode 100644
index 0000000..2cb6326
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public class PendingAntiCompactionTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PendingAntiCompactionTest.class);
+    private static final Collection<Range<Token>> FULL_RANGE;
+    static
+    {
+        DatabaseDescriptor.daemonInitialization();
+        Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken));
+    }
+
+    private String ks;
+    private final String tbl = "tbl";
+    private TableMetadata cfm;
+    private ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Before
+    public void setup()
+    {
+        ks = "ks_" + System.currentTimeMillis();
+        cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k 
INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    private void makeSSTables(int num)
+    {
+        for (int i = 0; i < num; i++)
+        {
+            int val = i * 2;  // multiplied to prevent ranges from overlapping
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), val, val);
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), val+1, val+1);
+            cfs.forceBlockingFlush();
+        }
+        Assert.assertEquals(num, cfs.getLiveSSTables().size());
+    }
+
+    private static class InstrumentedAcquisitionCallback extends 
PendingAntiCompaction.AcquisitionCallback
+    {
+        public InstrumentedAcquisitionCallback(UUID parentRepairSession, 
Collection<Range<Token>> ranges)
+        {
+            super(parentRepairSession, ranges);
+        }
+
+        Set<TableId> submittedCompactions = new HashSet<>();
+
+        ListenableFuture<?> 
submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result)
+        {
+            submittedCompactions.add(result.cfs.metadata.id);
+            result.abort();  // prevent ref leak complaints
+            return ListenableFutureTask.create(() -> {}, null);
+        }
+    }
+
+    /**
+     * verify the pending anti compaction happy path
+     */
+    @Test
+    public void successCase() throws Exception
+    {
+        Assert.assertSame(ByteOrderedPartitioner.class, 
DatabaseDescriptor.getPartitioner().getClass());
+        cfs.disableAutoCompaction();
+
+        // create 2 sstables, one that will be split, and another that will be 
moved
+        for (int i = 0; i < 8; i++)
+        {
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i);
+        }
+        cfs.forceBlockingFlush();
+        for (int i = 8; i < 12; i++)
+        {
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i);
+        }
+        cfs.forceBlockingFlush();
+        Assert.assertEquals(2, cfs.getLiveSSTables().size());
+
+        Token left = 
ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6));
+        Token right = 
ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16));
+        Collection<Range<Token>> ranges = Collections.singleton(new 
Range<>(left, right));
+
+        // create a session so the anti compaction can find it
+        UUID sessionID = UUIDGen.getTimeUUID();
+        ActiveRepairService.instance.registerParentRepairSession(sessionID, 
InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true);
+
+        PendingAntiCompaction pac;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try
+        {
+            pac = new PendingAntiCompaction(sessionID, ranges, executor);
+            pac.run().get();
+        }
+        finally
+        {
+            executor.shutdown();
+        }
+
+        Assert.assertEquals(3, cfs.getLiveSSTables().size());
+        int pendingRepair = 0;
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            if (sstable.isPendingRepair())
+                pendingRepair++;
+        }
+        Assert.assertEquals(2, pendingRepair);
+    }
+
+    @Test
+    public void acquisitionSuccess() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(6);
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        List<SSTableReader> expected = sstables.subList(0, 3);
+        Collection<Range<Token>> ranges = new HashSet<>();
+        for (SSTableReader sstable : expected)
+        {
+            ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken()));
+        }
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID());
+
+        logger.info("SSTables: {}", sstables);
+        logger.info("Expected: {}", expected);
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+        logger.info("Originals: {}", result.txn.originals());
+        Assert.assertEquals(3, result.txn.originals().size());
+        for (SSTableReader sstable : expected)
+        {
+            logger.info("Checking {}", sstable);
+            Assert.assertTrue(result.txn.originals().contains(sstable));
+        }
+
+        
Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, 
result.txn.state());
+        result.abort();
+    }
+
+    @Test
+    public void repairedSSTablesAreNotAcquired() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        Assert.assertEquals(2, sstables.size());
+        SSTableReader repaired = sstables.get(0);
+        SSTableReader unrepaired = sstables.get(1);
+        Assert.assertTrue(repaired.intersects(FULL_RANGE));
+        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+
+        
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 
1, null);
+        repaired.reloadSSTableMetadata();
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        logger.info("Originals: {}", result.txn.originals());
+        Assert.assertEquals(1, result.txn.originals().size());
+        Assert.assertTrue(result.txn.originals().contains(unrepaired));
+        result.abort(); // release sstable refs
+    }
+
+    @Test
+    public void pendingRepairSSTablesAreNotAcquired() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        Assert.assertEquals(2, sstables.size());
+        SSTableReader repaired = sstables.get(0);
+        SSTableReader unrepaired = sstables.get(1);
+        Assert.assertTrue(repaired.intersects(FULL_RANGE));
+        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+
+        
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 
0, UUIDGen.getTimeUUID());
+        repaired.reloadSSTableMetadata();
+        Assert.assertTrue(repaired.isPendingRepair());
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        logger.info("Originals: {}", result.txn.originals());
+        Assert.assertEquals(1, result.txn.originals().size());
+        Assert.assertTrue(result.txn.originals().contains(unrepaired));
+        result.abort();  // releases sstable refs
+    }
+
+    /**
+     * anti compaction task should be submitted if everything is ok
+     */
+    @Test
+    public void callbackSuccess() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        cb.apply(Lists.newArrayList(result));
+
+        Assert.assertEquals(1, cb.submittedCompactions.size());
+        Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
+    }
+
+    /**
+     * If one of the supplied AcquireResults is null, either an Exception was 
thrown, or
+     * we couldn't get a transaction for the sstables. In either case we need 
to cancel the repair, and release
+     * any sstables acquired for other tables
+     */
+    @Test
+    public void callbackNullResult() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+        
Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, 
result.txn.state());
+
+        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        cb.apply(Lists.newArrayList(result, null));
+
+        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, 
result.txn.state());
+    }
+
+    /**
+     * If an AcquireResult has a null txn, there were no sstables to acquire 
references
+     * for, so no anti compaction should have been submitted.
+     */
+    @Test
+    public void callbackNullTxn() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        ColumnFamilyStore cfs2 = 
Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system",
 "peers").id);
+        PendingAntiCompaction.AcquireResult fakeResult = new 
PendingAntiCompaction.AcquireResult(cfs2, null, null);
+
+        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        cb.apply(Lists.newArrayList(result, fakeResult));
+
+        Assert.assertEquals(1, cb.submittedCompactions.size());
+        Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
+        Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
 
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
index 3924045..449a5dc 100644
--- 
a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
+++ 
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -159,13 +159,6 @@ public class RepairMessageSerializationsTest
     }
 
     @Test
-    public void antiCompactionRequestMessage() throws IOException
-    {
-        AnticompactionRequest msg = new 
AnticompactionRequest(UUID.randomUUID(), buildTokenRanges());
-        serializeRoundTrip(msg, AnticompactionRequest.serializer);
-    }
-
-    @Test
     public void prepareMessage() throws IOException
     {
         PrepareMessage msg = new PrepareMessage(UUID.randomUUID(), new 
ArrayList<TableId>() {{add(TableId.generate());}},

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
 
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
new file mode 100644
index 0000000..9789b38
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * verifies repair message serializers are working as advertised
+ */
+public class RepairMessageSerializerTest
+{
+    private static final int MS_VERSION = MessagingService.current_version; // version used for every round trip below
+
+    static RepairMessage serdes(RepairMessage message)
+    {
+        int expectedSize = (int) 
RepairMessage.serializer.serializedSize(message, MS_VERSION);
+        try (DataOutputBuffer out = new DataOutputBuffer(expectedSize))
+        {
+            RepairMessage.serializer.serialize(message, out, MS_VERSION);
+            Assert.assertEquals(expectedSize, out.buffer().limit());
+            try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
+            {
+                return RepairMessage.serializer.deserialize(in, MS_VERSION);
+            }
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    @Test
+    public void prepareConsistentRequest() throws Exception
+    {
+        InetAddress coordinator = InetAddress.getByName("10.0.0.1");
+        InetAddress peer1 = InetAddress.getByName("10.0.0.2");
+        InetAddress peer2 = InetAddress.getByName("10.0.0.3");
+        InetAddress peer3 = InetAddress.getByName("10.0.0.4");
+        RepairMessage expected = new 
PrepareConsistentRequest(UUIDGen.getTimeUUID(),
+                                                              coordinator,
+                                                              
Sets.newHashSet(peer1, peer2, peer3));
+        RepairMessage actual = serdes(expected);
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void prepareConsistentResponse() throws Exception
+    {
+        RepairMessage expected = new 
PrepareConsistentResponse(UUIDGen.getTimeUUID(),
+                                                               
InetAddress.getByName("10.0.0.2"),
+                                                               true);
+        RepairMessage actual = serdes(expected);
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void failSession() throws Exception
+    {
+        RepairMessage expected = new FailSession(UUIDGen.getTimeUUID());
+        RepairMessage actual = serdes(expected);
+        Assert.assertEquals(expected, actual); // the deserialized copy must equal the original message
+    }
+
+    @Test
+    public void finalizeCommit() throws Exception
+    {
+        RepairMessage expected = new FinalizeCommit(UUIDGen.getTimeUUID());
+        RepairMessage actual = serdes(expected);
+        Assert.assertEquals(expected, actual); // the deserialized copy must equal the original message
+    }
+
+    @Test
+    public void finalizePromise() throws Exception
+    {
+        RepairMessage expected = new FinalizePromise(UUIDGen.getTimeUUID(),
+                                                     
InetAddress.getByName("10.0.0.2"),
+                                                     true);
+        RepairMessage actual = serdes(expected);
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void finalizePropose() throws Exception
+    {
+        RepairMessage expected = new FinalizePropose(UUIDGen.getTimeUUID());
+        RepairMessage actual = serdes(expected);
+        Assert.assertEquals(expected, actual); // the deserialized copy must equal the original message
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java 
b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index 29d9756..9eb7c86 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -149,14 +149,16 @@ public class RepairOptionTest
     }
 
     @Test
-    public void testIncrementalRepairWithSubrangesIsNotGlobal() throws 
Exception
+    public void testNonGlobalIncrementalRepairParse() throws Exception
     {
-        RepairOption ro = 
RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", 
RepairOption.RANGES_KEY, "42:42"),
-                           Murmur3Partitioner.instance);
-        assertFalse(ro.isGlobal());
-        ro = RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, 
"true", RepairOption.RANGES_KEY, ""),
-                Murmur3Partitioner.instance);
-        assertTrue(ro.isGlobal());
+        Map<String, String> options = new HashMap<>();
+        options.put(RepairOption.PARALLELISM_KEY, "parallel");
+        options.put(RepairOption.PRIMARY_RANGE_KEY, "false");
+        options.put(RepairOption.INCREMENTAL_KEY, "true");
+        options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3");
+        options.put(RepairOption.HOSTS_KEY, "127.0.0.1, 127.0.0.2");
+        assertParseThrowsIllegalArgumentExceptionWithMessage(options, 
"Incremental repairs cannot be run against a subset of tokens or ranges");
+
     }
 
     private void 
assertParseThrowsIllegalArgumentExceptionWithMessage(Map<String, String> 
optionsToParse, String expectedErrorMessage)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/schema/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java 
b/test/unit/org/apache/cassandra/schema/MockSchema.java
index b94b49c..99fff32 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -118,7 +118,7 @@ public class MockSchema
         }
         SerializationHeader header = SerializationHeader.make(cfs.metadata(), 
Collections.emptyList());
         StatsMetadata metadata = (StatsMetadata) new 
MetadataCollector(cfs.metadata().comparator)
-                                                 
.finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 
0.01f, -1, header)
+                                                 
.finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 
0.01f, -1, null, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor, 
components, cfs.metadata,
                                                           
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), 
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java 
b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 44bd58c..8f8fe6d 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.dht.Range;
@@ -226,81 +225,7 @@ public class ActiveRepairServiceTest
     }
 
     @Test
-    public void testGetActiveRepairedSSTableRefs()
-    {
-        ColumnFamilyStore store = prepareColumnFamilyStore();
-        Set<SSTableReader> original = store.getLiveSSTables();
-
-        UUID prsId = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, 0, false);
-        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
-        prs.markSSTablesRepairing(store.metadata.id, prsId);
-
-        //retrieve all sstable references from parent repair sessions
-        Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId);
-        Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
-        assertEquals(original, retrieved);
-        refs.release();
-
-        //remove 1 sstable from data data tracker
-        Set<SSTableReader> newLiveSet = new HashSet<>(original);
-        Iterator<SSTableReader> it = newLiveSet.iterator();
-        final SSTableReader removed = it.next();
-        it.remove();
-        store.getTracker().dropSSTables(new 
com.google.common.base.Predicate<SSTableReader>()
-        {
-            public boolean apply(SSTableReader reader)
-            {
-                return removed.equals(reader);
-            }
-        }, OperationType.COMPACTION, null);
-
-        //retrieve sstable references from parent repair session again - 
removed sstable must not be present
-        refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId);
-        retrieved = Sets.newHashSet(refs.iterator());
-        assertEquals(newLiveSet, retrieved);
-        assertFalse(retrieved.contains(removed));
-        refs.release();
-    }
-
-    @Test
-    public void testAddingMoreSSTables()
-    {
-        ColumnFamilyStore store = prepareColumnFamilyStore();
-        Set<SSTableReader> original = 
Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> 
!s.isRepaired())).sstables);
-        UUID prsId = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, System.currentTimeMillis(), true);
-
-        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
-        prs.markSSTablesRepairing(store.metadata.id, prsId);
-        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId))
-        {
-            Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
-            assertEquals(original, retrieved);
-        }
-        createSSTables(store, 2);
-        boolean exception = false;
-        try
-        {
-            UUID newPrsId = UUID.randomUUID();
-            ActiveRepairService.instance.registerParentRepairSession(newPrsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, System.currentTimeMillis(), true);
-            
ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.id,
 newPrsId);
-        }
-        catch (Throwable t)
-        {
-            exception = true;
-        }
-        assertTrue(exception);
-
-        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id, prsId))
-        {
-            Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
-            assertEquals(original, retrieved);
-        }
-    }
-
-    @Test
-    public void testSnapshotAddSSTables() throws ExecutionException, 
InterruptedException
+    public void testSnapshotAddSSTables() throws Exception
     {
         ColumnFamilyStore store = prepareColumnFamilyStore();
         UUID prsId = UUID.randomUUID();
@@ -312,40 +237,7 @@ public class ActiveRepairServiceTest
         ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
         createSSTables(store, 2);
         
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id,
 prsId);
-        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id,
 prsId))
-        {
-            assertEquals(original, Sets.newHashSet(refs.iterator()));
-        }
-        store.forceMajorCompaction();
-        // after a major compaction the original sstables will be gone and we 
will have no sstables to anticompact:
-        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id,
 prsId))
-        {
-            assertEquals(0, refs.size());
-        }
-    }
-
-    @Test
-    public void testSnapshotMultipleRepairs()
-    {
-        ColumnFamilyStore store = prepareColumnFamilyStore();
-        Set<SSTableReader> original = 
Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> 
!s.isRepaired())).sstables);
-        UUID prsId = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
-        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id,
 prsId);
-
-        UUID prsId2 = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), 
true);
-        boolean exception = false;
-        try
-        {
-            
ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.id,
 prsId2);
-        }
-        catch (Throwable t)
-        {
-            exception = true;
-        }
-        assertTrue(exception);
-        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.id,
 prsId))
+        try (Refs<SSTableReader> refs = 
store.getSnapshotSSTableReaders(prsId.toString()))
         {
             assertEquals(original, Sets.newHashSet(refs.iterator()));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index ffe26a2..682e039 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -74,7 +74,7 @@ public class StreamTransferTaskTest
     public void testScheduleTimeout() throws Exception
     {
         InetAddress peer = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
false);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
false, null);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
@@ -120,9 +120,9 @@ public class StreamTransferTaskTest
     public void testFailSessionDuringTransferShouldNotReleaseReferences() 
throws Exception
     {
         InetAddress peer = FBUtilities.getBroadcastAddress();
-        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, 
false, null, false);
+        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, 
false, null, false, null);
         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), 
"", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
-        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
false);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
false, null);
         session.init(future);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 

Reply via email to