This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new ca0b77d743 Repair fuzz tests fail with paxos_variant: v2
ca0b77d743 is described below

commit ca0b77d7434aa75528e0cb625889825d29c5f122
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Mar 25 14:40:09 2024 -0700

    Repair fuzz tests fail with paxos_variant: v2
    
    patch by David Capwell; reviewed by Blake Eggleston, Ekaterina Dimitrova 
for CASSANDRA-19042
---
 src/java/org/apache/cassandra/gms/Gossiper.java    |   2 +
 src/java/org/apache/cassandra/gms/IGossiper.java   |   3 +
 .../org/apache/cassandra/net/MessageDelivery.java  |   5 +
 .../org/apache/cassandra/net/MessagingService.java |   5 -
 .../org/apache/cassandra/repair/RepairJob.java     |   2 +-
 .../org/apache/cassandra/repair/SharedContext.java |  54 ++++++++
 .../cassandra/service/ActiveRepairService.java     |   2 +-
 .../apache/cassandra/service/StorageService.java   |   7 +-
 .../org/apache/cassandra/service/paxos/Paxos.java  |  12 +-
 .../cassandra/service/paxos/PaxosRepair.java       |  43 ++----
 .../service/paxos/cleanup/PaxosCleanup.java        |  35 +++--
 .../paxos/cleanup/PaxosCleanupComplete.java        |  28 ++--
 .../cleanup/PaxosCleanupLocalCoordinator.java      |  22 +--
 .../service/paxos/cleanup/PaxosCleanupRequest.java |  63 +++++----
 .../paxos/cleanup/PaxosCleanupResponse.java        |   8 +-
 .../service/paxos/cleanup/PaxosCleanupSession.java |  54 +++-----
 .../paxos/cleanup/PaxosFinishPrepareCleanup.java   | 112 ++-------------
 ...shPrepareCleanup.java => PaxosRepairState.java} | 138 ++++++++++++-------
 .../paxos/cleanup/PaxosStartPrepareCleanup.java    |  41 +++---
 .../service/paxos/cleanup/PaxosTableRepairs.java   |  24 ----
 .../paxos/uncommitted/PaxosUncommittedTracker.java |   4 +-
 .../distributed/test/PaxosRepairTest.java          |   7 +-
 .../org/apache/cassandra/repair/FuzzTestBase.java  | 152 +++++++++++++++++++--
 .../org/apache/cassandra/repair/RepairJobTest.java |   4 +-
 24 files changed, 468 insertions(+), 359 deletions(-)

diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index adf465de21..d907f76686 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1328,6 +1328,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean,
         return ep1.getHeartBeatState().getGeneration() - 
ep2.getHeartBeatState().getGeneration();
     }
 
+    @Override
     public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> 
remoteEpStateMap)
     {
         for (Entry<InetAddressAndPort, EndpointState> entry : 
remoteEpStateMap.entrySet())
@@ -1624,6 +1625,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean,
     }
 
     @VisibleForTesting
+    @Override
     public void applyStateLocally(Map<InetAddressAndPort, EndpointState> 
epStateMap)
     {
         checkProperThreadForStateMutation();
diff --git a/src/java/org/apache/cassandra/gms/IGossiper.java 
b/src/java/org/apache/cassandra/gms/IGossiper.java
index 0e33526d22..aa9d95a97d 100644
--- a/src/java/org/apache/cassandra/gms/IGossiper.java
+++ b/src/java/org/apache/cassandra/gms/IGossiper.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.gms;
 
+import java.util.Map;
 import javax.annotation.Nullable;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -30,6 +31,8 @@ public interface IGossiper
 
     @Nullable
     EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep);
+    void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> 
remoteEpStateMap);
+    void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap);
     @Nullable
     default CassandraVersion getReleaseVersion(InetAddressAndPort ep)
     {
diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java 
b/src/java/org/apache/cassandra/net/MessageDelivery.java
index dd8c6ceeda..36001c4988 100644
--- a/src/java/org/apache/cassandra/net/MessageDelivery.java
+++ b/src/java/org/apache/cassandra/net/MessageDelivery.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.net;
 
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -28,4 +29,8 @@ public interface MessageDelivery
     public <REQ, RSP> void sendWithCallback(Message<REQ> message, 
InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType 
specifyConnection);
     public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> 
message, InetAddressAndPort to);
     public <V> void respond(V response, Message<?> message);
+    public default void respondWithFailure(RequestFailureReason reason, 
Message<?> message)
+    {
+        send(Message.failureResponse(message.id(), message.expiresAtNanos(), 
reason), message.respondTo());
+    }
 }
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 903ef977b2..94586b41c8 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -449,11 +449,6 @@ public class MessagingService extends 
MessagingServiceMBeanImpl implements Messa
         send(message.responseWith(response), message.respondTo());
     }
 
-    public <V> void respondWithFailure(RequestFailureReason reason, Message<?> 
message)
-    {
-        send(Message.failureResponse(message.id(), message.expiresAtNanos(), 
reason), message.respondTo());
-    }
-
     public void send(Message message, InetAddressAndPort to, ConnectionType 
specifyConnection)
     {
         if (logger.isTraceEnabled())
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index c9966c5a05..c54336a6b3 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -129,7 +129,7 @@ public class RepairJob extends AsyncFuture<RepairResult> 
implements Runnable
         {
             logger.info("{} {}.{} starting paxos repair", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
             TableMetadata metadata = 
Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily);
-            paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, 
desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
+            paxosRepair = PaxosCleanup.cleanup(ctx, allEndpoints, metadata, 
desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
         }
         else
         {
diff --git a/src/java/org/apache/cassandra/repair/SharedContext.java 
b/src/java/org/apache/cassandra/repair/SharedContext.java
index 8ccc88f584..440da2cf45 100644
--- a/src/java/org/apache/cassandra/repair/SharedContext.java
+++ b/src/java/org/apache/cassandra/repair/SharedContext.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
@@ -57,6 +59,8 @@ public interface SharedContext
     ExecutorFactory executorFactory();
     MBeanWrapper mbean();
     ScheduledExecutorPlus optionalTasks();
+    ScheduledExecutorPlus nonPeriodicTasks();
+    ScheduledExecutorPlus scheduledTasks();
 
     MessageDelivery messaging();
     default SharedContext withMessaging(MessageDelivery messaging)
@@ -77,6 +81,8 @@ public interface SharedContext
     IValidationManager validationManager();
     TableRepairManager repairManager(ColumnFamilyStore store);
     StreamExecutor streamExecutor();
+    PendingRangeCalculatorService pendingRangeCalculator();
+    PaxosRepairState paxosRepairState();
 
     class Global implements SharedContext
     {
@@ -118,6 +124,18 @@ public interface SharedContext
             return ScheduledExecutors.optionalTasks;
         }
 
+        @Override
+        public ScheduledExecutorPlus nonPeriodicTasks()
+        {
+            return ScheduledExecutors.nonPeriodicTasks;
+        }
+
+        @Override
+        public ScheduledExecutorPlus scheduledTasks()
+        {
+            return ScheduledExecutors.scheduledTasks;
+        }
+
         @Override
         public MessageDelivery messaging()
         {
@@ -171,6 +189,18 @@ public interface SharedContext
         {
             return StreamPlan::execute;
         }
+
+        @Override
+        public PendingRangeCalculatorService pendingRangeCalculator()
+        {
+            return PendingRangeCalculatorService.instance;
+        }
+
+        @Override
+        public PaxosRepairState paxosRepairState()
+        {
+            return PaxosRepairState.instance();
+        }
     }
 
     class ForwardingSharedContext implements SharedContext
@@ -223,6 +253,18 @@ public interface SharedContext
             return delegate().optionalTasks();
         }
 
+        @Override
+        public ScheduledExecutorPlus nonPeriodicTasks()
+        {
+            return delegate().nonPeriodicTasks();
+        }
+
+        @Override
+        public ScheduledExecutorPlus scheduledTasks()
+        {
+            return delegate().scheduledTasks();
+        }
+
         @Override
         public MessageDelivery messaging()
         {
@@ -276,5 +318,17 @@ public interface SharedContext
         {
             return delegate().streamExecutor();
         }
+
+        @Override
+        public PendingRangeCalculatorService pendingRangeCalculator()
+        {
+            return delegate().pendingRangeCalculator();
+        }
+
+        @Override
+        public PaxosRepairState paxosRepairState()
+        {
+            return delegate().paxosRepairState();
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 7324b968a4..e120122c08 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -1136,7 +1136,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                                                              range, 
table.keyspace, table.name, pending, 
PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey()));
 
                 }
-                Future<Void> future = PaxosCleanup.cleanup(endpoints, table, 
Collections.singleton(range), false, repairCommandExecutor());
+                Future<Void> future = PaxosCleanup.cleanup(ctx, endpoints, 
table, Collections.singleton(range), false, repairCommandExecutor());
                 futures.add(future);
             }
         }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 78be739301..cbdadee1c3 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -180,6 +180,7 @@ import org.apache.cassandra.net.AsyncOneResponse;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairCoordinator;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -199,7 +200,7 @@ import org.apache.cassandra.service.paxos.PaxosCommit;
 import org.apache.cassandra.service.paxos.PaxosRepair;
 import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator;
