This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new dca76145c2 Add retries to IR messages
dca76145c2 is described below

commit dca76145c2c1f846ed624c93b9c64484ce1946b7
Author: David Capwell <[email protected]>
AuthorDate: Tue Oct 31 13:56:29 2023 -0700

    Add retries to IR messages
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18962
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/net/Verb.java        |  12 +--
 .../cassandra/repair/RepairMessageVerbHandler.java |  17 ++-
 .../org/apache/cassandra/repair/SharedContext.java | 115 +++++++++++++++++++++
 .../repair/consistent/ConsistentSession.java       |   2 +-
 .../repair/consistent/CoordinatorSession.java      |  57 ++++++----
 .../repair/consistent/CoordinatorSessions.java     |  24 ++++-
 .../cassandra/repair/consistent/LocalSessions.java |  77 ++++++++++----
 .../cassandra/repair/messages/FailSession.java     |   6 ++
 .../cassandra/repair/messages/FinalizeCommit.java  |   6 ++
 .../cassandra/repair/messages/FinalizePromise.java |   6 ++
 .../cassandra/repair/messages/FinalizePropose.java |   6 ++
 .../repair/messages/PrepareConsistentRequest.java  |   6 ++
 .../repair/messages/PrepareConsistentResponse.java |   6 ++
 .../cassandra/repair/messages/RepairMessage.java   |  32 +++++-
 .../org/apache/cassandra/net/MatcherResponse.java  |  13 ++-
 .../repair/ConcurrentIrWithPreviewFuzzTest.java    |  18 +++-
 .../cassandra/repair/FailingRepairFuzzTest.java    |   1 +
 .../org/apache/cassandra/repair/FuzzTestBase.java  |  36 +++++--
 .../cassandra/repair/SlowMessageFuzzTest.java      |   1 +
 .../repair/consistent/CoordinatorSessionTest.java  |  64 ++++++------
 .../repair/consistent/CoordinatorSessionsTest.java |  24 +++--
 .../repair/consistent/LocalSessionTest.java        |  42 +++++---
 .../cassandra/repair/consistent/MockMessaging.java |  88 ++++++++++++++++
 24 files changed, 524 insertions(+), 136 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 8611719d56..f288f3206e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0-alpha2
+ * Add retries to IR messages (CASSANDRA-18962)
  * Add metrics and logging to repair retries (CASSANDRA-18952)
  * Remove deprecated code in Cassandra 1.x and 2.x (CASSANDRA-18959)
  * ClientRequestSize metrics should not treat CONTAINS restrictions as being equality-based (CASSANDRA-18896)
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index cf0f2e30a3..c85f0ddeca 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -169,12 +169,12 @@ public enum Verb
     PREPARE_MSG            (105, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> PrepareMessage.serializer,            () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
     SNAPSHOT_MSG           (106, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> SnapshotMessage.serializer,           () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
     CLEANUP_MSG            (107, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> CleanupMessage.serializer,            () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
-    PREPARE_CONSISTENT_RSP (109, P1, repairTimeout,   ANTI_ENTROPY,      () -> 
PrepareConsistentResponse.serializer, () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
-    PREPARE_CONSISTENT_REQ (108, P1, repairTimeout,   ANTI_ENTROPY,      () -> 
PrepareConsistentRequest.serializer,  () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
-    FINALIZE_PROPOSE_MSG   (110, P1, repairTimeout,   ANTI_ENTROPY,      () -> 
FinalizePropose.serializer,           () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
-    FINALIZE_PROMISE_MSG   (111, P1, repairTimeout,   ANTI_ENTROPY,      () -> 
FinalizePromise.serializer,           () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
-    FINALIZE_COMMIT_MSG    (112, P1, repairTimeout,   ANTI_ENTROPY,      () -> 
FinalizeCommit.serializer,            () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
-    FAILED_SESSION_MSG     (113, P1, repairTimeout,   ANTI_ENTROPY,      () -> 
FailSession.serializer,               () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
+    PREPARE_CONSISTENT_RSP (109, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> PrepareConsistentResponse.serializer, () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
+    PREPARE_CONSISTENT_REQ (108, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> PrepareConsistentRequest.serializer,  () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
+    FINALIZE_PROPOSE_MSG   (110, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> FinalizePropose.serializer,           () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
+    FINALIZE_PROMISE_MSG   (111, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> FinalizePromise.serializer,           () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
+    FINALIZE_COMMIT_MSG    (112, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> FinalizeCommit.serializer,            () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
+    FAILED_SESSION_MSG     (113, P1, repairWithBackoffTimeout,      
ANTI_ENTROPY,      () -> FailSession.serializer,               () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
     STATUS_RSP             (115, P1, repairTimeout,   ANTI_ENTROPY,      () -> 
StatusResponse.serializer,            () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
     STATUS_REQ             (114, P1, repairTimeout,   ANTI_ENTROPY,      () -> 
StatusRequest.serializer,             () -> 
RepairMessageVerbHandler.instance(),   REPAIR_RSP          ),
 
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 988159860d..34f20cd708 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.repair.messages.*;
@@ -286,27 +285,28 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     break;
 
                 case PREPARE_CONSISTENT_REQ:
-                    
ctx.repair().consistent.local.handlePrepareMessage(message.from(), 
(PrepareConsistentRequest) message.payload);
+                    
ctx.repair().consistent.local.handlePrepareMessage(message);
                     break;
 
                 case PREPARE_CONSISTENT_RSP:
-                    
ctx.repair().consistent.coordinated.handlePrepareResponse((PrepareConsistentResponse)
 message.payload);
+                    
ctx.repair().consistent.coordinated.handlePrepareResponse(message);
                     break;
 
                 case FINALIZE_PROPOSE_MSG:
-                    
ctx.repair().consistent.local.handleFinalizeProposeMessage(message.from(), 
(FinalizePropose) message.payload);
+                    
ctx.repair().consistent.local.handleFinalizeProposeMessage(message);
                     break;
 
                 case FINALIZE_PROMISE_MSG:
-                    
ctx.repair().consistent.coordinated.handleFinalizePromiseMessage((FinalizePromise)
 message.payload);
+                    
ctx.repair().consistent.coordinated.handleFinalizePromiseMessage(message);
                     break;
 
                 case FINALIZE_COMMIT_MSG:
-                    
ctx.repair().consistent.local.handleFinalizeCommitMessage(message.from(), 
(FinalizeCommit) message.payload);
+                    
ctx.repair().consistent.local.handleFinalizeCommitMessage(message);
                     break;
 
                 case FAILED_SESSION_MSG:
                     FailSession failure = (FailSession) message.payload;
+                    sendAck(message);
                     
ctx.repair().consistent.coordinated.handleFailSessionMessage(failure);
                     
ctx.repair().consistent.local.handleFailSessionMessage(message.from(), failure);
                     break;
@@ -410,12 +410,11 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
 
     private void sendFailureResponse(Message<?> respondTo)
     {
-        Message<?> reply = 
respondTo.failureResponse(RequestFailureReason.UNKNOWN);
-        ctx.messaging().send(reply, respondTo.from());
+        RepairMessage.sendFailureResponse(ctx, respondTo);
     }
 
     private void sendAck(Message<RepairMessage> message)
     {
-        ctx.messaging().send(message.emptyResponse(), message.from());
+        RepairMessage.sendAck(ctx, message);
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/SharedContext.java b/src/java/org/apache/cassandra/repair/SharedContext.java
index defc59a83c..8ccc88f584 100644
--- a/src/java/org/apache/cassandra/repair/SharedContext.java
+++ b/src/java/org/apache/cassandra/repair/SharedContext.java
@@ -59,6 +59,16 @@ public interface SharedContext
     ScheduledExecutorPlus optionalTasks();
 
     MessageDelivery messaging();
+    default SharedContext withMessaging(MessageDelivery messaging)
+    {
+        return new ForwardingSharedContext(this) {
+            @Override
+            public MessageDelivery messaging()
+            {
+                return messaging;
+            }
+        };
+    }
     IFailureDetector failureDetector();
     IEndpointSnitch snitch();
     IGossiper gossiper();
@@ -162,4 +172,109 @@ public interface SharedContext
             return StreamPlan::execute;
         }
     }
+
+    class ForwardingSharedContext implements SharedContext
+    {
+        private final SharedContext delegate;
+
+        public ForwardingSharedContext(SharedContext delegate)
+        {
+            this.delegate = delegate;
+        }
+
+        protected SharedContext delegate()
+        {
+            return delegate;
+        }
+
+        @Override
+        public InetAddressAndPort broadcastAddressAndPort()
+        {
+            return delegate().broadcastAddressAndPort();
+        }
+
+        @Override
+        public Supplier<Random> random()
+        {
+            return delegate().random();
+        }
+
+        @Override
+        public Clock clock()
+        {
+            return delegate().clock();
+        }
+
+        @Override
+        public ExecutorFactory executorFactory()
+        {
+            return delegate().executorFactory();
+        }
+
+        @Override
+        public MBeanWrapper mbean()
+        {
+            return delegate().mbean();
+        }
+
+        @Override
+        public ScheduledExecutorPlus optionalTasks()
+        {
+            return delegate().optionalTasks();
+        }
+
+        @Override
+        public MessageDelivery messaging()
+        {
+            return delegate().messaging();
+        }
+
+        @Override
+        public IFailureDetector failureDetector()
+        {
+            return delegate().failureDetector();
+        }
+
+        @Override
+        public IEndpointSnitch snitch()
+        {
+            return delegate().snitch();
+        }
+
+        @Override
+        public IGossiper gossiper()
+        {
+            return delegate().gossiper();
+        }
+
+        @Override
+        public ICompactionManager compactionManager()
+        {
+            return delegate().compactionManager();
+        }
+
+        @Override
+        public ActiveRepairService repair()
+        {
+            return delegate().repair();
+        }
+
+        @Override
+        public IValidationManager validationManager()
+        {
+            return delegate().validationManager();
+        }
+
+        @Override
+        public TableRepairManager repairManager(ColumnFamilyStore store)
+        {
+            return delegate().repairManager(store);
+        }
+
+        @Override
+        public StreamExecutor streamExecutor()
+        {
+            return delegate().streamExecutor();
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index 76645f1adc..8101f02912 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -70,7 +70,7 @@ import org.apache.cassandra.utils.TimeUUID;
  *  {@code PREPARED}, and a {@link PrepareConsistentResponse} is sent to the 
coordinator indicating success or failure.
  *  If the pending anti-compaction fails, the local session state is set to 
{@code FAILED}.
  *  <p/>
- *  (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort, 
PrepareConsistentRequest)}
+ *  (see {@link 
LocalSessions#handlePrepareMessage(org.apache.cassandra.net.Message)}
  *  <p/>
  *  Once the coordinator recieves positive {@code PrepareConsistentResponse} 
messages from all the participants, the
  *  coordinator begins the normal repair process.
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 0ca1576df1..6771dfe07d 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -31,6 +31,8 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.repair.CoordinatedRepairResult;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -47,10 +49,14 @@ import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
 import org.apache.cassandra.repair.messages.FinalizePropose;
 import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
-import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 
+import static org.apache.cassandra.repair.messages.RepairMessage.notDone;
+import static org.apache.cassandra.repair.messages.RepairMessage.sendAck;
+import static 
org.apache.cassandra.repair.messages.RepairMessage.sendFailureResponse;
+import static 
org.apache.cassandra.repair.messages.RepairMessage.sendMessageWithRetries;
+
 /**
  * Coordinator side logic and state of a consistent repair session. Like 
{@link ActiveRepairService.ParentRepairSession},
  * there is only one {@code CoordinatorSession} per user repair command, 
regardless of the number of tables and token
@@ -154,34 +160,37 @@ public class CoordinatorSession extends ConsistentSession
         return getState() == State.FAILED || 
Iterables.any(participantStates.values(), v -> v == State.FAILED);
     }
 
-    protected void sendMessage(InetAddressAndPort destination, 
Message<RepairMessage> message)
-    {
-        logger.trace("Sending {} to {}", message.payload, destination);
-
-        ctx.messaging().send(message, destination);
-    }
-
     public Future<Void> prepare()
     {
         Preconditions.checkArgument(allStates(State.PREPARING));
 
         logger.info("Beginning prepare phase of incremental repair session 
{}", sessionID);
-        Message<RepairMessage> message =
-            Message.out(Verb.PREPARE_CONSISTENT_REQ, new 
PrepareConsistentRequest(sessionID, coordinator, participants));
+
+        PrepareConsistentRequest request = new 
PrepareConsistentRequest(sessionID, coordinator, participants);
         for (final InetAddressAndPort participant : participants)
         {
-            sendMessage(participant, message);
+            sendMessageWithRetries(ctx,
+                                   notDone(prepareFuture),
+                                   request,
+                                   Verb.PREPARE_CONSISTENT_REQ,
+                                   participant);
         }
         return prepareFuture;
     }
 
-    public synchronized void handlePrepareResponse(InetAddressAndPort 
participant, boolean success)
+    public synchronized void 
handlePrepareResponse(Message<PrepareConsistentResponse> msg)
     {
+        InetAddressAndPort participant = msg.payload.participant;
+        boolean success = msg.payload.success;
         if (getState() == State.FAILED)
         {
             logger.trace("Incremental repair {} has failed, ignoring prepare 
response from {}", sessionID, participant);
+            sendFailureResponse(ctx, msg);
             return;
         }
+        sendAck(ctx, msg);
+        if (getParticipantState(participant) != State.PREPARING)
+            return;
         if (!success)
         {
             logger.warn("{} failed the prepare phase for incremental repair 
session {}", participant, sessionID);
@@ -218,19 +227,29 @@ public class CoordinatorSession extends ConsistentSession
     {
         Preconditions.checkArgument(allStates(State.REPAIRING));
         logger.info("Proposing finalization of repair session {}", sessionID);
-        Message<RepairMessage> message = 
Message.out(Verb.FINALIZE_PROPOSE_MSG, new FinalizePropose(sessionID));
+        FinalizePropose request = new FinalizePropose(sessionID);
         for (final InetAddressAndPort participant : participants)
         {
-            sendMessage(participant, message);
+            sendMessageWithRetries(ctx, notDone(finalizeProposeFuture), 
request, Verb.FINALIZE_PROPOSE_MSG, participant);
         }
         return finalizeProposeFuture;
     }
 
-    public synchronized void handleFinalizePromise(InetAddressAndPort 
participant, boolean success)
+    public synchronized void handleFinalizePromise(Message<FinalizePromise> 
message)
     {
+        InetAddressAndPort participant = message.payload.participant;
+        boolean success = message.payload.promised;
         if (getState() == State.FAILED)
         {
             logger.trace("Incremental repair {} has failed, ignoring finalize 
promise from {}", sessionID, participant);
+            sendFailureResponse(ctx, message);
+            return;
+        }
+        sendAck(ctx, message);
+        if (getParticipantState(participant) != State.REPAIRING)
+        {
+            // this message is a retry, or we failed the session; in either 
case there is nothing more to do than ack
+            return;
         }
         else if (!success)
         {
@@ -253,10 +272,10 @@ public class CoordinatorSession extends ConsistentSession
     {
         Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
         logger.info("Committing finalization of repair session {}", sessionID);
-        Message<RepairMessage> message = Message.out(Verb.FINALIZE_COMMIT_MSG, 
new FinalizeCommit(sessionID));
+        FinalizeCommit payload = new FinalizeCommit(sessionID);
         for (final InetAddressAndPort participant : participants)
         {
-            sendMessage(participant, message);
+            sendMessageWithRetries(ctx, payload, Verb.FINALIZE_COMMIT_MSG, 
participant);
         }
         setAll(State.FINALIZED);
         logger.info("Incremental repair session {} completed", sessionID);
@@ -264,12 +283,12 @@ public class CoordinatorSession extends ConsistentSession
 
     private void sendFailureMessageToParticipants()
     {
-        Message<RepairMessage> message = Message.out(Verb.FAILED_SESSION_MSG, 
new FailSession(sessionID));
+        FailSession payload = new FailSession(sessionID);
         for (final InetAddressAndPort participant : participants)
         {
             if (participantStates.get(participant) != State.FAILED)
             {
-                sendMessage(participant, message);
+                sendMessageWithRetries(ctx, payload, Verb.FAILED_SESSION_MSG, 
participant);
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
index 1abab8c476..6f02f033d3 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
@@ -24,15 +24,19 @@ import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.cassandra.net.Message;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizePromise;
 import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.repair.NoSuchRepairSessionException;
 import org.apache.cassandra.utils.TimeUUID;
 
+import static 
org.apache.cassandra.repair.messages.RepairMessage.sendFailureResponse;
+
 /**
  * Container for all consistent repair sessions a node is coordinating
  */
@@ -81,21 +85,31 @@ public class CoordinatorSessions
         return sessions.get(sessionId);
     }
 
-    public void handlePrepareResponse(PrepareConsistentResponse msg)
+    public void handlePrepareResponse(Message<? extends RepairMessage> msg)
     {
-        CoordinatorSession session = getSession(msg.parentSession);
+        PrepareConsistentResponse payload = (PrepareConsistentResponse) 
msg.payload;
+        CoordinatorSession session = getSession(payload.parentSession);
         if (session != null)
         {
-            session.handlePrepareResponse(msg.participant, msg.success);
+            session.handlePrepareResponse((Message<PrepareConsistentResponse>) 
msg);
+        }
+        else
+        {
+            sendFailureResponse(ctx, msg);
         }
     }
 
-    public void handleFinalizePromiseMessage(FinalizePromise msg)
+    public void handleFinalizePromiseMessage(Message<? extends RepairMessage> 
message)
     {
+        FinalizePromise msg = (FinalizePromise) message.payload;
         CoordinatorSession session = getSession(msg.sessionID);
         if (session != null)
         {
-            session.handleFinalizePromise(msg.participant, msg.promised);
+            session.handleFinalizePromise((Message<FinalizePromise>) message);
+        }
+        else
+        {
+            sendFailureResponse(ctx, message);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 85bdf15923..75c50e5dce 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
@@ -58,6 +59,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.KeyspaceRepairManager;
 import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
 import org.apache.cassandra.repair.consistent.admin.PendingStat;
@@ -105,6 +107,9 @@ import static org.apache.cassandra.net.Verb.PREPARE_CONSISTENT_RSP;
 import static org.apache.cassandra.net.Verb.STATUS_REQ;
 import static org.apache.cassandra.net.Verb.STATUS_RSP;
 import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+import static org.apache.cassandra.repair.messages.RepairMessage.always;
+import static org.apache.cassandra.repair.messages.RepairMessage.sendAck;
+import static 
org.apache.cassandra.repair.messages.RepairMessage.sendFailureResponse;
 
 /**
  * Manages all consistent repair sessions a node is participating in.
@@ -335,11 +340,11 @@ public class LocalSessions
                                     sessionID, session.coordinator);
 
         setStateAndSave(session, FAILED);
-        Message<FailSession> message = Message.out(FAILED_SESSION_MSG, new 
FailSession(sessionID));
+        FailSession payload = new FailSession(sessionID);
         for (InetAddressAndPort participant : session.participants)
         {
             if (!participant.equals(getBroadcastAddressAndPort()))
-                sendMessage(participant, message);
+                sendMessageWithRetries(payload, FAILED_SESSION_MSG, 
participant);
         }
     }
 
@@ -637,10 +642,13 @@ public class LocalSessions
     }
 
     @VisibleForTesting
-    synchronized void putSessionUnsafe(LocalSession session)
+    synchronized boolean putSessionUnsafe(LocalSession session)
     {
+        if (sessions.containsKey(session.sessionID))
+            return false;
         putSession(session);
         save(session);
+        return true;
     }
 
     private synchronized void putSession(LocalSession session)
@@ -696,12 +704,19 @@ public class LocalSessions
 
     @VisibleForTesting
     void setStateAndSave(LocalSession session, ConsistentSession.State state)
+    {
+        maybeSetStateAndSave(session, null, state);
+    }
+
+    private boolean maybeSetStateAndSave(LocalSession session, @Nullable 
ConsistentSession.State expected, ConsistentSession.State state)
     {
         synchronized (session)
         {
             
Preconditions.checkArgument(session.getState().canTransitionTo(state),
                                         "Invalid state transition %s -> %s",
                                         session.getState(), state);
+            if (expected != null && session.getState() != expected)
+                return false;
             logger.trace("Changing LocalSession state from {} -> {} for {}", 
session.getState(), state, session.sessionID);
             boolean wasCompleted = session.isCompleted();
             session.setState(state);
@@ -714,6 +729,7 @@ public class LocalSessions
             }
             for (Listener listener : listeners)
                 listener.onIRStateChange(session);
+            return true;
         }
     }
 
@@ -746,7 +762,7 @@ public class LocalSessions
             }
             if (sendMessage)
             {
-                sendMessage(session.coordinator, 
Message.out(FAILED_SESSION_MSG, new FailSession(session.sessionID)));
+                sendMessageWithRetries(new FailSession(session.sessionID), 
FAILED_SESSION_MSG, session.coordinator);
             }
         }
     }
@@ -802,8 +818,10 @@ public class LocalSessions
      * successfully. If the data preparation fails, a failure message is sent 
to the coordinator,
      * cancelling the session.
      */
-    public void handlePrepareMessage(InetAddressAndPort from, 
PrepareConsistentRequest request)
+    public void handlePrepareMessage(Message<? extends RepairMessage> message)
     {
+        InetAddressAndPort from = message.from();
+        PrepareConsistentRequest request = (PrepareConsistentRequest) 
message.payload;
         logger.trace("received {} from {}", request, from);
         TimeUUID sessionID = request.parentSession;
         InetAddressAndPort coordinator = request.coordinator;
@@ -817,12 +835,15 @@ public class LocalSessions
         catch (Throwable e)
         {
             logger.error("Error retrieving ParentRepairSession for session {}, 
responding with failure", sessionID);
-            sendMessage(coordinator, Message.out(PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
+            sendFailureResponse(ctx, message);
+            sendMessageWithRetries(always(), new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false), 
PREPARE_CONSISTENT_RSP, coordinator);
             return;
         }
 
         LocalSession session = createSessionUnsafe(sessionID, parentSession, 
peers);
-        putSessionUnsafe(session);
+        sendAck(ctx, message);
+        if (!putSessionUnsafe(session))
+            return;
         logger.info("Beginning local incremental repair session {}", session);
 
         ExecutorService executor = ctx.executorFactory().pooled("Repair-" + 
sessionID, parentSession.getColumnFamilyStores().size());
@@ -841,10 +862,7 @@ public class LocalSessions
                     logger.info("Prepare phase for incremental repair session 
{} completed", sessionID);
                     if (!prepareSessionExceptFailed(session))
                         logger.info("Session {} failed before anticompaction 
completed", sessionID);
-                    Message<PrepareConsistentResponse> message =
-                        Message.out(PREPARE_CONSISTENT_RSP,
-                                    new PrepareConsistentResponse(sessionID, 
getBroadcastAddressAndPort(), session.getState() != FAILED));
-                    sendMessage(coordinator, message);
+                    sendMessageWithRetries(always(), new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), 
session.getState() != FAILED), PREPARE_CONSISTENT_RSP, coordinator);
                 }
                 finally
                 {
@@ -862,9 +880,7 @@ public class LocalSessions
                         logger.warn("No such repair session: {}", sessionID);
                     else
                         logger.error("Prepare phase for incremental repair 
session {} failed", sessionID, t);
-                    sendMessage(coordinator,
-                                Message.out(PREPARE_CONSISTENT_RSP,
-                                            new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
+                    sendMessageWithRetries(always(), new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false), 
PREPARE_CONSISTENT_RSP, coordinator);
                     failSession(sessionID, false);
                 }
                 finally
@@ -875,6 +891,16 @@ public class LocalSessions
         });
     }
 
+    private void sendMessageWithRetries(Supplier<Boolean> allowRetry, 
RepairMessage request, Verb verb, InetAddressAndPort endpoint)
+    {
+        RepairMessage.sendMessageWithRetries(ctx, allowRetry, request, verb, 
endpoint);
+    }
+
+    private void sendMessageWithRetries(RepairMessage request, Verb verb, 
InetAddressAndPort endpoint)
+    {
+        RepairMessage.sendMessageWithRetries(ctx, request, verb, endpoint);
+    }
+
     /**
      * Checks for the session state, and sets it to prepared unless it is on a 
failed state.
      * Making the checks inside a synchronized block to prevent the session 
state from
@@ -904,21 +930,26 @@ public class LocalSessions
         }
     }
 
-    public void handleFinalizeProposeMessage(InetAddressAndPort from, 
FinalizePropose propose)
+    public void handleFinalizeProposeMessage(Message<? extends RepairMessage> 
message)
     {
+        InetAddressAndPort from = message.from();
+        FinalizePropose propose = (FinalizePropose) message.payload;
         logger.trace("received {} from {}", propose, from);
         TimeUUID sessionID = propose.sessionID;
         LocalSession session = getSession(sessionID);
         if (session == null)
         {
             logger.info("Received FinalizePropose message for unknown repair 
session {}, responding with failure", sessionID);
-            sendMessage(from, Message.out(FAILED_SESSION_MSG, new 
FailSession(sessionID)));
+            sendFailureResponse(ctx, message);
+            sendMessageWithRetries(new FailSession(sessionID), 
FAILED_SESSION_MSG, from);
             return;
         }
 
+        sendAck(ctx, message);
         try
         {
-            setStateAndSave(session, FINALIZE_PROMISED);
+            if (!maybeSetStateAndSave(session, REPAIRING, FINALIZE_PROMISED))
+                return;
 
             /*
              Flushing the repairs table here, *before* responding to the 
coordinator prevents a scenario where we respond
@@ -929,7 +960,7 @@ public class LocalSessions
              */
             syncTable();
 
-            sendMessage(from, Message.out(FINALIZE_PROMISE_MSG, new 
FinalizePromise(sessionID, getBroadcastAddressAndPort(), true)));
+            RepairMessage.sendMessageWithRetries(ctx, new 
FinalizePromise(sessionID, getBroadcastAddressAndPort(), true), 
FINALIZE_PROMISE_MSG, from);
             logger.info("Received FinalizePropose message for incremental 
repair session {}, responded with FinalizePromise", sessionID);
         }
         catch (IllegalArgumentException e)
@@ -959,19 +990,23 @@ public class LocalSessions
      * as part of the compaction process, and avoids having to worry about in 
progress compactions interfering with the
      * promotion.
      */
-    public void handleFinalizeCommitMessage(InetAddressAndPort from, 
FinalizeCommit commit)
+    public void handleFinalizeCommitMessage(Message<? extends RepairMessage> 
message)
     {
+        InetAddressAndPort from = message.from();
+        FinalizeCommit commit = (FinalizeCommit) message.payload;
         logger.trace("received {} from {}", commit, from);
         TimeUUID sessionID = commit.sessionID;
         LocalSession session = getSession(sessionID);
         if (session == null)
         {
             logger.warn("Ignoring FinalizeCommit message for unknown repair 
session {}", sessionID);
+            sendFailureResponse(ctx, message);
             return;
         }
+        sendAck(ctx, message);
 
-        setStateAndSave(session, FINALIZED);
-        logger.info("Finalized local repair session {}", sessionID);
+        if (maybeSetStateAndSave(session, FINALIZE_PROMISED, FINALIZED))
+            logger.info("Finalized local repair session {}", sessionID);
     }
 
     public void handleFailSessionMessage(InetAddressAndPort from, FailSession 
msg)
diff --git a/src/java/org/apache/cassandra/repair/messages/FailSession.java b/src/java/org/apache/cassandra/repair/messages/FailSession.java
index f6826eec6f..19102fb511 100644
--- a/src/java/org/apache/cassandra/repair/messages/FailSession.java
+++ b/src/java/org/apache/cassandra/repair/messages/FailSession.java
@@ -36,6 +36,12 @@ public class FailSession extends RepairMessage
         this.sessionID = sessionID;
     }
 
+    @Override
+    public TimeUUID parentRepairSession()
+    {
+        return sessionID;
+    }
+
     public boolean equals(Object o)
     {
         if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
index ca4e6d8a88..d1d9efa601 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
@@ -36,6 +36,12 @@ public class FinalizeCommit extends RepairMessage
         this.sessionID = sessionID;
     }
 
+    @Override
+    public TimeUUID parentRepairSession()
+    {
+        return sessionID;
+    }
+
     public boolean equals(Object o)
     {
         if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
index c45f46d48e..ed514d859f 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
@@ -45,6 +45,12 @@ public class FinalizePromise extends RepairMessage
         this.promised = promised;
     }
 
+    @Override
+    public TimeUUID parentRepairSession()
+    {
+        return sessionID;
+    }
+
     public boolean equals(Object o)
     {
         if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
index b3c4bfddb9..c1e76e347b 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
@@ -36,6 +36,12 @@ public class FinalizePropose extends RepairMessage
         this.sessionID = sessionID;
     }
 
+    @Override
+    public TimeUUID parentRepairSession()
+    {
+        return sessionID;
+    }
+
     public boolean equals(Object o)
     {
         if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
index 9ac1461f65..1a3a6ecd38 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
@@ -50,6 +50,12 @@ public class PrepareConsistentRequest extends RepairMessage
         this.participants = ImmutableSet.copyOf(participants);
     }
 
+    @Override
+    public TimeUUID parentRepairSession()
+    {
+        return parentSession;
+    }
+
     public boolean equals(Object o)
     {
         if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
index a422d7f9f7..80f62c1c03 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
@@ -45,6 +45,12 @@ public class PrepareConsistentResponse extends RepairMessage
         this.success = success;
     }
 
+    @Override
+    public TimeUUID parentRepairSession()
+    {
+        return parentSession;
+    }
+
     public boolean equals(Object o)
     {
         if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 6411d12c78..f0cbf78f38 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -86,6 +86,13 @@ public abstract class RepairMessage
         map.put(Verb.SYNC_REQ, timeoutVersion);
         map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
         map.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
+        // IR messages
+        map.put(Verb.PREPARE_CONSISTENT_REQ, SUPPORTS_RETRY);
+        map.put(Verb.PREPARE_CONSISTENT_RSP, SUPPORTS_RETRY);
+        map.put(Verb.FINALIZE_PROPOSE_MSG, SUPPORTS_RETRY);
+        map.put(Verb.FINALIZE_PROMISE_MSG, SUPPORTS_RETRY);
+        map.put(Verb.FINALIZE_COMMIT_MSG, SUPPORTS_RETRY);
+        map.put(Verb.FAILED_SESSION_MSG, SUPPORTS_RETRY);
         VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map);
 
         EnumSet<Verb> allowsRetry = EnumSet.noneOf(Verb.class);
@@ -96,6 +103,13 @@ public abstract class RepairMessage
         allowsRetry.add(Verb.SYNC_RSP);
         allowsRetry.add(Verb.SNAPSHOT_MSG);
         allowsRetry.add(Verb.CLEANUP_MSG);
+        // IR messages
+        allowsRetry.add(Verb.PREPARE_CONSISTENT_REQ);
+        allowsRetry.add(Verb.PREPARE_CONSISTENT_RSP);
+        allowsRetry.add(Verb.FINALIZE_PROPOSE_MSG);
+        allowsRetry.add(Verb.FINALIZE_PROMISE_MSG);
+        allowsRetry.add(Verb.FINALIZE_COMMIT_MSG);
+        allowsRetry.add(Verb.FAILED_SESSION_MSG);
         ALLOWS_RETRY = Collections.unmodifiableSet(allowsRetry);
     }
 
@@ -154,6 +168,11 @@ public abstract class RepairMessage
         sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request, 
verb, endpoint, NOOP_CALLBACK, 0);
     }
 
+    public static void sendMessageWithRetries(SharedContext ctx, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint)
+    {
+        sendMessageWithRetries(ctx, backoff(ctx, verb), allowRetry, request, 
verb, endpoint, NOOP_CALLBACK, 0);
+    }
+
     @VisibleForTesting
     static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, 
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, 
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
     {
@@ -214,7 +233,7 @@ public abstract class RepairMessage
                 }
                 else
                 {
-                    noSpam.warn("{} Failure for repair verb " + verb + "; 
could not complete within {} attempts", prefix, attempt);
+                    noSpam.warn("{} {} failure for repair verb " + verb + "; 
could not complete within {} attempts", prefix, reason, attempt);
                     RepairMetrics.retryFailure(verb);
                 }
             }
@@ -279,4 +298,15 @@ public abstract class RepairMessage
             return ErrorHandling.TIMEOUT;
         return ErrorHandling.NONE;
     }
+
+    public static void sendFailureResponse(SharedContext ctx, Message<?> 
respondTo)
+    {
+        Message<?> reply = 
respondTo.failureResponse(RequestFailureReason.UNKNOWN);
+        ctx.messaging().send(reply, respondTo.from());
+    }
+
+    public static void sendAck(SharedContext ctx, Message<? extends 
RepairMessage> message)
+    {
+        ctx.messaging().send(message.emptyResponse(), message.from());
+    }
 }
diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java
index d7b37591df..cad03fcc71 100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@ -187,11 +187,18 @@ public class MatcherResponse
                         Message<?> response = fnResponse.apply(message, to);
                         if (response != null)
                         {
-                            RequestCallbacks.CallbackInfo cb = 
MessagingService.instance().callbacks.get(message.id(), to);
-                            if (cb != null)
-                                cb.callback.onResponse(response);
+                            if (response.verb().isResponse())
+                            {
+                                RequestCallbacks.CallbackInfo cb = 
MessagingService.instance().callbacks.get(message.id(), to);
+                                if (cb != null)
+                                    cb.callback.onResponse(response);
+                                else
+                                    processResponse(response);
+                            }
                             else
+                            {
                                 processResponse(response);
+                            }
 
                             spy.matchingResponse(response);
                         }
diff --git a/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java b/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
index eef13dc8b6..54ae644e34 100644
--- a/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
@@ -28,9 +28,11 @@ import accord.utils.Gen;
 import accord.utils.Gens;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.RetrySpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.repair.consistent.LocalSessions;
 import org.apache.cassandra.repair.state.Completable;
 import org.apache.cassandra.utils.Closeable;
+import org.apache.cassandra.utils.FailingBiConsumer;
 import org.assertj.core.api.Assertions;
 
 import static accord.utils.Property.qt;
@@ -62,7 +64,7 @@ public class ConcurrentIrWithPreviewFuzzTest extends FuzzTestBase
                 // cause a delay in validation to have more failing previews
                 closeables.add(cluster.nodes.get(pickParticipant(rs, 
previewCoordinator, preview)).doValidation(next -> (cfs, validator) -> {
                     if 
(validator.desc.parentSessionId.equals(preview.state.id))
-                        cluster.unorderedScheduled.schedule(() -> 
next.accept(cfs, validator), 1, TimeUnit.HOURS);
+                        delayValidation(cluster, ir, next, cfs, validator);
                     else next.acceptOrFail(cfs, validator);
                 }));
                 // make sure listeners don't leak
@@ -88,4 +90,18 @@ public class ConcurrentIrWithPreviewFuzzTest extends FuzzTestBase
             }
         });
     }
+
+    private void delayValidation(Cluster cluster, RepairCoordinator ir, 
FailingBiConsumer<ColumnFamilyStore, Validator> next, ColumnFamilyStore cfs, 
Validator validator)
+    {
+        cluster.unorderedScheduled.schedule(() -> {
+            // make sure to wait for IR to complete...
+            Completable.Result result = ir.state.getResult();
+            if (result == null)
+            {
+                delayValidation(cluster, ir, next, cfs, validator);
+                return;
+            }
+            next.accept(cfs, validator);
+        }, 1, TimeUnit.HOURS);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
index 7c6a4601e1..39576c95f0 100644
--- a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
@@ -114,6 +114,7 @@ public class FailingRepairFuzzTest extends FuzzTestBase
                 }
 
                 cluster.processAll();
+                
Assertions.assertThat(repair.state.isComplete()).describedAs("Repair job did 
not complete, and no work is pending...").isTrue();
                 
Assertions.assertThat(repair.state.getResult().kind).describedAs("Unexpected 
state: %s -> %s; example %d", repair.state, repair.state.getResult(), 
example).isEqualTo(Completable.Result.Kind.FAILURE);
                 switch (stage)
                 {
diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index d29f35799e..fb7702cd82 100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -73,6 +74,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Digest;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.ICompactionManager;
 import org.apache.cassandra.db.marshal.EmptyType;
 import org.apache.cassandra.db.repair.CassandraTableRepairManager;
@@ -280,6 +282,12 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
         createSchema();
     }
 
+    protected void cleanupRepairTables()
+    {
+        for (String table : Arrays.asList(SystemKeyspace.REPAIRS))
+            execute(String.format("TRUNCATE %s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, table));
+    }
+
     private void createSchema()
     {
         // The main reason to use random here with a fixed seed is just to 
have a set of tables that are not hard coded.
@@ -329,12 +337,8 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
                 switch (message.verb())
                 {
                     // these messages are not resilent to ephemeral issues
-                    case PREPARE_CONSISTENT_REQ:
-                    case PREPARE_CONSISTENT_RSP:
-                    case FINALIZE_PROPOSE_MSG:
-                    case FINALIZE_PROMISE_MSG:
-                    case FINALIZE_COMMIT_MSG:
-                    case FAILED_SESSION_MSG:
+                    case STATUS_REQ:
+                    case STATUS_RSP:
                         noFaults.add(message.id());
                         return Faults.NONE;
                     default:
@@ -357,7 +361,10 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
 
     static void assertSuccess(int example, boolean shouldSync, 
RepairCoordinator repair)
     {
-        
Assertions.assertThat(repair.state.getResult()).describedAs("Unexpected state: 
%s -> %s; example %d", repair.state, repair.state.getResult(), 
example).isEqualTo(Completable.Result.success(repairSuccessMessage(repair)));
+        Completable.Result result = repair.state.getResult();
+        Assertions.assertThat(result)
+                  .describedAs("Expected repair to have completed with 
success, but is still running... %s; example %d", repair.state, 
example).isNotNull()
+                  .describedAs("Unexpected state: %s -> %s; example %d", 
repair.state, result, 
example).isEqualTo(Completable.Result.success(repairSuccessMessage(repair)));
         
Assertions.assertThat(repair.state.getStateTimesMillis().keySet()).isEqualTo(EnumSet.allOf(CoordinatorState.State.class));
         Assertions.assertThat(repair.state.getSessions()).isNotEmpty();
         boolean shouldSnapshot = repair.state.options.getParallelism() != 
RepairParallelism.PARALLEL
@@ -643,6 +650,8 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
             orderedExecutor = 
globalExecutor.configureSequential("ignore").build();
             unorderedScheduled = globalExecutor.scheduled("ignored");
 
+
+
             // We run tests in an isolated JVM per class, so not cleaing up is 
safe... but if that assumption ever changes, will need to cleanup
             Stage.ANTI_ENTROPY.unsafeSetExecutor(orderedExecutor);
             Stage.INTERNAL_RESPONSE.unsafeSetExecutor(unorderedScheduled);
@@ -747,7 +756,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
             }
         }
 
-        private static class CallbackContext
+        private class CallbackContext
         {
             final RequestCallback callback;
 
@@ -803,6 +812,8 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
                 CallbackContext cb;
                 if (callback != null)
                 {
+                    if (callbacks.containsKey(message.id()))
+                        throw new AssertionError("Message id " + message.id() 
+ " already has a callback");
                     cb = new CallbackContext(callback);
                     callbacks.put(message.id(), cb);
                 }
@@ -854,7 +865,14 @@ public abstract class FuzzTestBase extends CQLTester.InMemory
                             if (ctx != null)
                             {
                                 assert ctx == cb;
-                                ctx.onFailure(to, 
RequestFailureReason.TIMEOUT);
+                                try
+                                {
+                                    ctx.onFailure(to, 
RequestFailureReason.TIMEOUT);
+                                }
+                                catch (Throwable t)
+                                {
+                                    failures.add(t);
+                                }
                             }
                         }, message.verb().expiresAfterNanos(), 
TimeUnit.NANOSECONDS);
                     }
diff --git a/test/unit/org/apache/cassandra/repair/SlowMessageFuzzTest.java b/test/unit/org/apache/cassandra/repair/SlowMessageFuzzTest.java
index 00fb2817cb..03c151ec68 100644
--- a/test/unit/org/apache/cassandra/repair/SlowMessageFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/SlowMessageFuzzTest.java
@@ -39,6 +39,7 @@ public class SlowMessageFuzzTest extends FuzzTestBase
         // to avoid unlucky timing issues, retry until success; given enough 
retries we should eventually become success
         DatabaseDescriptor.getRepairRetrySpec().maxAttempts = new 
RetrySpec.MaxAttempt(Integer.MAX_VALUE);
         qt().withPure(false).withExamples(10).check(rs -> {
+            cleanupRepairTables();
             Cluster cluster = new Cluster(rs);
             enableMessageFaults(cluster);
 
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index d10aa16cf8..1f91b99795 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.repair.consistent;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +33,7 @@ import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -44,8 +44,10 @@ import org.apache.cassandra.repair.CoordinatedRepairResult;
 import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePromise;
 import org.apache.cassandra.repair.messages.FinalizePropose;
 import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
@@ -82,7 +84,10 @@ public class CoordinatorSessionTest extends AbstractRepairTest
 
     static InstrumentedCoordinatorSession createInstrumentedSession()
     {
-        return new InstrumentedCoordinatorSession(createBuilder());
+        MockMessaging msg = new MockMessaging();
+        CoordinatorSession.Builder builder = createBuilder();
+        builder.withContext(SharedContext.Global.instance.withMessaging(msg));
+        return new InstrumentedCoordinatorSession(msg, builder);
     }
 
     private static RepairSessionResult createResult(CoordinatorSession 
coordinator)
@@ -99,20 +104,11 @@ public class CoordinatorSessionTest extends AbstractRepairTest
 
     private static class InstrumentedCoordinatorSession extends 
CoordinatorSession
     {
-        public InstrumentedCoordinatorSession(Builder builder)
+        private final Map<InetAddressAndPort, List<RepairMessage>> 
sentMessages;
+        public InstrumentedCoordinatorSession(MockMessaging messaging, Builder 
builder)
         {
             super(builder);
-        }
-
-        Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new 
HashMap<>();
-
-        protected void sendMessage(InetAddressAndPort destination, 
Message<RepairMessage> message)
-        {
-            if (!sentMessages.containsKey(destination))
-            {
-                sentMessages.put(destination, new ArrayList<>());
-            }
-            sentMessages.get(destination).add(message.payload);
+            this.sentMessages = messaging.sentMessages;
         }
 
         Runnable onSetRepairing = null;
@@ -242,17 +238,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
 
         // participants respond to coordinator, and repair begins once all 
participants have responded with success
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
-
-        coordinator.handlePrepareResponse(PARTICIPANT1, true);
+        
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT1, true)));
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
-
-        coordinator.handlePrepareResponse(PARTICIPANT2, true);
+        
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT2, true)));
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
         // set the setRepairing callback to verify the correct state when it's 
called
         Assert.assertFalse(coordinator.setRepairingCalled);
         coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, 
coordinator.getState());
-        coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT3, true)));
         Assert.assertTrue(coordinator.setRepairingCalled);
         Assert.assertTrue(repairSubmitted.get());
 
@@ -276,16 +272,16 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         coordinator.sentMessages.clear();
         Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
 
-        coordinator.handleFinalizePromise(PARTICIPANT1, true);
+        
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new 
FinalizePromise(coordinator.sessionID, PARTICIPANT1, true)));
         Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
 
-        coordinator.handleFinalizePromise(PARTICIPANT2, true);
+        
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new 
FinalizePromise(coordinator.sessionID, PARTICIPANT2, true)));
         Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
 
         // set the finalizeCommit callback so we can verify the state when 
it's called
         Assert.assertFalse(coordinator.finalizeCommitCalled);
         coordinator.onFinalizeCommit = () -> 
Assert.assertEquals(FINALIZE_PROMISED, coordinator.getState());
-        coordinator.handleFinalizePromise(PARTICIPANT3, true);
+        
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new 
FinalizePromise(coordinator.sessionID, PARTICIPANT3, true)));
         Assert.assertTrue(coordinator.finalizeCommitCalled);
 
         Assert.assertEquals(ConsistentSession.State.FINALIZED, 
coordinator.getState());
@@ -324,16 +320,16 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         // participants respond to coordinator, and repair begins once all 
participants have responded with success
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
-        coordinator.handlePrepareResponse(PARTICIPANT1, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT1, true)));
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
-        coordinator.handlePrepareResponse(PARTICIPANT2, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT2, true)));
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
         // set the setRepairing callback to verify the correct state when it's 