-import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
+import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 import org.apache.cassandra.service.snapshot.SnapshotManager;
 import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.streaming.StreamManager;
@@ -4863,7 +4864,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             return ImmediateFuture.success(null);
 
         List<Range<Token>> ranges = getLocalAndPendingRanges(table.keyspace);
-        PaxosCleanupLocalCoordinator coordinator = 
PaxosCleanupLocalCoordinator.createForAutoRepair(tableId, ranges);
+        PaxosCleanupLocalCoordinator coordinator = 
PaxosCleanupLocalCoordinator.createForAutoRepair(SharedContext.Global.instance, 
tableId, ranges);
         ScheduledExecutors.optionalTasks.submit(coordinator::start);
         return coordinator;
     }
@@ -7349,7 +7350,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     public void clearPaxosRepairs()
     {
         logger.info("StorageService#clearPaxosRepairs called via jmx");
-        PaxosTableRepairs.clearRepairs();
+        PaxosRepairState.instance().clearRepairs();
     }
 
     public void setSkipPaxosRepairCompatibilityCheck(boolean v)
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java 
b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 3b87ffb545..473b5741ad 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import javax.annotation.Nullable;
@@ -85,9 +86,9 @@ import org.apache.cassandra.service.CASRequest;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.FailureRecordingCallback.AsMap;
 import org.apache.cassandra.service.paxos.Commit.Proposal;
+import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 import org.apache.cassandra.service.reads.DataResolver;
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
-import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.CassandraVersion;
@@ -385,13 +386,18 @@ public class Paxos
         }
 
         static Participants get(TableMetadata table, Token token, 
ConsistencyLevel consistencyForConsensus)
+        {
+            return get(table, token, consistencyForConsensus, 
FailureDetector.isReplicaAlive);
+        }
+
+        static Participants get(TableMetadata table, Token token, 
ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive)
         {
             Keyspace keyspace = Keyspace.open(table.keyspace);
             ReplicaLayout.ForTokenWrite all = 
forTokenWriteLiveAndDown(keyspace, token);
             ReplicaLayout.ForTokenWrite electorate = 
consistencyForConsensus.isDatacenterLocal()
                                                      ? 
all.filter(InOurDc.replicas()) : all;
 
-            EndpointsForToken live = 
all.all().filter(FailureDetector.isReplicaAlive);
+            EndpointsForToken live = all.all().filter(isReplicaAlive);
 
             return new Participants(keyspace, consistencyForConsensus, all, 
electorate, live);
         }
@@ -1255,6 +1261,6 @@ public class Paxos
 
     public static void evictHungRepairs()
     {
-        PaxosTableRepairs.evictHungRepairs();
+        PaxosRepairState.instance().evictHungRepairs();
     }
 }
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
index 45a3664731..ae5bc557c7 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
@@ -42,10 +42,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.gms.IGossiper;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -55,6 +52,7 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -659,45 +657,24 @@ public class PaxosRepair extends AbstractPaxosRepair
         return (version.major == 4 && version.minor > 0) || version.major > 4;
     }
 
-    static String getPeerVersion(InetAddressAndPort peer)
+    static boolean validatePeerCompatibility(IGossiper gossiper, Replica peer)
     {
-        EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(peer);
-        if (epState == null)
-            return null;
-
-        VersionedValue value = 
epState.getApplicationState(ApplicationState.RELEASE_VERSION);
-        if (value == null)
-            return null;
-
-        try
-        {
-            return value.value;
-        }
-        catch (IllegalArgumentException e)
-        {
-            return null;
-        }
-    }
-
-    static boolean validatePeerCompatibility(Replica peer)
-    {
-        String versionString = getPeerVersion(peer.endpoint());
-        CassandraVersion version = versionString != null ? new 
CassandraVersion(versionString) : null;
+        CassandraVersion version = gossiper.getReleaseVersion(peer.endpoint());
         boolean result = validateVersionCompatibility(version);
         if (!result)
-            logger.info("PaxosRepair isn't supported by {} on version {}", 
peer, versionString);
+            logger.info("PaxosRepair isn't supported by {} on version {}", 
peer, version);
         return result;
     }
 
-    static boolean validatePeerCompatibility(TableMetadata table, Range<Token> 
range)
+    static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata 
table, Range<Token> range)
     {
-        Participants participants = Participants.get(table, range.right, 
ConsistencyLevel.SERIAL);
-        return Iterables.all(participants.all, 
PaxosRepair::validatePeerCompatibility);
+        Participants participants = Participants.get(table, range.right, 
ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint()));
+        return Iterables.all(participants.all, r -> 
validatePeerCompatibility(ctx.gossiper(), r));
     }
 
-    public static boolean validatePeerCompatibility(TableMetadata table, 
Collection<Range<Token>> ranges)
+    public static boolean validatePeerCompatibility(SharedContext ctx, 
TableMetadata table, Collection<Range<Token>> ranges)
     {
-        return Iterables.all(ranges, range -> validatePeerCompatibility(table, 
range));
+        return Iterables.all(ranges, range -> validatePeerCompatibility(ctx, 
table, range));
     }
 
     public static void shutdownAndWait(long timeout, TimeUnit units) throws 
InterruptedException, TimeoutException
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
index 6eb1ebd574..7b4163e2a2 100644
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
@@ -29,32 +29,30 @@ import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Ballot;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getCasContentionTimeout;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout;
-import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
 public class PaxosCleanup extends AsyncFuture<Void> implements Runnable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(PaxosCleanup.class);
 
+    private final SharedContext ctx;
     private final Collection<InetAddressAndPort> endpoints;
     private final TableMetadata table;
     private final Collection<Range<Token>> ranges;
@@ -67,8 +65,9 @@ public class PaxosCleanup extends AsyncFuture<Void> 
implements Runnable
     private PaxosCleanupSession session;
     private PaxosCleanupComplete complete;
 
-    public PaxosCleanup(Collection<InetAddressAndPort> endpoints, 
TableMetadata table, Collection<Range<Token>> ranges, boolean skippedReplicas, 
Executor executor)
+    public PaxosCleanup(SharedContext ctx, Collection<InetAddressAndPort> 
endpoints, TableMetadata table, Collection<Range<Token>> ranges, boolean 
skippedReplicas, Executor executor)
     {
+        this.ctx = ctx;
         this.endpoints = endpoints;
         this.table = table;
         this.ranges = ranges;
@@ -81,51 +80,51 @@ public class PaxosCleanup extends AsyncFuture<Void> 
implements Runnable
         future.addCallback(onComplete, this::tryFailure);
     }
 