called
         Assert.assertFalse(coordinator.setRepairingCalled);
         coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, 
coordinator.getState());
-        coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT3, true)));
         Assert.assertTrue(coordinator.setRepairingCalled);
         Assert.assertTrue(repairSubmitted.get());
 
@@ -388,14 +384,14 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         // participants respond to coordinator, and repair begins once all 
participants have responded
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
-        coordinator.handlePrepareResponse(PARTICIPANT1, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT1, true)));
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
         Assert.assertEquals(PREPARED, 
coordinator.getParticipantState(PARTICIPANT1));
         Assert.assertFalse(sessionResult.isDone());
 
         // participant 2 fails to prepare for consistent repair
         Assert.assertFalse(coordinator.failCalled);
-        coordinator.handlePrepareResponse(PARTICIPANT2, false);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT2, false)));
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
         // we should have sent failure messages to the other participants, but 
not yet marked them failed internally
         assertMessageSent(coordinator, PARTICIPANT1, new 
FailSession(coordinator.sessionID));
@@ -411,7 +407,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         // last outstanding response should cause repair to complete in failed 
state
         Assert.assertFalse(coordinator.setRepairingCalled);
         coordinator.onSetRepairing = Assert::fail;
-        coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT3, true)));
         Assert.assertTrue(coordinator.failCalled);
         Assert.assertFalse(coordinator.setRepairingCalled);
         Assert.assertFalse(repairSubmitted.get());
@@ -453,16 +449,16 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         // participants respond to coordinator, and repair begins once all 
participants have responded with success
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
-        coordinator.handlePrepareResponse(PARTICIPANT1, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT1, true)));
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
-        coordinator.handlePrepareResponse(PARTICIPANT2, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT2, true)));
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
         // set the setRepairing callback to verify the correct state when it's 
called
         Assert.assertFalse(coordinator.setRepairingCalled);
         coordinator.onSetRepairing = () -> Assert.assertEquals(PREPARED, 
coordinator.getState());
-        coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        
coordinator.handlePrepareResponse(Message.out(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(coordinator.sessionID, PARTICIPANT3, true)));
         Assert.assertTrue(coordinator.setRepairingCalled);
         Assert.assertTrue(repairSubmitted.get());
 
@@ -486,18 +482,18 @@ public class CoordinatorSessionTest extends 
AbstractRepairTest
         coordinator.sentMessages.clear();
         Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
 
-        coordinator.handleFinalizePromise(PARTICIPANT1, true);
+        
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new 
FinalizePromise(coordinator.sessionID, PARTICIPANT1, true)));
         Assert.assertEquals(ConsistentSession.State.REPAIRING, 
coordinator.getState());
 
         Assert.assertFalse(coordinator.failCalled);