-    public static PaxosCleanup cleanup(Collection<InetAddressAndPort> 
endpoints, TableMetadata table, Collection<Range<Token>> ranges, boolean 
skippedReplicas, Executor executor)
+    public static PaxosCleanup cleanup(SharedContext ctx, 
Collection<InetAddressAndPort> endpoints, TableMetadata table, 
Collection<Range<Token>> ranges, boolean skippedReplicas, Executor executor)
     {
-        PaxosCleanup cleanup = new PaxosCleanup(endpoints, table, ranges, 
skippedReplicas, executor);
+        PaxosCleanup cleanup = new PaxosCleanup(ctx, endpoints, table, ranges, 
skippedReplicas, executor);
         executor.execute(cleanup);
         return cleanup;
     }
 
     public void run()
     {
-        EndpointState localEpState = 
Gossiper.instance.getEndpointStateForEndpoint(getBroadcastAddressAndPort());
-        startPrepare = PaxosStartPrepareCleanup.prepare(table.id, endpoints, 
localEpState, ranges);
+        EndpointState localEpState = 
ctx.gossiper().getEndpointStateForEndpoint(ctx.broadcastAddressAndPort());
+        startPrepare = PaxosStartPrepareCleanup.prepare(ctx, table.id, 
endpoints, localEpState, ranges);
         addCallback(startPrepare, this::finishPrepare);
     }
 
     private void finishPrepare(PaxosCleanupHistory result)
     {
-        ScheduledExecutors.nonPeriodicTasks.schedule(() -> {
-            finishPrepare = PaxosFinishPrepareCleanup.finish(endpoints, 
result);
+        ctx.nonPeriodicTasks().schedule(() -> {
+            finishPrepare = PaxosFinishPrepareCleanup.finish(ctx, endpoints, 
result);
             addCallback(finishPrepare, (v) -> startSession(result.highBound));
         }, Math.min(getCasContentionTimeout(MILLISECONDS), 
getWriteRpcTimeout(MILLISECONDS)), MILLISECONDS);
     }
 
     private void startSession(Ballot lowBound)
     {
-        session = new PaxosCleanupSession(endpoints, table.id, ranges);
+        session = new PaxosCleanupSession(ctx, endpoints, table.id, ranges);
         addCallback(session, (v) -> finish(lowBound));
         executor.execute(session);
     }
 
     private void finish(Ballot lowBound)
     {
-        complete = new PaxosCleanupComplete(endpoints, table.id, ranges, 
lowBound, skippedReplicas);
+        complete = new PaxosCleanupComplete(ctx, endpoints, table.id, ranges, 
lowBound, skippedReplicas);
         addCallback(complete, this::trySuccess);
         executor.execute(complete);
     }
 
-    private static boolean isOutOfRange(String ksName, 
Collection<Range<Token>> repairRanges)
+    private static boolean isOutOfRange(SharedContext ctx, String ksName, 
Collection<Range<Token>> repairRanges)
     {
         Keyspace keyspace = Keyspace.open(ksName);
         List<Range<Token>> localRanges = 
Range.normalize(keyspace.getReplicationStrategy()
                                                                  
.getAddressReplicas()
-                                                                 
.get(FBUtilities.getBroadcastAddressAndPort())
+                                                                 
.get(ctx.broadcastAddressAndPort())
                                                                  .ranges());
 
-        RangesAtEndpoint pendingRanges = 
StorageService.instance.getTokenMetadata().getPendingRanges(ksName, 
FBUtilities.getBroadcastAddressAndPort());
+        RangesAtEndpoint pendingRanges = 
StorageService.instance.getTokenMetadata().getPendingRanges(ksName, 
ctx.broadcastAddressAndPort());
         if (!pendingRanges.isEmpty())
         {
             localRanges.addAll(pendingRanges.ranges());
@@ -140,14 +139,14 @@ public class PaxosCleanup extends AsyncFuture<Void> 
implements Runnable
         return false;
     }
 
-    static boolean isInRangeAndShouldProcess(Collection<Range<Token>> ranges, 
TableId tableId)
+    static boolean isInRangeAndShouldProcess(SharedContext ctx, 
Collection<Range<Token>> ranges, TableId tableId)
     {
         TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
 
         Keyspace keyspace = Keyspace.open(metadata.keyspace);
         Preconditions.checkNotNull(keyspace);
 
-        if (!isOutOfRange(metadata.keyspace, ranges))
+        if (!isOutOfRange(ctx, metadata.keyspace, ranges))
             return true;
 
         logger.warn("Out of range PaxosCleanup request for {}: {}", metadata, 
ranges);
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
index 0196e9cce0..8742af84e0 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
@@ -31,7 +31,10 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.paxos.Ballot;
@@ -48,9 +51,11 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> 
implements RequestCa
     final Collection<Range<Token>> ranges;
     final Ballot lowBound;
     final boolean skippedReplicas;
+    private final SharedContext ctx;
 
-    PaxosCleanupComplete(Collection<InetAddressAndPort> endpoints, TableId 
tableId, Collection<Range<Token>> ranges, Ballot lowBound, boolean 
skippedReplicas)
+    PaxosCleanupComplete(SharedContext ctx, Collection<InetAddressAndPort> 
endpoints, TableId tableId, Collection<Range<Token>> ranges, Ballot lowBound, 
boolean skippedReplicas)
     {
+        this.ctx = ctx;
         this.waitingResponse = new HashSet<>(endpoints);
         this.tableId = tableId;
         this.ranges = ranges;
@@ -64,7 +69,7 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> 
implements RequestCa
                                            : new Request(tableId, 
Ballot.none(), Collections.emptyList());
         Message<Request> message = Message.out(PAXOS2_CLEANUP_COMPLETE_REQ, 
request);
         for (InetAddressAndPort endpoint : waitingResponse)
-            MessagingService.instance().sendWithCallback(message, endpoint, 
this);
+            ctx.messaging().sendWithCallback(message, endpoint, this);
     }
 
     @Override
@@ -86,7 +91,7 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> 
implements RequestCa
             trySuccess(null);
     }
 
-    static class Request
+    public static class Request
     {
         final TableId tableId;
         final Ballot lowBound;
@@ -136,9 +141,14 @@ public class PaxosCleanupComplete extends 
AsyncFuture<Void> implements RequestCa
         }
     };
 
-    public static final IVerbHandler<Request> verbHandler = (in) -> {
-        ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId);
-        cfs.onPaxosRepairComplete(in.payload.ranges, in.payload.lowBound);
-        MessagingService.instance().respond(noPayload, in);
-    };
+    public static IVerbHandler<Request> createVerbHandler(SharedContext ctx)
+    {
+        return (in) -> {
+            ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId);
+            cfs.onPaxosRepairComplete(in.payload.ranges, in.payload.lowBound);
+            ctx.messaging().respond(noPayload, in);
+        };
+    }
+
+    public static final IVerbHandler<Request> verbHandler = 
createVerbHandler(SharedContext.Global.instance);
 }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java
 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java
index 3378714a0c..14c970b7db 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -43,7 +44,6 @@ import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 
 import static 
org.apache.cassandra.service.paxos.cleanup.PaxosCleanupSession.TIMEOUT_NANOS;
-import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 public class PaxosCleanupLocalCoordinator extends 
AsyncFuture<PaxosCleanupResponse>
 {
@@ -55,21 +55,23 @@ public class PaxosCleanupLocalCoordinator extends 
AsyncFuture<PaxosCleanupRespon
     private final TableMetadata table;
     private final Collection<Range<Token>> ranges;
     private final CloseableIterator<UncommittedPaxosKey> uncommittedIter;
+    private final SharedContext ctx;
     private int count = 0;
     private final long deadline;
 
     private final Map<DecoratedKey, AbstractPaxosRepair> inflight = new 
ConcurrentHashMap<>();
     private final PaxosTableRepairs tableRepairs;
 
-    private PaxosCleanupLocalCoordinator(UUID session, TableId tableId, 
Collection<Range<Token>> ranges, CloseableIterator<UncommittedPaxosKey> 
uncommittedIter)
+    private PaxosCleanupLocalCoordinator(SharedContext ctx, UUID session, 
TableId tableId, Collection<Range<Token>> ranges, 
CloseableIterator<UncommittedPaxosKey> uncommittedIter)
     {
+        this.ctx = ctx;
         this.session = session;
         this.tableId = tableId;
         this.table = Schema.instance.getTableMetadata(tableId);
         this.ranges = ranges;
         this.uncommittedIter = uncommittedIter;
-        this.tableRepairs = PaxosTableRepairs.getForTable(tableId);
-        this.deadline = TIMEOUT_NANOS + nanoTime();
+        this.tableRepairs = ctx.paxosRepairState().getForTable(tableId);
+        this.deadline = TIMEOUT_NANOS + ctx.clock().nanoTime();
     }
 
     public synchronized void start()
@@ -80,7 +82,7 @@ public class PaxosCleanupLocalCoordinator extends 
AsyncFuture<PaxosCleanupRespon
             return;
         }
 
-        if (!PaxosRepair.validatePeerCompatibility(table, ranges))
+        if (!PaxosRepair.validatePeerCompatibility(ctx, table, ranges))
         {
             fail("Unsupported peer versions for " + tableId + ' ' + 
ranges.toString());
             return;
@@ -91,16 +93,16 @@ public class PaxosCleanupLocalCoordinator extends 
AsyncFuture<PaxosCleanupRespon
         scheduleKeyRepairsOrFinish();
     }
 
-    public static PaxosCleanupLocalCoordinator create(PaxosCleanupRequest 
request)
+    public static PaxosCleanupLocalCoordinator create(SharedContext ctx, 
PaxosCleanupRequest request)
     {
         CloseableIterator<UncommittedPaxosKey> iterator = 
PaxosState.uncommittedTracker().uncommittedKeyIterator(request.tableId, 
request.ranges);
-        return new PaxosCleanupLocalCoordinator(request.session, 
request.tableId, request.ranges, iterator);
+        return new PaxosCleanupLocalCoordinator(ctx, request.session, 
request.tableId, request.ranges, iterator);
     }
 