-        coordinator.handleFinalizePromise(PARTICIPANT2, false);
+        
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new 
FinalizePromise(coordinator.sessionID, PARTICIPANT2, false)));
         Assert.assertEquals(ConsistentSession.State.FAILED, 
coordinator.getState());
         Assert.assertTrue(coordinator.failCalled);
 
         // additional success messages should be ignored
         Assert.assertFalse(coordinator.finalizeCommitCalled);
         coordinator.onFinalizeCommit = Assert::fail;
-        coordinator.handleFinalizePromise(PARTICIPANT3, true);
+        
coordinator.handleFinalizePromise(Message.out(Verb.FINALIZE_PROMISE_MSG, new 
FinalizePromise(coordinator.sessionID, PARTICIPANT3, true)));
         Assert.assertFalse(coordinator.finalizeCommitCalled);
         Assert.assertEquals(ConsistentSession.State.FAILED, 
coordinator.getState());
 
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
index 9f0ee4635d..cd9dfba4ed 100644
--- 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
@@ -26,6 +26,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.repair.AbstractRepairTest;
@@ -60,21 +62,23 @@ public class CoordinatorSessionsTest extends 
AbstractRepairTest
         int prepareResponseCalls = 0;
         InetAddressAndPort preparePeer = null;
         boolean prepareSuccess = false;
-        public synchronized void handlePrepareResponse(InetAddressAndPort 
participant, boolean success)
+        @Override
+        public synchronized void 
handlePrepareResponse(Message<PrepareConsistentResponse> msg)
         {
             prepareResponseCalls++;
-            preparePeer = participant;
-            prepareSuccess = success;
+            preparePeer = msg.payload.participant;
+            prepareSuccess = msg.payload.success;
         }
 
         int finalizePromiseCalls = 0;
         InetAddressAndPort promisePeer = null;
         boolean promiseSuccess = false;
-        public synchronized void handleFinalizePromise(InetAddressAndPort 
participant, boolean success)
+        @Override
+        public synchronized void 
handleFinalizePromise(Message<FinalizePromise> message)
         {
             finalizePromiseCalls++;
-            promisePeer = participant;
-            promiseSuccess = success;
+            promisePeer = message.payload.participant;
+            promiseSuccess = message.payload.promised;
         }
 
         int failCalls = 0;
@@ -150,7 +154,7 @@ public class CoordinatorSessionsTest extends 
AbstractRepairTest
         InstrumentedCoordinatorSession session = 
sessions.registerSession(sessionID, PARTICIPANTS, false);
         Assert.assertEquals(0, session.prepareResponseCalls);
 
-        sessions.handlePrepareResponse(new 
PrepareConsistentResponse(sessionID, PARTICIPANT1, true));
+        
sessions.handlePrepareResponse(Message.builder(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(sessionID, PARTICIPANT1, true)).build());
         Assert.assertEquals(1, session.prepareResponseCalls);
         Assert.assertEquals(PARTICIPANT1, session.preparePeer);
         Assert.assertTrue(session.prepareSuccess);