-    public static PaxosCleanupLocalCoordinator createForAutoRepair(TableId 
tableId, Collection<Range<Token>> ranges)
+    public static PaxosCleanupLocalCoordinator 
createForAutoRepair(SharedContext ctx, TableId tableId, 
Collection<Range<Token>> ranges)
     {
         CloseableIterator<UncommittedPaxosKey> iterator = 
PaxosState.uncommittedTracker().uncommittedKeyIterator(tableId, ranges);
-        return new PaxosCleanupLocalCoordinator(INTERNAL_SESSION, tableId, 
ranges, iterator);
+        return new PaxosCleanupLocalCoordinator(ctx, INTERNAL_SESSION, 
tableId, ranges, iterator);
     }
 
     /**
@@ -113,7 +115,7 @@ public class PaxosCleanupLocalCoordinator extends 
AsyncFuture<PaxosCleanupRespon
         Preconditions.checkArgument(parallelism > 0);
         if (inflight.size() < parallelism)
         {
-            if (nanoTime() - deadline >= 0)
+            if (ctx.clock().nanoTime() - deadline >= 0)
             {
                 fail("timeout");
                 return;
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
index 4db457f4af..2dbbc58d69 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
@@ -38,10 +38,10 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.UUIDSerializer;
 
-import static org.apache.cassandra.net.MessagingService.instance;
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_RSP2;
 
@@ -68,40 +68,45 @@ public class PaxosCleanupRequest
         this.ranges = rangesOrMin(ranges);
     }
 
-    public static final IVerbHandler<PaxosCleanupRequest> verbHandler = in -> {
-        PaxosCleanupRequest request = in.payload;
-
-        if (!PaxosCleanup.isInRangeAndShouldProcess(request.ranges, 
request.tableId))
-        {
-            String msg = String.format("Rejecting cleanup request %s from %s. 
Some ranges are not replicated (%s)",
-                                       request.session, in.from(), 
request.ranges);
-            Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, 
msg));
-            instance().send(response, in.respondTo());
-            return;
-        }
-
-        PaxosCleanupLocalCoordinator coordinator = 
PaxosCleanupLocalCoordinator.create(request);
+    public static IVerbHandler<PaxosCleanupRequest> 
createVerbHandler(SharedContext ctx)
+    {
+        return in -> {
+            PaxosCleanupRequest request = in.payload;
 
-        coordinator.addCallback(new FutureCallback<PaxosCleanupResponse>()
-        {
-            public void onSuccess(@Nullable PaxosCleanupResponse finished)
+            if (!PaxosCleanup.isInRangeAndShouldProcess(ctx, request.ranges, 
request.tableId))
             {
-                Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow());
-                instance().send(response, in.respondTo());
+                String msg = String.format("Rejecting cleanup request %s from 
%s. Some ranges are not replicated (%s)",
+                                           request.session, in.from(), 
request.ranges);
+                Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, 
msg));
+                ctx.messaging().send(response, in.respondTo());
+                return;
             }
 
-            public void onFailure(Throwable throwable)
-            {
-                Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, 
throwable.getMessage()));
-                instance().send(response, in.respondTo());
-            }
-        });
+            PaxosCleanupLocalCoordinator coordinator = 
PaxosCleanupLocalCoordinator.create(ctx, request);
 
-        // ack the request so the coordinator knows we've started
-        instance().respond(noPayload, in);
+            coordinator.addCallback(new FutureCallback<>()
+            {
+                public void onSuccess(@Nullable PaxosCleanupResponse finished)
+                {
+                    Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow());
+                    ctx.messaging().send(response, in.respondTo());
+                }
+
+                public void onFailure(Throwable throwable)
+                {
+                    Message<PaxosCleanupResponse> response = 
Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, 
throwable.getMessage()));
+                    ctx.messaging().send(response, in.respondTo());
+                }
+            });
+
+            // ack the request so the coordinator knows we've started
+            ctx.messaging().respond(noPayload, in);
+
+            coordinator.start();
+        };
+    }
 
-        coordinator.start();
-    };
+    public static final IVerbHandler<PaxosCleanupRequest> verbHandler = 
createVerbHandler(SharedContext.Global.instance);
 
     public static final IVersionedSerializer<PaxosCleanupRequest> serializer = 
new IVersionedSerializer<PaxosCleanupRequest>()
     {
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupResponse.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupResponse.java
index 1c90162001..2315d687bc 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupResponse.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupResponse.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class PaxosCleanupResponse
@@ -53,7 +54,12 @@ public class PaxosCleanupResponse
         return new PaxosCleanupResponse(session, false, message);
     }
 
-    public static final IVerbHandler<PaxosCleanupResponse> verbHandler = 
(message) -> PaxosCleanupSession.finishSession(message.from(), message.payload);
+    public static IVerbHandler<PaxosCleanupResponse> 
createVerbHandler(SharedContext ctx)
+    {
+        return message -> ctx.paxosRepairState().finishSession(message.from(), 
message.payload);
+    }
+
+    public static final IVerbHandler<PaxosCleanupResponse> verbHandler = 
createVerbHandler(SharedContext.Global.instance);
 
     public static final IVersionedSerializer<PaxosCleanupResponse> serializer 
= new IVersionedSerializer<PaxosCleanupResponse>()
     {
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java
index 5f1eea6b68..681f67a56c 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupSession.java
@@ -20,14 +20,12 @@ package org.apache.cassandra.service.paxos.cleanup;
 
 import java.lang.ref.WeakReference;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RequestFailureReason;
@@ -38,23 +36,21 @@ import 
org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_CLEANUP_SESSION_TIMEOUT_SECONDS;
 import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_REQ;
-import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable,
                                                                       
IEndpointStateChangeSubscriber,
                                                                       
IFailureDetectionEventListener,
                                                                       
RequestCallbackWithFailure<Void>
 {
-    private static final Map<UUID, PaxosCleanupSession> sessions = new 
ConcurrentHashMap<>();
-
     static final long TIMEOUT_NANOS;
+
     static
     {
         long timeoutSeconds = PAXOS_CLEANUP_SESSION_TIMEOUT_SECONDS.getLong();
@@ -63,10 +59,12 @@ public class PaxosCleanupSession extends AsyncFuture<Void> 
implements Runnable,
 
     private static class TimeoutTask implements Runnable
     {
+        private final SharedContext ctx;
         private final WeakReference<PaxosCleanupSession> ref;
 
         TimeoutTask(PaxosCleanupSession session)
         {
+            this.ctx = session.ctx;
             this.ref = new WeakReference<>(session);
         }
 
@@ -77,7 +75,7 @@ public class PaxosCleanupSession extends AsyncFuture<Void> 
implements Runnable,
             if (session == null || session.isDone())
                 return;
 
-            long remaining = session.lastMessageSentNanos + TIMEOUT_NANOS - 
nanoTime();
+            long remaining = session.lastMessageSentNanos + TIMEOUT_NANOS - 
ctx.clock().nanoTime();
             if (remaining > 0)
                 schedule(remaining);
             else
@@ -86,7 +84,7 @@ public class PaxosCleanupSession extends AsyncFuture<Void> 
implements Runnable,
 
         ScheduledFuture<?> schedule(long delayNanos)
         {
-            return 
ScheduledExecutors.scheduledTasks.scheduleTimeoutWithDelay(this, delayNanos, 
TimeUnit.NANOSECONDS);
+            return ctx.scheduledTasks().scheduleTimeoutWithDelay(this, 
delayNanos, TimeUnit.NANOSECONDS);
         }
 
         private static ScheduledFuture<?> schedule(PaxosCleanupSession session)
@@ -95,38 +93,29 @@ public class PaxosCleanupSession extends AsyncFuture<Void> 
implements Runnable,
         }
     }
 
-    private final UUID session = UUID.randomUUID();
+    private final SharedContext ctx;
+    public final UUID session = UUID.randomUUID();
     private final TableId tableId;
     private final Collection<Range<Token>> ranges;
     private final Queue<InetAddressAndPort> pendingCleanups = new 
ConcurrentLinkedQueue<>();
     private InetAddressAndPort inProgress = null;
-    private volatile long lastMessageSentNanos = nanoTime();
+    private volatile long lastMessageSentNanos;
     private ScheduledFuture<?> timeout;
 
-    PaxosCleanupSession(Collection<InetAddressAndPort> endpoints, TableId 
tableId, Collection<Range<Token>> ranges)
+    PaxosCleanupSession(SharedContext ctx, Collection<InetAddressAndPort> 
endpoints, TableId tableId, Collection<Range<Token>> ranges)
     {
+        this.ctx = ctx;
         this.tableId = tableId;
         this.ranges = ranges;
 
         pendingCleanups.addAll(endpoints);
-    }
-
-    private static void setSession(PaxosCleanupSession session)
-    {
-        Preconditions.checkState(!sessions.containsKey(session.session));
-        sessions.put(session.session, session);
-    }
-
-    private static void removeSession(PaxosCleanupSession session)
-    {
-        Preconditions.checkState(sessions.containsKey(session.session));
-        sessions.remove(session.session);
+        lastMessageSentNanos = ctx.clock().nanoTime();
     }
 
     @Override
     public void run()
     {
-        setSession(this);
+        ctx.paxosRepairState().setSession(this);
         startNextOrFinish();
         if (!isDone())
             timeout = TimeoutTask.schedule(this);
@@ -134,10 +123,10 @@ public class PaxosCleanupSession extends 
AsyncFuture<Void> implements Runnable,
 
     private void startCleanup(InetAddressAndPort endpoint)
     {
-        lastMessageSentNanos = nanoTime();
+        lastMessageSentNanos = ctx.clock().nanoTime();
         PaxosCleanupRequest completer = new PaxosCleanupRequest(session, 
tableId, ranges);
         Message<PaxosCleanupRequest> msg = Message.out(PAXOS2_CLEANUP_REQ, 
completer);
-        MessagingService.instance().sendWithCallback(msg, endpoint, this);
+        ctx.messaging().sendWithCallback(msg, endpoint, this);
     }
 
     private synchronized void startNextOrFinish()
@@ -157,7 +146,7 @@ public class PaxosCleanupSession extends AsyncFuture<Void> 
implements Runnable,
         }
         else
         {
-            removeSession(this);
+            ctx.paxosRepairState().removeSession(this);
             trySuccess(null);
             if (timeout != null)
                 timeout.cancel(true);
@@ -168,13 +157,13 @@ public class PaxosCleanupSession extends 
AsyncFuture<Void> implements Runnable,
     {
         if (isDone())
             return;
-        removeSession(this);
+        ctx.paxosRepairState().removeSession(this);
         tryFailure(new PaxosCleanupException(message));
         if (timeout != null)
             timeout.cancel(true);
     }
 
-    private synchronized void finish(InetAddressAndPort from, 
PaxosCleanupResponse finished)
+    synchronized void finish(InetAddressAndPort from, PaxosCleanupResponse 
finished)
     {
         Preconditions.checkArgument(from.equals(inProgress), "Received 
unexpected cleanup complete response from %s for session %s. Expected %s", 
from, session, inProgress);
         inProgress = null;
@@ -189,13 +178,6 @@ public class PaxosCleanupSession extends AsyncFuture<Void> 
implements Runnable,
         }
     }
 
-    public static void finishSession(InetAddressAndPort from, 
PaxosCleanupResponse response)
-    {
-        PaxosCleanupSession session = sessions.get(response.session);
-        if (session != null)
-            session.finish(from, response);
-    }
-
     private synchronized void maybeKillSession(InetAddressAndPort unavailable, 
String reason)
     {
         // don't fail if we've already completed the cleanup for the 
unavailable endpoint,
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
index 92d8d35028..b3104788d2 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
@@ -18,25 +18,16 @@
 
 package org.apache.cassandra.service.paxos.cleanup;
 
-import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.service.paxos.Ballot;
-import org.apache.cassandra.service.paxos.PaxosState;
-import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
-import org.apache.cassandra.utils.concurrent.IntrusiveStack;
-
-import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
-import static org.apache.cassandra.net.NoPayload.noPayload;
 
 public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements 
RequestCallbackWithFailure<Void>
 {
@@ -47,14 +38,14 @@ public class PaxosFinishPrepareCleanup extends 
AsyncFuture<Void> implements Requ
         this.waitingResponse = new HashSet<>(endpoints);
     }
 
-    public static PaxosFinishPrepareCleanup 
finish(Collection<InetAddressAndPort> endpoints, PaxosCleanupHistory result)
+    public static PaxosFinishPrepareCleanup finish(SharedContext ctx, 
Collection<InetAddressAndPort> endpoints, PaxosCleanupHistory result)
     {
         PaxosFinishPrepareCleanup callback = new 
PaxosFinishPrepareCleanup(endpoints);
         synchronized (callback)
         {
             Message<PaxosCleanupHistory> message = 
Message.out(Verb.PAXOS2_CLEANUP_FINISH_PREPARE_REQ, result);
             for (InetAddressAndPort endpoint : endpoints)
-                MessagingService.instance().sendWithCallback(message, 
endpoint, callback);
+                ctx.messaging().sendWithCallback(message, endpoint, callback);
         }
         return callback;
     }
@@ -77,93 +68,10 @@ public class PaxosFinishPrepareCleanup extends 
AsyncFuture<Void> implements Requ
             trySuccess(null);
     }
 
-    static class PendingCleanup extends IntrusiveStack<PendingCleanup>
+    public static IVerbHandler<PaxosCleanupHistory> 
createVerbHandler(SharedContext ctx)
     {
-        private static final AtomicReference<PendingCleanup> pendingCleanup = 
new AtomicReference();
-        private static final Runnable CLEANUP = () -> {
-            PendingCleanup list = pendingCleanup.getAndSet(null);
-            if (list == null)
-                return;
-
-            Ballot highBound = Ballot.none();
-            for (PendingCleanup pending : IntrusiveStack.iterable(list))
-            {
-                PaxosCleanupHistory cleanupHistory = pending.message.payload;
-                if (cleanupHistory.highBound.compareTo(highBound) > 0)
-                    highBound = cleanupHistory.highBound;
-            }
-            try
-            {
-                try
-                {
-                    PaxosState.ballotTracker().updateLowBound(highBound);
-                }
-                catch (IOException e)
-                {
-                    throw new FSWriteError(e);
-                }
-            }
-            catch (Throwable t)
-            {
-                for (PendingCleanup pending : IntrusiveStack.iterable(list))
-                    MessagingService.instance().respondWithFailure(UNKNOWN, 
pending.message);
-                throw t;
-            }
-
-            Set<PendingCleanup> failed = null;
-            Throwable fail = null;
-            for (PendingCleanup pending : IntrusiveStack.iterable(list))
-            {
-                try
-                {
-                    
Schema.instance.getColumnFamilyStoreInstance(pending.message.payload.tableId)
-                                   
.syncPaxosRepairHistory(pending.message.payload.history, false);
-                }
-                catch (Throwable t)
-                {
-                    fail = Throwables.merge(fail, t);
-                    if (failed == null)
-                        failed = Collections.newSetFromMap(new 
IdentityHashMap<>());
-                    failed.add(pending);
-                    MessagingService.instance().respondWithFailure(UNKNOWN, 
pending.message);
-                }
-            }
-
-            try
-            {
-                SystemKeyspace.flushPaxosRepairHistory();
-                for (PendingCleanup pending : IntrusiveStack.iterable(list))
-                {
-                    if (failed == null || !failed.contains(pending))
-                        MessagingService.instance().respond(noPayload, 
pending.message);
-                }
-            }
-            catch (Throwable t)
-            {
-                fail = Throwables.merge(fail, t);
-                for (PendingCleanup pending : IntrusiveStack.iterable(list))
-                {
-                    if (failed == null || !failed.contains(pending))
-                        
MessagingService.instance().respondWithFailure(UNKNOWN, pending.message);
-                }
-            }
-            Throwables.maybeFail(fail);
-        };
-
-        final Message<PaxosCleanupHistory> message;
-        PendingCleanup(Message<PaxosCleanupHistory> message)
-        {
-            this.message = message;
-        }
-
-        public static void add(Message<PaxosCleanupHistory> message)
-        {
-            PendingCleanup next = new PendingCleanup(message);
-            PendingCleanup prev = IntrusiveStack.push(AtomicReference::get, 
AtomicReference::compareAndSet, pendingCleanup, next);
-            if (prev == null)
-                Stage.MISC.execute(CLEANUP);
-        }
+        return ctx.paxosRepairState()::addCleanupHistory;
     }
 
-    public static final IVerbHandler<PaxosCleanupHistory> verbHandler = 
PendingCleanup::add;
+    public static final IVerbHandler<PaxosCleanupHistory> verbHandler = 
createVerbHandler(SharedContext.Global.instance);
 }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
 b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosRepairState.java
similarity index 56%
copy from 
src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
copy to 
src/java/org/apache/cassandra/service/paxos/cleanup/PaxosRepairState.java
index 92d8d35028..5636d5cb73 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosFinishPrepareCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosRepairState.java
@@ -19,68 +19,124 @@
 package org.apache.cassandra.service.paxos.cleanup;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.utils.Throwables;
-import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.IntrusiveStack;
 
 import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
 import static org.apache.cassandra.net.NoPayload.noPayload;
 
-public class PaxosFinishPrepareCleanup extends AsyncFuture<Void> implements 
RequestCallbackWithFailure<Void>
+/**
+ * Tracks the state of paxos repair cleanup work
+ */
+public class PaxosRepairState
 {
-    private final Set<InetAddressAndPort> waitingResponse;
+    private final SharedContext ctx;
+    private final AtomicReference<PendingCleanup> pendingCleanup = new 
AtomicReference<>();
+    private final Map<UUID, PaxosCleanupSession> sessions = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<TableId, PaxosTableRepairs> tableRepairsMap = 
new ConcurrentHashMap<>();
 
-    PaxosFinishPrepareCleanup(Collection<InetAddressAndPort> endpoints)
+    public PaxosRepairState(SharedContext ctx)
     {
-        this.waitingResponse = new HashSet<>(endpoints);
+        this.ctx = ctx;
     }
 
-    public static PaxosFinishPrepareCleanup 
finish(Collection<InetAddressAndPort> endpoints, PaxosCleanupHistory result)
+    public static PaxosRepairState instance()
     {
-        PaxosFinishPrepareCleanup callback = new 
PaxosFinishPrepareCleanup(endpoints);
-        synchronized (callback)
-        {
-            Message<PaxosCleanupHistory> message = 
Message.out(Verb.PAXOS2_CLEANUP_FINISH_PREPARE_REQ, result);
-            for (InetAddressAndPort endpoint : endpoints)
-                MessagingService.instance().sendWithCallback(message, 
endpoint, callback);
-        }
-        return callback;
+        return Holder.instance;
+    }
+
+    PaxosTableRepairs getForTable(TableId tableId)
+    {
+        return tableRepairsMap.computeIfAbsent(tableId, k -> new 
PaxosTableRepairs());
+    }
+
+    public void evictHungRepairs()
+    {
+        long deadline = ctx.clock().nanoTime() - TimeUnit.MINUTES.toNanos(5);
+        for (PaxosTableRepairs repairs : tableRepairsMap.values())
+            repairs.evictHungRepairs(deadline);
+    }
+
+    public void clearRepairs()
+    {
+        for (PaxosTableRepairs repairs : tableRepairsMap.values())
+            repairs.clear();
     }
 
-    @Override
-    public void onFailure(InetAddressAndPort from, RequestFailureReason reason)
+
+    public void setSession(PaxosCleanupSession session)
     {
-        tryFailure(new PaxosCleanupException(reason + " failure response from 
" + from));
+        Preconditions.checkState(!sessions.containsKey(session.session));
+        sessions.put(session.session, session);
     }
 
-    public synchronized void onResponse(Message<Void> msg)
+    public void removeSession(PaxosCleanupSession session)
     {
-        if (isDone())
-            return;
+        Preconditions.checkState(sessions.containsKey(session.session));
+        sessions.remove(session.session);
+    }
 
-        if (!waitingResponse.remove(msg.from()))
-            throw new IllegalArgumentException("Received unexpected response 
from " + msg.from());
+    public void finishSession(InetAddressAndPort from, PaxosCleanupResponse 
response)
+    {
+        PaxosCleanupSession session = sessions.get(response.session);
+        if (session != null)
+            session.finish(from, response);
+    }
+    
+    public void addCleanupHistory(Message<PaxosCleanupHistory> message)
+    {
+        PendingCleanup.add(ctx, pendingCleanup, message);
+    }
 
-        if (waitingResponse.isEmpty())
-            trySuccess(null);
+    /**
+     * This is not required, but it helps to see what escapes simulation... 
can put a break point on instance() while running the tests
+     */
+    private static class Holder
+    {
+        private static final PaxosRepairState instance = new 
PaxosRepairState(SharedContext.Global.instance);
     }
 
     static class PendingCleanup extends IntrusiveStack<PendingCleanup>
     {
-        private static final AtomicReference<PendingCleanup> pendingCleanup = 
new AtomicReference();
-        private static final Runnable CLEANUP = () -> {
+        private final Message<PaxosCleanupHistory> message;
+
+        PendingCleanup(Message<PaxosCleanupHistory> message)
+        {
+            this.message = message;
+        }
+
+        private static void add(SharedContext ctx, 
AtomicReference<PendingCleanup> pendingCleanup, Message<PaxosCleanupHistory> 
message)
+        {
+            PendingCleanup next = new PendingCleanup(message);
+            PendingCleanup prev = IntrusiveStack.push(AtomicReference::get, 
AtomicReference::compareAndSet, pendingCleanup, next);
+            if (prev == null)
+                Stage.MISC.execute(() -> cleanup(ctx, pendingCleanup));
+        }
+
+        private static void cleanup(SharedContext ctx, 
AtomicReference<PendingCleanup> pendingCleanup)
+        {
             PendingCleanup list = pendingCleanup.getAndSet(null);
             if (list == null)
                 return;
@@ -106,7 +162,7 @@ public class PaxosFinishPrepareCleanup extends 
AsyncFuture<Void> implements Requ
             catch (Throwable t)
             {
                 for (PendingCleanup pending : IntrusiveStack.iterable(list))
-                    MessagingService.instance().respondWithFailure(UNKNOWN, 
pending.message);
+                    ctx.messaging().respondWithFailure(UNKNOWN, 
pending.message);
                 throw t;
             }
 
@@ -125,7 +181,7 @@ public class PaxosFinishPrepareCleanup extends 
AsyncFuture<Void> implements Requ
                     if (failed == null)
                         failed = Collections.newSetFromMap(new 
IdentityHashMap<>());
                     failed.add(pending);
-                    MessagingService.instance().respondWithFailure(UNKNOWN, 
pending.message);
+                    ctx.messaging().respondWithFailure(UNKNOWN, 
pending.message);
                 }
             }
 
@@ -135,7 +191,7 @@ public class PaxosFinishPrepareCleanup extends 
AsyncFuture<Void> implements Requ
                 for (PendingCleanup pending : IntrusiveStack.iterable(list))
                 {
                     if (failed == null || !failed.contains(pending))
-                        MessagingService.instance().respond(noPayload, 
pending.message);
+                        ctx.messaging().respond(noPayload, pending.message);
                 }
             }
             catch (Throwable t)
@@ -144,26 +200,10 @@ public class PaxosFinishPrepareCleanup extends 
AsyncFuture<Void> implements Requ
                 for (PendingCleanup pending : IntrusiveStack.iterable(list))
                 {
                     if (failed == null || !failed.contains(pending))
-                        
MessagingService.instance().respondWithFailure(UNKNOWN, pending.message);
+                        ctx.messaging().respondWithFailure(UNKNOWN, 
pending.message);
                 }
             }
             Throwables.maybeFail(fail);
-        };
-
-        final Message<PaxosCleanupHistory> message;
-        PendingCleanup(Message<PaxosCleanupHistory> message)
-        {
-            this.message = message;
-        }
-
-        public static void add(Message<PaxosCleanupHistory> message)
-        {
-            PendingCleanup next = new PendingCleanup(message);
-            PendingCleanup prev = IntrusiveStack.push(AtomicReference::get, 
AtomicReference::compareAndSet, pendingCleanup, next);
-            if (prev == null)
-                Stage.MISC.execute(CLEANUP);
         }
     }
-
-    public static final IVerbHandler<PaxosCleanupHistory> verbHandler = 
PendingCleanup::add;
 }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