@@ -162,7 +166,7 @@ public class CoordinatorSessionsTest extends 
AbstractRepairTest
         InstrumentedCoordinatorSessions sessions = new 
InstrumentedCoordinatorSessions();
         TimeUUID fakeID = nextTimeUUID();
 
-        sessions.handlePrepareResponse(new PrepareConsistentResponse(fakeID, 
PARTICIPANT1, true));
+        
sessions.handlePrepareResponse(Message.builder(Verb.PREPARE_CONSISTENT_RSP, new 
PrepareConsistentResponse(fakeID, PARTICIPANT1, true)).build());
         Assert.assertNull(sessions.getSession(fakeID));
     }
 
@@ -175,7 +179,7 @@ public class CoordinatorSessionsTest extends 
AbstractRepairTest
         InstrumentedCoordinatorSession session = 
sessions.registerSession(sessionID, PARTICIPANTS, false);
         Assert.assertEquals(0, session.finalizePromiseCalls);
 
-        sessions.handleFinalizePromiseMessage(new FinalizePromise(sessionID, 
PARTICIPANT1, true));
+        
sessions.handleFinalizePromiseMessage(Message.builder(Verb.FINALIZE_PROMISE_MSG,
 new FinalizePromise(sessionID, PARTICIPANT1, true)).build());
         Assert.assertEquals(1, session.finalizePromiseCalls);
         Assert.assertEquals(PARTICIPANT1, session.promisePeer);
         Assert.assertTrue(session.promiseSuccess);
@@ -187,7 +191,7 @@ public class CoordinatorSessionsTest extends 
AbstractRepairTest
         InstrumentedCoordinatorSessions sessions = new 
InstrumentedCoordinatorSessions();
         TimeUUID fakeID = nextTimeUUID();
 
-        sessions.handleFinalizePromiseMessage(new FinalizePromise(fakeID, 
PARTICIPANT1, true));
+        
sessions.handleFinalizePromiseMessage(Message.builder(Verb.FINALIZE_PROMISE_MSG,
 new FinalizePromise(fakeID, PARTICIPANT1, true)).build());
         Assert.assertNull(sessions.getSession(fakeID));
     }
 
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index 0f9912437e..e19f8abca9 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -39,6 +39,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -130,13 +131,20 @@ public class LocalSessionTest extends AbstractRepairTest
 
     static class InstrumentedLocalSessions extends LocalSessions
     {
-        Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new 
HashMap<>();
+        final Map<InetAddressAndPort, List<RepairMessage>> sentMessages;
 
         public InstrumentedLocalSessions()
         {
-            super(SharedContext.Global.instance);
+            this(new MockMessaging());
         }
 
+        private InstrumentedLocalSessions(MockMessaging messaging)
+        {
+            super(SharedContext.Global.instance.withMessaging(messaging));
+            sentMessages = messaging.sentMessages;
+        }
+
+        @Override
         protected void sendMessage(InetAddressAndPort destination, Message<? 
extends RepairMessage> message)
         {
             if (!sentMessages.containsKey(destination))
@@ -178,7 +186,7 @@ public class LocalSessionTest extends AbstractRepairTest
         public LocalSession prepareForTest(TimeUUID sessionID)
         {
             prepareSessionFuture = new AsyncPromise<>();
-            handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+            handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, 
new PrepareConsistentRequest(sessionID, COORDINATOR, 
PARTICIPANTS)).from(PARTICIPANT1).build());
             prepareSessionFuture.trySuccess(null);
             sentMessages.clear();
             return getSession(sessionID);
@@ -281,7 +289,7 @@ public class LocalSessionTest extends AbstractRepairTest
         // replacing future so we can inspect state before and after anti 
compaction callback
         sessions.prepareSessionFuture = new AsyncPromise<>();
         Assert.assertFalse(sessions.prepareSessionCalled);
-        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new 
PrepareConsistentRequest(sessionID, COORDINATOR, 
PARTICIPANTS)).from(PARTICIPANT1).build());
         Assert.assertTrue(sessions.prepareSessionCalled);
         Assert.assertTrue(sessions.sentMessages.isEmpty());
 
@@ -316,7 +324,7 @@ public class LocalSessionTest extends AbstractRepairTest
         // replacing future so we can inspect state before and after anti 
compaction callback
         sessions.prepareSessionFuture = new AsyncPromise<>();
         Assert.assertFalse(sessions.prepareSessionCalled);
-        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new 
PrepareConsistentRequest(sessionID, COORDINATOR, 
PARTICIPANTS)).from(PARTICIPANT1).build());
         Assert.assertTrue(sessions.prepareSessionCalled);
         Assert.assertTrue(sessions.sentMessages.isEmpty());
 