index 9f30692ad4..12a319bcf4 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
@@ -36,10 +36,12 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosRepairHistory;
@@ -75,14 +77,14 @@ public class PaxosStartPrepareCleanup extends 
AsyncFuture<PaxosCleanupHistory> i
      * prepare message to prevent racing with gossip dissemination and 
guarantee that every repair participant is aware
      * of the pending ring change during repair.
      */
-    public static PaxosStartPrepareCleanup prepare(TableId tableId, 
Collection<InetAddressAndPort> endpoints, EndpointState localEpState, 
Collection<Range<Token>> ranges)
+    public static PaxosStartPrepareCleanup prepare(SharedContext ctx, TableId 
tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, 
Collection<Range<Token>> ranges)
     {
         PaxosStartPrepareCleanup callback = new 
PaxosStartPrepareCleanup(tableId, endpoints);
         synchronized (callback)
         {
             Message<Request> message = 
Message.out(PAXOS2_CLEANUP_START_PREPARE_REQ, new Request(tableId, 
localEpState, ranges));
             for (InetAddressAndPort endpoint : endpoints)
-                MessagingService.instance().sendWithCallback(message, 
endpoint, callback);
+                ctx.messaging().sendWithCallback(message, endpoint, callback);
         }
         return callback;
     }
@@ -110,24 +112,24 @@ public class PaxosStartPrepareCleanup extends 
AsyncFuture<PaxosCleanupHistory> i
             trySuccess(new PaxosCleanupHistory(table, maxBallot, history));
     }
 
-    private static void maybeUpdateTopology(InetAddressAndPort endpoint, 
EndpointState remote)
+    private static void maybeUpdateTopology(SharedContext ctx, 
InetAddressAndPort endpoint, EndpointState remote)
     {
-        EndpointState local = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        EndpointState local = 
ctx.gossiper().getEndpointStateForEndpoint(endpoint);
         if (local == null || local.isSupersededBy(remote))
         {
             logger.trace("updating endpoint info for {} with {}", endpoint, 
remote);
             Map<InetAddressAndPort, EndpointState> states = 
Collections.singletonMap(endpoint, remote);
 
             Gossiper.runInGossipStageBlocking(() -> {
-                Gossiper.instance.notifyFailureDetector(states);
-                Gossiper.instance.applyStateLocally(states);
+                ctx.gossiper().notifyFailureDetector(states);
+                ctx.gossiper().applyStateLocally(states);
             });
             // TODO: We should also wait for schema pulls/pushes, however this 
would be quite an involved change to MigrationManager
             //       (which currently drops some migration tasks on the floor).
             //       Note it would be fine for us to fail to complete the 
migration task and simply treat this response as a failure/timeout.
         }
         // even if we have th latest gossip info, wait until pending range 
calculations are complete
-        PendingRangeCalculatorService.instance.blockUntilFinished();
+        ctx.pendingRangeCalculator().blockUntilFinished();
     }
 
     public static class Request
@@ -181,12 +183,17 @@ public class PaxosStartPrepareCleanup extends 
AsyncFuture<PaxosCleanupHistory> i
         }
     }
 