@@ -348,7 +356,7 @@ public class LocalSessionTest extends AbstractRepairTest
     {
         TimeUUID sessionID = nextTimeUUID();
         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
-        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new 
PrepareConsistentRequest(sessionID, COORDINATOR, 
PARTICIPANTS)).from(PARTICIPANT1).build());
         Assert.assertNull(sessions.getSession(sessionID));
         assertMessagesSent(sessions, COORDINATOR, new 
PrepareConsistentResponse(sessionID, PARTICIPANT1, false));
     }
@@ -372,7 +380,7 @@ public class LocalSessionTest extends AbstractRepairTest
         };
         sessions.start();
 
-        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new 
PrepareConsistentRequest(sessionID, COORDINATOR, 
PARTICIPANTS)).from(PARTICIPANT1).build());
 
         BooleanSupplier isCancelled = isCancelledRef.get();
         Assert.assertNotNull(isCancelled);
@@ -470,7 +478,7 @@ public class LocalSessionTest extends AbstractRepairTest
 
         // should send a promised message to coordinator and set session state 
accordingly
         sessions.sentMessages.clear();
-        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(sessionID));
+        
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
 new FinalizePropose(sessionID)).from(COORDINATOR).build());
         Assert.assertEquals(FINALIZE_PROMISED, session.getState());
         Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
         assertMessagesSent(sessions, COORDINATOR, new 
FinalizePromise(sessionID, PARTICIPANT1, true));
@@ -492,7 +500,7 @@ public class LocalSessionTest extends AbstractRepairTest
 
         // should fail the session and send a failure message to the 
coordinator
         sessions.sentMessages.clear();
-        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(sessionID));
+        
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
 new FinalizePropose(sessionID)).from(COORDINATOR).build());
         Assert.assertEquals(FAILED, session.getState());
         Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
         assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID));
@@ -503,7 +511,7 @@ public class LocalSessionTest extends AbstractRepairTest
     {
         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
         TimeUUID fakeID = nextTimeUUID();
-        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(fakeID));
+        
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
 new FinalizePropose(fakeID)).from(COORDINATOR).build());
         Assert.assertNull(sessions.getSession(fakeID));
         assertMessagesSent(sessions, COORDINATOR, new FailSession(fakeID));
     }
@@ -522,12 +530,12 @@ public class LocalSessionTest extends AbstractRepairTest
         // create session and move to finalized promised
         sessions.prepareForTest(sessionID);
         sessions.maybeSetRepairing(sessionID);
-        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(sessionID));
+        
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
 new FinalizePropose(sessionID)).from(COORDINATOR).build());
 
         Assert.assertEquals(0, (int) 
sessions.completedSessions.getOrDefault(sessionID, 0));
         sessions.sentMessages.clear();
         LocalSession session = sessions.getSession(sessionID);
-        sessions.handleFinalizeCommitMessage(PARTICIPANT1, new 
FinalizeCommit(sessionID));
+        
sessions.handleFinalizeCommitMessage(Message.builder(Verb.FINALIZE_COMMIT_MSG, 
new FinalizeCommit(sessionID)).from(PARTICIPANT1).build());
 
         Assert.assertEquals(FINALIZED, session.getState());
         Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
@@ -541,7 +549,7 @@ public class LocalSessionTest extends AbstractRepairTest
         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
         sessions.start();
         TimeUUID fakeID = nextTimeUUID();
-        sessions.handleFinalizeCommitMessage(PARTICIPANT1, new 
FinalizeCommit(fakeID));
+        
sessions.handleFinalizeCommitMessage(Message.builder(Verb.FINALIZE_COMMIT_MSG, 
new FinalizeCommit(fakeID)).from(PARTICIPANT1).build());
         Assert.assertNull(sessions.getSession(fakeID));
         Assert.assertTrue(sessions.sentMessages.isEmpty());
     }
@@ -717,7 +725,7 @@ public class LocalSessionTest extends AbstractRepairTest
         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
         sessions.start();
         sessions.prepareSessionFuture = new AsyncPromise<>();  // prevent 
moving to prepared
-        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new 
PrepareConsistentRequest(sessionID, COORDINATOR, 
PARTICIPANTS)).from(PARTICIPANT1).build());
 
         LocalSession session = sessions.getSession(sessionID);
         Assert.assertNotNull(session);
@@ -744,7 +752,7 @@ public class LocalSessionTest extends AbstractRepairTest
         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
         sessions.start();
         sessions.prepareSessionFuture = new AsyncPromise<>();
-        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+        
sessions.handlePrepareMessage(Message.builder(Verb.PREPARE_CONSISTENT_REQ, new 
PrepareConsistentRequest(sessionID, COORDINATOR, 
PARTICIPANTS)).from(PARTICIPANT1).build());
         sessions.prepareSessionFuture.trySuccess(null);
 
         Assert.assertTrue(sessions.isSessionInProgress(sessionID));
@@ -770,8 +778,8 @@ public class LocalSessionTest extends AbstractRepairTest
 
         sessions.prepareForTest(sessionID);
         sessions.maybeSetRepairing(sessionID);
-        sessions.handleFinalizeProposeMessage(COORDINATOR, new 
FinalizePropose(sessionID));
-        sessions.handleFinalizeCommitMessage(PARTICIPANT1, new 
FinalizeCommit(sessionID));
+        
sessions.handleFinalizeProposeMessage(Message.builder(Verb.FINALIZE_PROPOSE_MSG,
 new FinalizePropose(sessionID)).from(COORDINATOR).build());
+        
sessions.handleFinalizeCommitMessage(Message.builder(Verb.FINALIZE_COMMIT_MSG, 
new FinalizeCommit(sessionID)).from(PARTICIPANT1).build());
 
         LocalSession session = sessions.getSession(sessionID);
         Assert.assertTrue(session.repairedAt != 
ActiveRepairService.UNREPAIRED_SSTABLE);
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/MockMessaging.java 
b/test/unit/org/apache/cassandra/repair/consistent/MockMessaging.java
new file mode 100644
index 0000000000..5a59ef2639
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/MockMessaging.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.ConnectionType;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.utils.concurrent.Future;
+
+/** Test stand-in for {@link MessageDelivery} that records outgoing messages in maps rather than delivering them. */
+class MockMessaging implements MessageDelivery
+{
+    // RepairMessage payloads passed to send(), grouped by destination in the order they were sent
+    Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
+    // per-destination tallies of the ack and failure responses passed to send()
+    Map<InetAddressAndPort, Integer> acks = new HashMap<>();
+    Map<InetAddressAndPort, Integer> failures = new HashMap<>();
+
+    /** Tallies a REPAIR_RSP carrying NoPayload as an ack and any FAILURE_RSP as a failure; every other message must carry a RepairMessage, which is recorded. */
+    @Override
+    public <REQ> void send(Message<REQ> message, InetAddressAndPort destination)
+    {
+        Verb verb = message.verb();
+        Object payload = message.payload;
+        if (verb == Verb.REPAIR_RSP && payload instanceof NoPayload)
+            acks.merge(destination, 1, Integer::sum);
+        else if (verb == Verb.FAILURE_RSP)
+            failures.merge(destination, 1, Integer::sum);
+        else if (payload instanceof RepairMessage)
+            sentMessages.computeIfAbsent(destination, ignore -> new ArrayList<>()).add((RepairMessage) payload);
+        else
+            throw new AssertionError("Unexpected message: " + message);
+    }
+
+    /** Records the message exactly as {@link #send} does; the callback is never invoked. */
+    @Override
+    public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb)
+    {
+        send(message, to);
+    }
+
+    /** Records the message exactly as {@link #send} does; the callback and the connection type are both unused. */
+    @Override
+    public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection)
+    {
+        send(message, to);
+    }
+
+    /** Always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <V> void respond(V response, Message<?> message)
+    {
+        throw new UnsupportedOperationException();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to