-    public static final IVerbHandler<Request> verbHandler = in -> {
-        ColumnFamilyStore table = 
Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId);
-        maybeUpdateTopology(in.from(), in.payload.epState);
-        Ballot highBound = newBallot(ballotTracker().getHighBound(), 
ConsistencyLevel.SERIAL);
-        PaxosRepairHistory history = 
table.getPaxosRepairHistoryForRanges(in.payload.ranges);
-        Message<PaxosCleanupHistory> out = in.responseWith(new 
PaxosCleanupHistory(table.metadata.id, highBound, history));
-        MessagingService.instance().send(out, in.respondTo());
-    };
+    public static IVerbHandler<Request> createVerbHandler(SharedContext ctx)
+    {
+        return in -> {
+            ColumnFamilyStore table = 
Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId);
+            maybeUpdateTopology(ctx, in.from(), in.payload.epState);
+            Ballot highBound = newBallot(ballotTracker().getHighBound(), 
ConsistencyLevel.SERIAL);
+            PaxosRepairHistory history = 
table.getPaxosRepairHistoryForRanges(in.payload.ranges);
+            Message<PaxosCleanupHistory> out = in.responseWith(new 
PaxosCleanupHistory(table.metadata.id, highBound, history));
+            ctx.messaging().send(out, in.respondTo());
+        };
+    }
+
+    public static final IVerbHandler<Request> verbHandler = 
createVerbHandler(SharedContext.Global.instance);
 }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosTableRepairs.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosTableRepairs.java
index 6da4e0bce1..cf7f547cb4 100644
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosTableRepairs.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosTableRepairs.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.service.paxos.cleanup;
 import java.util.ArrayDeque;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -34,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.AbstractPaxosRepair;
 import org.apache.cassandra.service.paxos.Ballot;
@@ -42,7 +40,6 @@ import org.apache.cassandra.service.paxos.PaxosRepair;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 import static org.apache.cassandra.service.paxos.Commit.isAfter;
-import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 /**
  * Coordinates repairs on a given key to prevent multiple repairs being 
scheduled for a single key
@@ -211,25 +208,4 @@ public class PaxosTableRepairs implements 
AbstractPaxosRepair.Listener
     {
         return PaxosRepair.create(consistency, key, incompleteBallot, table);
     }
-
-    private static final ConcurrentMap<TableId, PaxosTableRepairs> 
tableRepairsMap = new ConcurrentHashMap<>();
-
-    static PaxosTableRepairs getForTable(TableId tableId)
-    {
-        return tableRepairsMap.computeIfAbsent(tableId, k -> new 
PaxosTableRepairs());
-    }
-
-    public static void evictHungRepairs()
-    {
-        long deadline = nanoTime() - TimeUnit.MINUTES.toNanos(5);
-        for (PaxosTableRepairs repairs : tableRepairsMap.values())
-            repairs.evictHungRepairs(deadline);
-    }
-
-    public static void clearRepairs()
-    {
-        for (PaxosTableRepairs repairs : tableRepairsMap.values())
-            repairs.clear();
-    }
-
 }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java
 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java
index adbd537930..062de3cf59 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
+import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 import org.apache.cassandra.utils.CloseableIterator;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.AUTO_REPAIR_FREQUENCY_SECONDS;
@@ -322,7 +322,7 @@ public class PaxosUncommittedTracker
     {
         runAndLogException("file consolidation", this::consolidateFiles);
         runAndLogException("schedule auto repairs", 
this::schedulePaxosAutoRepairs);
-        runAndLogException("evict hung repairs", 
PaxosTableRepairs::evictHungRepairs);
+        runAndLogException("evict hung repairs", 
PaxosRepairState.instance()::evictHungRepairs);
     }
 
     public synchronized void startAutoRepairs()
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
index dd1aa07e20..86b8a5199d 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
@@ -340,7 +341,7 @@ public class PaxosRepairTest extends TestBaseImpl
             List<InetAddressAndPort> endpoints = 
cluster.stream().map(IInstance::broadcastAddress).map(InetAddressAndPort::getByAddress).collect(Collectors.toList());
             Future<?> cleanup = cluster.get(1).appliesOnInstance((List<? 
extends InetSocketAddress> es, ExecutorService exec)-> {
                 TableMetadata metadata = 
Keyspace.open(KEYSPACE).getMetadata().getTableOrViewNullable(TABLE);
-                return 
PaxosCleanup.cleanup(es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()),
 metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec);
+                return PaxosCleanup.cleanup(SharedContext.Global.instance, 
es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), 
metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec);
             }).apply(endpoints, executor);
 
             Uninterruptibles.awaitUninterruptibly(haveFetchedLowBound);
@@ -404,7 +405,7 @@ public class PaxosRepairTest extends TestBaseImpl
             List<InetAddressAndPort> endpoints = cluster.stream().map(i -> 
InetAddressAndPort.getByAddress(i.broadcastAddress())).collect(Collectors.toList());
             Future<?> cleanup = cluster.get(1).appliesOnInstance((List<? 
extends InetSocketAddress> es, ExecutorService exec)-> {
                 TableMetadata metadata = 
Keyspace.open(KEYSPACE).getMetadata().getTableOrViewNullable(TABLE);
-                return 
PaxosCleanup.cleanup(es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()),
 metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec);
+                return PaxosCleanup.cleanup(SharedContext.Global.instance, 
es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), 
metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec);
             }).apply(endpoints, executor);
 
             IMessageFilters.Filter dropAllTo1 = 
cluster.verbs(PAXOS2_PREPARE_REQ, PAXOS2_PROPOSE_REQ, 
PAXOS_COMMIT_REQ).from(2).to(1).outbound().drop();
@@ -483,7 +484,7 @@ public class PaxosRepairTest extends TestBaseImpl
             List<InetAddressAndPort> endpoints = cluster.stream().map(i -> 
InetAddressAndPort.getByAddress(i.broadcastAddress())).collect(Collectors.toList());
             Future<?> cleanup = cluster.get(1).appliesOnInstance((List<? 
extends InetSocketAddress> es, ExecutorService exec)-> {
                 TableMetadata metadata = 
Keyspace.open(KEYSPACE).getMetadata().getTableOrViewNullable(TABLE);
-                return 
PaxosCleanup.cleanup(es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()),
 metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec);
+                return PaxosCleanup.cleanup(SharedContext.Global.instance, 
es.stream().map(InetAddressAndPort::getByAddress).collect(Collectors.toSet()), 
metadata, StorageService.instance.getLocalRanges(KEYSPACE), false, exec);
             }).apply(endpoints, executor);
 
             cleanup.get();
diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java 
b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index c819cc465f..81b341bed9 100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@ -50,7 +50,6 @@ import javax.annotation.Nullable;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import org.apache.cassandra.config.UnitConfigOverride;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
@@ -58,7 +57,6 @@ import accord.utils.DefaultRandom;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
-import org.agrona.collections.Long2ObjectHashMap;
 import org.agrona.collections.LongHashSet;
 import org.apache.cassandra.concurrent.ExecutorBuilder;
 import org.apache.cassandra.concurrent.ExecutorBuilderFactory;
@@ -71,6 +69,7 @@ import org.apache.cassandra.concurrent.SequentialExecutorPlus;
 import org.apache.cassandra.concurrent.SimulatedExecutorFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.UnitConfigOverride;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Digest;
@@ -119,7 +118,15 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupComplete;
+import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupHistory;
+import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest;
+import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse;
+import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
+import org.apache.cassandra.service.paxos.cleanup.PaxosFinishPrepareCleanup;
+import org.apache.cassandra.service.paxos.cleanup.PaxosStartPrepareCleanup;
 import org.apache.cassandra.streaming.StreamEventHandler;
 import org.apache.cassandra.streaming.StreamReceiveException;
 import org.apache.cassandra.streaming.StreamSession;
@@ -339,6 +346,12 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                     // these messages are not resilent to ephemeral issues
                     case STATUS_REQ:
                     case STATUS_RSP:
+                    // paxos repair does not support faults and will cause a 
TIMEOUT error, failing the repair
+                    case PAXOS2_CLEANUP_COMPLETE_REQ:
+                    case PAXOS2_CLEANUP_REQ:
+                    case PAXOS2_CLEANUP_RSP2:
+                    case PAXOS2_CLEANUP_START_PREPARE_REQ:
+                    case PAXOS2_CLEANUP_FINISH_PREPARE_REQ:
                         noFaults.add(message.id());
                         return Faults.NONE;
                     default:
@@ -654,6 +667,7 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
 
             // We run tests in an isolated JVM per class, so not cleaing up is 
safe... but if that assumption ever changes, will need to cleanup
             Stage.ANTI_ENTROPY.unsafeSetExecutor(orderedExecutor);
+            Stage.MISC.unsafeSetExecutor(orderedExecutor);
             Stage.INTERNAL_RESPONSE.unsafeSetExecutor(unorderedScheduled);
             
Mockito.when(failureDetector.isAlive(Mockito.any())).thenReturn(true);
             Thread expectedThread = Thread.currentThread();
@@ -776,10 +790,46 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
             }
         }
 
+        private static class CallbackKey
+        {
+            private final long id;
+            private final InetAddressAndPort peer;
+
+            private CallbackKey(long id, InetAddressAndPort peer)
+            {
+                this.id = id;
+                this.peer = peer;
+            }
+
+            @Override
+            public boolean equals(Object o)
+            {
+                if (this == o) return true;
+                if (o == null || getClass() != o.getClass()) return false;
+                CallbackKey that = (CallbackKey) o;
+                return id == that.id && peer.equals(that.peer);
+            }
+
+            @Override
+            public int hashCode()
+            {
+                return Objects.hash(id, peer);
+            }
+
+            @Override
+            public String toString()
+            {
+                return "CallbackKey{" +
+                       "id=" + id +
+                       ", peer=" + peer +
+                       '}';
+            }
+        }
+
         private class Messaging implements MessageDelivery
         {
             final InetAddressAndPort broadcastAddressAndPort;
-            final Long2ObjectHashMap<CallbackContext> callbacks = new 
Long2ObjectHashMap<>();
+            final Map<CallbackKey, CallbackContext> callbacks = new 
HashMap<>();
 
             private Messaging(InetAddressAndPort broadcastAddressAndPort)
             {
@@ -812,10 +862,11 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                 CallbackContext cb;
                 if (callback != null)
                 {
-                    if (callbacks.containsKey(message.id()))
-                        throw new AssertionError("Message id " + message.id() 
+ " already has a callback");
+                    CallbackKey key = new CallbackKey(message.id(), to);
+                    if (callbacks.containsKey(key))
+                        throw new AssertionError("Message id " + message.id() 
+ " to " + to + " already has a callback");
                     cb = new CallbackContext(callback);
-                    callbacks.put(message.id(), cb);
+                    callbacks.put(key, cb);
                 }
                 else
                 {
@@ -861,7 +912,7 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                     if (cb != null)
                     {
                         unorderedScheduled.schedule(() -> {
-                            CallbackContext ctx = 
callbacks.remove(message.id());
+                            CallbackContext ctx = callbacks.remove(new 
CallbackKey(message.id(), to));
                             if (ctx != null)
                             {
                                 assert ctx == cb;
@@ -952,6 +1003,21 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
             {
                 return endpoints.get(ep);
             }
+
+            @Override
+            public void notifyFailureDetector(Map<InetAddressAndPort, 
EndpointState> remoteEpStateMap)
+            {
+
+            }
+
+            @Override
+            public void applyStateLocally(Map<InetAddressAndPort, 
EndpointState> epStateMap)
+            {
+                // If we were testing paxos this would be wrong...
+                // CASSANDRA-18917 added support for simulating Gossip, but 
gossip issues were found so couldn't merge that patch...
+                // For the paxos repair, since we don't care about paxos 
messages, this is ok to no-op for now, but if paxos cleanup
+                // ever was to be tested this logic would need to be 
implemented
+            }
         }
 
         class Node implements SharedContext
@@ -965,6 +1031,7 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
             final Messaging messaging;
             final IValidationManager validationManager;
             private FailingBiConsumer<ColumnFamilyStore, Validator> 
doValidation = DEFAULT_VALIDATION;
+            final PaxosRepairState paxosRepairState;
             private final StreamExecutor defaultStreamExecutor = plan -> {
                 long delayNanos = rs.nextLong(TimeUnit.SECONDS.toNanos(5), 
TimeUnit.MINUTES.toNanos(10));
                 unorderedScheduled.schedule(() -> {
@@ -983,6 +1050,7 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                 this.tokens = tokens;
                 this.messaging = messaging;
                 this.activeRepairService = new ActiveRepairService(this);
+                this.paxosRepairState = new PaxosRepairState(this);
                 this.validationManager = (cfs, validator) -> 
unorderedScheduled.submit(() -> {
                     try
                     {
@@ -993,7 +1061,39 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                         validator.fail(e);
                     }
                 });
-                this.verbHandler = new RepairMessageVerbHandler(this);
+                this.verbHandler = new IVerbHandler<>()
+                {
+                    private final RepairMessageVerbHandler repairVerbHandler = 
new RepairMessageVerbHandler(Node.this);
+                    private final 
IVerbHandler<PaxosStartPrepareCleanup.Request> paxosStartPrepareCleanup = 
PaxosStartPrepareCleanup.createVerbHandler(Node.this);
+                    private final IVerbHandler<PaxosCleanupRequest> 
paxosCleanupRequestIVerbHandler = 
PaxosCleanupRequest.createVerbHandler(Node.this);
+                    private final IVerbHandler<PaxosCleanupHistory> 
paxosFinishPrepareCleanup = 
PaxosFinishPrepareCleanup.createVerbHandler(Node.this);
+                    private final IVerbHandler<PaxosCleanupResponse> 
paxosCleanupResponse = PaxosCleanupResponse.createVerbHandler(Node.this);
+                    private final IVerbHandler<PaxosCleanupComplete.Request> 
paxosCleanupComplete = PaxosCleanupComplete.createVerbHandler(Node.this);
+                    @Override
+                    public void doVerb(Message message) throws IOException
+                    {
+                        switch (message.verb())
+                        {
+                            case PAXOS2_CLEANUP_START_PREPARE_REQ:
+                                paxosStartPrepareCleanup.doVerb(message);
+                                break;
+                            case PAXOS2_CLEANUP_REQ:
+                                
paxosCleanupRequestIVerbHandler.doVerb(message);
+                                break;
+                            case PAXOS2_CLEANUP_FINISH_PREPARE_REQ:
+                                paxosFinishPrepareCleanup.doVerb(message);
+                                break;
+                            case PAXOS2_CLEANUP_RSP2:
+                                paxosCleanupResponse.doVerb(message);
+                                break;
+                            case PAXOS2_CLEANUP_COMPLETE_REQ:
+                                paxosCleanupComplete.doVerb(message);
+                                break;
+                            default:
+                                repairVerbHandler.doVerb(message);
+                        }
+                    }
+                };
 
                 activeRepairService.start();
             }
@@ -1036,10 +1136,12 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                 if (msg.verb().isResponse())
                 {
                     // handle callbacks
-                    if (messaging.callbacks.containsKey(msg.id()))
+                    CallbackKey key = new CallbackKey(msg.id(), msg.from());
+                    if (messaging.callbacks.containsKey(key))
                     {
-                        CallbackContext callback = 
messaging.callbacks.remove(msg.id());
-                        if (callback == null) return;
+                        CallbackContext callback = 
messaging.callbacks.remove(key);
+                        if (callback == null)
+                            return;
                         try
                         {
                             if (msg.isFailureResponse())
@@ -1114,6 +1216,18 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                 return unorderedScheduled;
             }
 
+            @Override
+            public ScheduledExecutorPlus nonPeriodicTasks()
+            {
+                return unorderedScheduled;
+            }
+
+            @Override
+            public ScheduledExecutorPlus scheduledTasks()
+            {
+                return unorderedScheduled;
+            }
+
             @Override
             public Supplier<Random> random()
             {
@@ -1198,6 +1312,18 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
             {
                 return streamExecutor;
             }
+
+            @Override
+            public PendingRangeCalculatorService pendingRangeCalculator()
+            {
+                return PendingRangeCalculatorService.instance;
+            }
+
+            @Override
+            public PaxosRepairState paxosRepairState()
+            {
+                return paxosRepairState;
+            }
         }
 
         private Message serde(Message msg)
@@ -1285,6 +1411,10 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                         next = it.next();
                     }
                     if 
(FuzzTestBase.class.getName().equals(next.getClassName())) return 
Access.MAIN_THREAD_ONLY;
+                    // this is non-deterministic... but since the scope of the 
work is testing repair and not paxos... this is unblocked for now...
+                    if 
(("org.apache.cassandra.service.paxos.Paxos".equals(next.getClassName()) && 
"newBallot".equals(next.getMethodName()))
+                        || 
("org.apache.cassandra.service.paxos.uncommitted.PaxosBallotTracker".equals(next.getClassName())
 && "updateLowBound".equals(next.getMethodName())))
+                        return Access.MAIN_THREAD_ONLY;
                     if 
(next.getClassName().startsWith("org.apache.cassandra.db.") || 
next.getClassName().startsWith("org.apache.cassandra.gms.") || 
next.getClassName().startsWith("org.apache.cassandra.cql3.") || 
next.getClassName().startsWith("org.apache.cassandra.metrics.") || 
next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.")
                         || 
next.getClassName().startsWith("org.apache.cassandra.utils.TimeUUID") // this 
would be good to solve
                         || 
next.getClassName().startsWith(PendingAntiCompaction.class.getName()))
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java 
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 165df1bbc9..872ee99abb 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -40,6 +40,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.cassandra.repair.messages.SyncResponse;
 import org.apache.cassandra.repair.messages.ValidationResponse;
+import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
@@ -67,7 +68,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.paxos.Paxos;
 import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest;
 import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse;
-import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupSession;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -875,7 +875,7 @@ public class RepairJobTest
                 if (message.verb() == PAXOS2_CLEANUP_REQ)
                 {
                     PaxosCleanupRequest request = (PaxosCleanupRequest) 
message.payload;
-                    PaxosCleanupSession.finishSession(to, new 
PaxosCleanupResponse(request.session, true, null));
+                    PaxosRepairState.instance().finishSession(to, new 
PaxosCleanupResponse(request.session, true, null));
                     return false;
                 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to