This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c9625e0102dab66f41d3ef2338c54d499e73a8c5
Merge: c2a78639de ca0b77d743
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Tue Mar 26 13:04:03 2024 -0700

    Merge branch 'cassandra-5.0' into trunk

 src/java/org/apache/cassandra/gms/Gossiper.java    |   2 +
 src/java/org/apache/cassandra/gms/IGossiper.java   |   3 +
 .../org/apache/cassandra/net/MessageDelivery.java  |   4 +
 .../org/apache/cassandra/net/MessagingService.java |   5 -
 .../org/apache/cassandra/repair/RepairJob.java     |   2 +-
 .../org/apache/cassandra/repair/SharedContext.java |  40 +++++
 .../cassandra/service/ActiveRepairService.java     |   4 +-
 .../apache/cassandra/service/StorageService.java   |   7 +-
 .../org/apache/cassandra/service/paxos/Paxos.java  |  12 +-
 .../cassandra/service/paxos/PaxosRepair.java       |   9 +-
 .../service/paxos/cleanup/PaxosCleanup.java        |  34 ++---
 .../paxos/cleanup/PaxosCleanupComplete.java        |  28 ++--
 .../cleanup/PaxosCleanupLocalCoordinator.java      |  22 +--
 .../service/paxos/cleanup/PaxosCleanupRequest.java |  70 ++++-----
 .../paxos/cleanup/PaxosCleanupResponse.java        |   8 +-
 .../service/paxos/cleanup/PaxosCleanupSession.java |  54 +++----
 .../paxos/cleanup/PaxosFinishPrepareCleanup.java   | 112 ++------------
 ...shPrepareCleanup.java => PaxosRepairState.java} | 138 +++++++++++-------
 .../paxos/cleanup/PaxosStartPrepareCleanup.java    |  38 +++--
 .../service/paxos/cleanup/PaxosTableRepairs.java   |  24 ---
 .../paxos/uncommitted/PaxosUncommittedTracker.java |   4 +-
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |   7 +-
 .../distributed/test/PaxosRepairTest.java          |   7 +-
 .../org/apache/cassandra/repair/FuzzTestBase.java  | 162 ++++++++++++++++++---
 .../org/apache/cassandra/repair/RepairJobTest.java |   4 +-
 25 files changed, 459 insertions(+), 341 deletions(-)

diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 21d4ab5b8f,d907f76686..35cf57d3e1
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -1058,6 -1317,18 +1058,7 @@@ public class Gossiper implements IFailu
          return reqdEndpointState;
      }
  
 -    /**
 -     * determine which endpoint started up earlier
 -     */
 -    public int compareEndpointStartup(InetAddressAndPort addr1, InetAddressAndPort addr2)
 -    {
 -        EndpointState ep1 = getEndpointStateForEndpoint(addr1);
 -        EndpointState ep2 = getEndpointStateForEndpoint(addr2);
 -        assert ep1 != null && ep2 != null;
 -        return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration();
 -    }
 -
+     @Override
      public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap)
      {
          for (Entry<InetAddressAndPort, EndpointState> entry : remoteEpStateMap.entrySet())
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index ceae703097,94586b41c8..d1e2f7b260
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -472,41 -449,8 +472,36 @@@ public class MessagingService extends M
          send(message.responseWith(response), message.respondTo());
      }
  
 +    public <RSP> Future<RSP> sendWithResponse(InetAddressAndPort to, Message<?> msg)
 +    {
 +        Promise<RSP> future = AsyncPromise.uncancellable();
 +        MessagingService.instance().sendWithCallback(msg, to,
 +                                                     new RequestCallback<RSP>()
 +                                                     {
 +                                                         @Override
 +                                                         public void onResponse(Message<RSP> msg)
 +                                                         {
 +                                                             future.setSuccess(msg.payload);
 +                                                         }
 +
 +                                                         @Override
 +                                                         public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
 +                                                         {
 +                                                             future.setFailure(new RuntimeException(failureReason.toString()));
 +                                                         }
 +                                                     });
 +
 +        return future;
 +    }
 +
-     public <V> void respondWithFailure(RequestFailureReason reason, Message<?> message)
-     {
-         send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo());
-     }
- 
      public void send(Message message, InetAddressAndPort to, ConnectionType specifyConnection)
      {
 +        if (isShuttingDown)
 +        {
 +            logger.error("Cannot send the message {} to {}, as messaging service is shutting down", message, to);
 +            return;
 +        }
 +
          if (logger.isTraceEnabled())
          {
              logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddressAndPort(), message.verb(), message.id(), to);
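
For context, the sendWithResponse helper added above can be driven roughly as
follows; a minimal caller sketch, where the specific verb and payload types are
stand-ins rather than part of this commit:

    // Send a request and adapt the callback into a typed future.
    Message<PaxosCleanupRequest> msg = Message.out(Verb.PAXOS2_CLEANUP_REQ2, request);
    Future<PaxosCleanupResponse> rsp = MessagingService.instance().sendWithResponse(peer, msg);
    PaxosCleanupResponse payload = rsp.get(); // surfaces failures as RuntimeException per onFailure above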
diff --cc src/java/org/apache/cassandra/repair/SharedContext.java
index 8ccc88f584,440da2cf45..6c13ae99c9
--- a/src/java/org/apache/cassandra/repair/SharedContext.java
+++ b/src/java/org/apache/cassandra/repair/SharedContext.java
@@@ -37,6 -37,8 +37,7 @@@ import org.apache.cassandra.locator.Ine
  import org.apache.cassandra.net.MessageDelivery;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.ActiveRepairService;
 -import org.apache.cassandra.service.PendingRangeCalculatorService;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
  import org.apache.cassandra.streaming.StreamPlan;
  import org.apache.cassandra.utils.Clock;
  import org.apache.cassandra.utils.FBUtilities;
@@@ -77,6 -81,8 +80,7 @@@ public interface SharedContex
      IValidationManager validationManager();
      TableRepairManager repairManager(ColumnFamilyStore store);
      StreamExecutor streamExecutor();
 -    PendingRangeCalculatorService pendingRangeCalculator();
+     PaxosRepairState paxosRepairState();
  
      class Global implements SharedContext
      {
@@@ -171,6 -189,18 +187,12 @@@
          {
              return StreamPlan::execute;
          }
+ 
 -        @Override
 -        public PendingRangeCalculatorService pendingRangeCalculator()
 -        {
 -            return PendingRangeCalculatorService.instance;
 -        }
 -
+         @Override
+         public PaxosRepairState paxosRepairState()
+         {
+             return PaxosRepairState.instance();
+         }
      }
  
      class ForwardingSharedContext implements SharedContext
@@@ -276,5 -318,17 +310,11 @@@
          {
              return delegate().streamExecutor();
          }
+ 
 -        @Override
 -        public PendingRangeCalculatorService pendingRangeCalculator()
 -        {
 -            return delegate().pendingRangeCalculator();
 -        }
 -
+         @Override
+         public PaxosRepairState paxosRepairState()
+         {
+             return delegate().paxosRepairState();
+         }
      }
  }
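
The new paxosRepairState() accessor follows the existing SharedContext pattern:
production code resolves dependencies through Global, while tests interpose via
ForwardingSharedContext (see the FuzzTestBase changes in this merge). A rough
sketch, with the stub hypothetical:

    // Production wiring: the global singleton context.
    PaxosRepairState state = SharedContext.Global.instance.paxosRepairState();

    // Test wiring (sketch): a ForwardingSharedContext subclass can delegate
    // everything and override just this one dependency, e.g.
    //     @Override
    //     public PaxosRepairState paxosRepairState() { return stubState; }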
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 6b2e814185,e120122c08..a7252eca2b
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -1157,15 -1128,16 +1157,15 @@@ public class ActiveRepairService implem
                                                               "Skipping this 
check can lead to paxos correctness issues",
                                                               range, ksName, 
reason, downEndpoints, SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE.getKey(), 
SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES.getKey()));
                  }
 -                EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(range.right, ksName);
 -                if (pending.size() > 1 && !PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getBoolean())
 +                // todo: can probably be removed with TrM
 +                if (ClusterMetadata.current().hasPendingRangesFor(keyspace.getMetadata(), range.right) && !PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getBoolean())
                  {
 -                    throw new RuntimeException(String.format("Cannot begin paxos auto repair for %s in %s.%s, multiple pending endpoints exist for range (%s). " +
 +                    throw new RuntimeException(String.format("Cannot begin paxos auto repair for %s in %s.%s, multiple pending endpoints exist for range (metadata = %s). " +
                                                               "Set -D%s=true to skip this check",
 -                                                             range, table.keyspace, table.name, pending, PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey()));
 +                                                             range, table.keyspace, table.name, ClusterMetadata.current(), PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey()));
  
                  }
-                 futures.add(() -> PaxosCleanup.cleanup(endpoints, table, Collections.singleton(range), false, repairCommandExecutor()));
 -                Future<Void> future = PaxosCleanup.cleanup(ctx, endpoints, table, Collections.singleton(range), false, repairCommandExecutor());
 -                futures.add(future);
++                futures.add(() -> PaxosCleanup.cleanup(ctx, endpoints, table, Collections.singleton(range), false, repairCommandExecutor()));
              }
          }
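
Note the shape of the change above: futures now collects deferred suppliers
rather than eagerly started cleanups, so each PaxosCleanup.cleanup call only
runs when its supplier is invoked. Illustratively (list and executor names
assumed, not from this commit):

    // Deferred start: nothing runs until the supplier is drained.
    List<Supplier<Future<Void>>> futures = new ArrayList<>();
    futures.add(() -> PaxosCleanup.cleanup(ctx, endpoints, table, ranges, false, executor));
    for (Supplier<Future<Void>> cleanup : futures)
        cleanup.get().get(); // start one cleanup, then wait for it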
  
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 36dabcd807,cbdadee1c3..928be162a0
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -154,8 -175,12 +154,9 @@@ import org.apache.cassandra.locator.Sys
  import org.apache.cassandra.metrics.Sampler;
  import org.apache.cassandra.metrics.SamplingManager;
  import org.apache.cassandra.metrics.StorageMetrics;
 -import org.apache.cassandra.net.AsyncOneResponse;
 -import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.repair.RepairCoordinator;
+ import org.apache.cassandra.repair.SharedContext;
  import org.apache.cassandra.repair.messages.RepairOption;
  import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
  import org.apache.cassandra.schema.KeyspaceMetadata;
@@@ -3332,8 -4863,8 +3333,8 @@@ public class StorageService extends Not
          if (table == null)
              return ImmediateFuture.success(null);
  
 -        List<Range<Token>> ranges = getLocalAndPendingRanges(table.keyspace);
 +        Collection<Range<Token>> ranges = getLocalAndPendingRanges(table.keyspace);
-         PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(tableId, ranges);
+         PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(SharedContext.Global.instance, tableId, ranges);
          ScheduledExecutors.optionalTasks.submit(coordinator::start);
          return coordinator;
      }
diff --cc src/java/org/apache/cassandra/service/paxos/Paxos.java
index 99570ba917,473b5741ad..f3938f1ccb
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@@ -87,12 -86,9 +88,12 @@@ import org.apache.cassandra.service.CAS
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.FailureRecordingCallback.AsMap;
  import org.apache.cassandra.service.paxos.Commit.Proposal;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 +import org.apache.cassandra.tcm.ClusterMetadata;
  import org.apache.cassandra.service.reads.DataResolver;
  import org.apache.cassandra.service.reads.repair.NoopReadRepair;
- import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
 +import org.apache.cassandra.tcm.Epoch;
 +import org.apache.cassandra.tcm.membership.NodeId;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.triggers.TriggerExecutor;
  import org.apache.cassandra.utils.CassandraVersion;
@@@ -394,43 -385,21 +395,48 @@@ public class Paxo
              return electorateNatural;
          }
  
 -        static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus)
 +        @Override
 +        public boolean stillAppliesTo(ClusterMetadata newMetadata)
          {
 -            return get(table, token, consistencyForConsensus, FailureDetector.isReplicaAlive);
 +            if (newMetadata.epoch.equals(epoch))
 +                return true;
 +
 +            Participants newParticipants = recompute.apply(newMetadata);
 +            return newParticipants.electorate.equals(electorate);
 +        }
 +
 +        @Override
 +        public void collectSuccess(InetAddressAndPort inetAddressAndPort)
 +        {
 +
 +        }
 +
 +        @Override
 +        public void collectFailure(InetAddressAndPort inetAddressAndPort, RequestFailureReason t)
 +        {
 +
          }
  
 -        static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive)
 +        static Participants get(ClusterMetadata metadata, TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus)
+         {
 -            Keyspace keyspace = Keyspace.open(table.keyspace);
 -            ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspace, token);
++            return get(metadata, table, token, consistencyForConsensus, FailureDetector.isReplicaAlive);
++        }
++
++        static Participants get(ClusterMetadata metadata, TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive)
 +        {
 +            KeyspaceMetadata keyspaceMetadata = metadata.schema.getKeyspaceMetadata(table.keyspace);
 +            ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspaceMetadata, token);
              ReplicaLayout.ForTokenWrite electorate = consistencyForConsensus.isDatacenterLocal()
                                                       ? all.filter(InOurDc.replicas()) : all;
  
-             EndpointsForToken live = all.all().filter(FailureDetector.isReplicaAlive);
+             EndpointsForToken live = all.all().filter(isReplicaAlive);
 +            return new Participants(metadata.epoch, Keyspace.open(table.keyspace), consistencyForConsensus, all, electorate, live,
 +                                    (cm) -> get(cm, table, token, consistencyForConsensus));
 +        }
  
 -            return new Participants(keyspace, consistencyForConsensus, all, electorate, live);
 +        static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus)
 +        {
 +            return get(ClusterMetadata.current(), table, token, consistencyForConsensus);
          }
  
          static Participants get(TableMetadata cfm, DecoratedKey key, ConsistencyLevel consistency)
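
The refactored Participants above pins the epoch it was computed from and keeps
a recompute function, so stillAppliesTo reduces to: unchanged epoch, or an
electorate that recomputes to the same set under the newer metadata. An
illustrative restatement (field names inferred from the constructor arguments
above):

    boolean stillValid = newMetadata.epoch.equals(participants.epoch)
                      || recompute.apply(newMetadata).electorate.equals(participants.electorate);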
diff --cc src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
index 368c16211c,ae5bc557c7..fd220f83b7
--- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
@@@ -667,16 -666,15 +668,16 @@@ public class PaxosRepair extends Abstra
          return result;
      }
  
-     static boolean validatePeerCompatibility(TableMetadata table, Range<Token> range)
+     static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Range<Token> range)
      {
 -        Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint()));
 -        return Iterables.all(participants.all, r -> validatePeerCompatibility(ctx.gossiper(), r));
 +        ClusterMetadata metadata = ClusterMetadata.current();
-         Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL);
++        Participants participants = Participants.get(metadata, table, range.right, ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint()));
 +        return Iterables.all(participants.all, (participant) -> validatePeerCompatibility(metadata, participant));
      }
  
-     public static boolean validatePeerCompatibility(TableMetadata table, Collection<Range<Token>> ranges)
+     public static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Collection<Range<Token>> ranges)
      {
-         return Iterables.all(ranges, range -> validatePeerCompatibility(table, range));
+         return Iterables.all(ranges, range -> validatePeerCompatibility(ctx, table, range));
      }
  
      public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException
diff --cc src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
index c71577d389,7b4163e2a2..feaa64bd1e
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
@@@ -24,7 -25,7 +24,6 @@@ import java.util.function.Consumer
  
  import com.google.common.base.Preconditions;
  import com.google.common.collect.Iterables;
--
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -33,8 -33,9 +31,8 @@@ import org.apache.cassandra.db.Keyspace
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.gms.EndpointState;
- import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.locator.RangesAtEndpoint;
+ import org.apache.cassandra.repair.SharedContext;
  import org.apache.cassandra.schema.Schema;
  import org.apache.cassandra.schema.TableId;
  import org.apache.cassandra.schema.TableMetadata;
@@@ -114,10 -116,21 +113,11 @@@ public class PaxosCleanup extends Async
          executor.execute(complete);
      }
  
-     private static boolean isOutOfRange(String ksName, Collection<Range<Token>> repairRanges)
+     private static boolean isOutOfRange(SharedContext ctx, String ksName, Collection<Range<Token>> repairRanges)
      {
          Keyspace keyspace = Keyspace.open(ksName);
-         Collection<Range<Token>> localRanges = Range.normalize(ClusterMetadata.current().localWriteRanges(keyspace.getMetadata()));
 -        List<Range<Token>> localRanges = Range.normalize(keyspace.getReplicationStrategy()
 -                                                                 .getAddressReplicas()
 -                                                                 .get(ctx.broadcastAddressAndPort())
 -                                                                 .ranges());
 -
 -        RangesAtEndpoint pendingRanges = StorageService.instance.getTokenMetadata().getPendingRanges(ksName, ctx.broadcastAddressAndPort());
 -        if (!pendingRanges.isEmpty())
 -        {
 -            localRanges.addAll(pendingRanges.ranges());
 -            localRanges = Range.normalize(localRanges);
 -        }
++        Collection<Range<Token>> localRanges = Range.normalize(ClusterMetadata.current().writeRanges(keyspace.getMetadata(), ctx.broadcastAddressAndPort()));
+ 
          for (Range<Token> repairRange : Range.normalize(repairRanges))
          {
              if (!Iterables.any(localRanges, localRange -> localRange.contains(repairRange)))
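
The rewritten isOutOfRange reduces to a containment test over normalized
ranges: every normalized repair range must fall inside some locally owned
write range. A standalone restatement of that check (illustrative, not the
committed code):

    Collection<Range<Token>> local = Range.normalize(localWriteRanges);
    for (Range<Token> repairRange : Range.normalize(repairRanges))
        if (local.stream().noneMatch(l -> l.contains(repairRange)))
            return true; // some repair range is not locally replicated
    return false;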
diff --cc src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
index 33d6fbd173,2dbbc58d69..2eaeaf2537
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
@@@ -38,12 -38,10 +38,12 @@@ import org.apache.cassandra.io.util.Dat
  import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.net.IVerbHandler;
  import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.repair.SharedContext;
  import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.tcm.ClusterMetadata;
 +import org.apache.cassandra.tcm.ClusterMetadataService;
  import org.apache.cassandra.utils.UUIDSerializer;
  
- import static org.apache.cassandra.net.MessagingService.instance;
  import static org.apache.cassandra.net.NoPayload.noPayload;
  import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_RSP2;
  
@@@ -70,43 -68,45 +70,47 @@@ public class PaxosCleanupReques
          this.ranges = rangesOrMin(ranges);
      }
  
-     public static final IVerbHandler<PaxosCleanupRequest> verbHandler = in -> {
-         PaxosCleanupRequest request = in.payload;
- 
-         if (!PaxosCleanup.isInRangeAndShouldProcess(request.ranges, request.tableId))
-         {
-             // Try catching up, in case it's us
-             ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(ClusterMetadata.current(), in.from(),in.epoch());
- 
-             String msg = String.format("Rejecting cleanup request %s from %s. Some ranges are not replicated (%s)",
-                                        request.session, in.from(), request.ranges);
-             Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, msg));
-             instance().send(response, in.respondTo());
-             return;
-         }
- 
-         PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.create(request);
- 
-         coordinator.addCallback(new FutureCallback<PaxosCleanupResponse>()
-         {
-             public void onSuccess(@Nullable PaxosCleanupResponse finished)
-             {
-                 Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow());
-                 instance().send(response, in.respondTo());
-             }
+     public static IVerbHandler<PaxosCleanupRequest> createVerbHandler(SharedContext ctx)
+     {
+         return in -> {
+             PaxosCleanupRequest request = in.payload;
  
-             public void onFailure(Throwable throwable)
+             if (!PaxosCleanup.isInRangeAndShouldProcess(ctx, request.ranges, request.tableId))
              {
-                 Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, throwable.getMessage()));
-                 instance().send(response, in.respondTo());
++                // Try catching up, in case it's us
++                ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(ClusterMetadata.current(), in.from(),in.epoch());
++
+                 String msg = String.format("Rejecting cleanup request %s from %s. Some ranges are not replicated (%s)",
+                                            request.session, in.from(), request.ranges);
+                 Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, msg));
+                 ctx.messaging().send(response, in.respondTo());
+                 return;
              }
-         });
  
-         // ack the request so the coordinator knows we've started
-         instance().respond(noPayload, in);
+             PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.create(ctx, request);
  
-         coordinator.start();
-     };
 -            coordinator.addCallback(new FutureCallback<>()
++            coordinator.addCallback(new FutureCallback<PaxosCleanupResponse>()
+             {
+                 public void onSuccess(@Nullable PaxosCleanupResponse finished)
+                 {
+                     Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, coordinator.getNow());
+                     ctx.messaging().send(response, in.respondTo());
+                 }
+ 
+                 public void onFailure(Throwable throwable)
+                 {
+                     Message<PaxosCleanupResponse> response = Message.out(PAXOS2_CLEANUP_RSP2, PaxosCleanupResponse.failed(request.session, throwable.getMessage()));
+                     ctx.messaging().send(response, in.respondTo());
+                 }
+             });
+ 
+             // ack the request so the coordinator knows we've started
+             ctx.messaging().respond(noPayload, in);
+ 
+             coordinator.start();
+         };
+     }
 -
+     public static final IVerbHandler<PaxosCleanupRequest> verbHandler = createVerbHandler(SharedContext.Global.instance);
  
      public static final IVersionedSerializer<PaxosCleanupRequest> serializer = new IVersionedSerializer<PaxosCleanupRequest>()
      {
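
With the handler now produced by a factory over SharedContext (and the old
static field preserved as the Global-bound instance), tests can bind it to an
instrumented context. A sketch, with the stub context and message hypothetical:

    IVerbHandler<PaxosCleanupRequest> handler = PaxosCleanupRequest.createVerbHandler(stubCtx);
    handler.doVerb(incomingMessage); // exercises the reject/ack/cleanup paths above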
diff --cc src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 3224b29466,0000000000..cc1636bf14
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@@ -1,1007 -1,0 +1,1012 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.tcm;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Set;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.ImmutableSet;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CassandraRelevantProperties;
 +import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.locator.EndpointsForToken;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.net.CMSIdentifierMismatchException;
 +import org.apache.cassandra.schema.DistributedSchema;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
 +import org.apache.cassandra.schema.Keyspaces;
 +import org.apache.cassandra.schema.ReplicationParams;
 +import org.apache.cassandra.tcm.extensions.ExtensionKey;
 +import org.apache.cassandra.tcm.extensions.ExtensionValue;
 +import org.apache.cassandra.tcm.membership.Directory;
 +import org.apache.cassandra.tcm.membership.Location;
 +import org.apache.cassandra.tcm.membership.NodeAddresses;
 +import org.apache.cassandra.tcm.membership.NodeId;
 +import org.apache.cassandra.tcm.membership.NodeState;
 +import org.apache.cassandra.tcm.membership.NodeVersion;
 +import org.apache.cassandra.tcm.ownership.DataPlacement;
 +import org.apache.cassandra.tcm.ownership.DataPlacements;
 +import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator;
 +import org.apache.cassandra.tcm.ownership.PlacementForRange;
 +import org.apache.cassandra.tcm.ownership.TokenMap;
 +import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
 +import org.apache.cassandra.tcm.sequences.InProgressSequences;
 +import org.apache.cassandra.tcm.sequences.LockedRanges;
 +import org.apache.cassandra.tcm.serialization.MetadataSerializer;
 +import org.apache.cassandra.tcm.serialization.Version;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.vint.VIntCoding;
 +
 +import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR;
 +import static org.apache.cassandra.db.TypeSizes.sizeof;
 +
 +public class ClusterMetadata
 +{
 +    public static final int EMPTY_METADATA_IDENTIFIER = 0;
 +    public static final Serializer serializer = new Serializer();
 +
 +    public final int metadataIdentifier;
 +
 +    public final Epoch epoch;
 +    public final long period;
 +    public final boolean lastInPeriod;
 +    public final IPartitioner partitioner;       // Set during (initial) construction and not modifiable via Transformer
 +
 +    public final DistributedSchema schema;
 +    public final Directory directory;
 +    public final TokenMap tokenMap;
 +    public final DataPlacements placements;
 +    public final LockedRanges lockedRanges;
 +    public final InProgressSequences inProgressSequences;
 +    public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions;
 +
 +    // These two fields are lazy but only for the test purposes, since their computation requires initialization of the log ks
 +    private Set<Replica> fullCMSReplicas;
 +    private Set<InetAddressAndPort> fullCMSEndpoints;
 +
 +    public ClusterMetadata(IPartitioner partitioner)
 +    {
 +        this(partitioner, Directory.EMPTY);
 +    }
 +
 +    @VisibleForTesting
 +    public ClusterMetadata(IPartitioner partitioner, Directory directory)
 +    {
 +        this(partitioner, directory, DistributedSchema.first());
 +    }
 +
 +    @VisibleForTesting
 +    public ClusterMetadata(IPartitioner partitioner, Directory directory, DistributedSchema schema)
 +    {
 +        this(EMPTY_METADATA_IDENTIFIER,
 +             Epoch.EMPTY,
 +             Period.EMPTY,
 +             true,
 +             partitioner,
 +             schema,
 +             directory,
 +             new TokenMap(partitioner),
 +             DataPlacements.EMPTY,
 +             LockedRanges.EMPTY,
 +             InProgressSequences.EMPTY,
 +             ImmutableMap.of());
 +    }
 +
 +    public ClusterMetadata(Epoch epoch,
 +                           long period,
 +                           boolean lastInPeriod,
 +                           IPartitioner partitioner,
 +                           DistributedSchema schema,
 +                           Directory directory,
 +                           TokenMap tokenMap,
 +                           DataPlacements placements,
 +                           LockedRanges lockedRanges,
 +                           InProgressSequences inProgressSequences,
 +                           Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions)
 +    {
 +        this(EMPTY_METADATA_IDENTIFIER,
 +             epoch,
 +             period,
 +             lastInPeriod,
 +             partitioner,
 +             schema,
 +             directory,
 +             tokenMap,
 +             placements,
 +             lockedRanges,
 +             inProgressSequences,
 +             extensions);
 +    }
 +
 +    private ClusterMetadata(int metadataIdentifier,
 +                           Epoch epoch,
 +                           long period,
 +                           boolean lastInPeriod,
 +                           IPartitioner partitioner,
 +                           DistributedSchema schema,
 +                           Directory directory,
 +                           TokenMap tokenMap,
 +                           DataPlacements placements,
 +                           LockedRanges lockedRanges,
 +                           InProgressSequences inProgressSequences,
 +                           Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions)
 +    {
 +        // TODO: token map is a feature of the specific placement strategy, and so may not be a relevant component of
 +        //  ClusterMetadata in the long term. We need to consider how the actual components of metadata can be evolved
 +        //  over time.
 +        assert tokenMap == null || tokenMap.partitioner().getClass().equals(partitioner.getClass()) : "Partitioner for TokenMap doesn't match base partitioner";
 +        this.metadataIdentifier = metadataIdentifier;
 +        this.epoch = epoch;
 +        this.period = period;
 +        this.lastInPeriod = lastInPeriod;
 +        this.partitioner = partitioner;
 +        this.schema = schema;
 +        this.directory = directory;
 +        this.tokenMap = tokenMap;
 +        this.placements = placements;
 +        this.lockedRanges = lockedRanges;
 +        this.inProgressSequences = inProgressSequences;
 +        this.extensions = ImmutableMap.copyOf(extensions);
 +    }
 +
 +    public Set<InetAddressAndPort> fullCMSMembers()
 +    {
 +        if (fullCMSEndpoints == null)
 +            this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet());
 +        return fullCMSEndpoints;
 +    }
 +
 +    public Set<Replica> fullCMSMembersAsReplicas()
 +    {
 +        if (fullCMSReplicas == null)
 +            this.fullCMSReplicas = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().flattenValues());
 +        return fullCMSReplicas;
 +    }
 +
 +    public boolean isCMSMember(InetAddressAndPort endpoint)
 +    {
 +        return fullCMSMembers().contains(endpoint);
 +    }
 +
 +    public Transformer transformer()
 +    {
 +        return new Transformer(this, this.nextEpoch(), false);
 +    }
 +
 +    public Transformer transformer(boolean sealPeriod)
 +    {
 +        return new Transformer(this, this.nextEpoch(), sealPeriod);
 +    }
 +
 +    public ClusterMetadata forceEpoch(Epoch epoch)
 +    {
 +        // In certain circumstances, the last modified epoch of the individual
 +        // components may have been updated beyond the epoch we're specifying here.
 +        // An example is the execution of an UnsafeJoin transformation, where the
 +        // sub-steps (Start/Mid/Finish) are executed in series, each updating a
 +        // single ClusterMetadata and its individual components. At the end of that
 +        // sequence, the CM epoch is then set forcibly to ensure the UnsafeJoin only
 +        // increments the published epoch by one. As each component has its own last
 +        // modified epoch, we may also need to coerce those, but only if they are
 +        // greater than the epoch we're forcing here.
 +        return new ClusterMetadata(metadataIdentifier,
 +                                   epoch,
 +                                   period,
 +                                   lastInPeriod,
 +                                   partitioner,
 +                                   capLastModified(schema, epoch),
 +                                   capLastModified(directory, epoch),
 +                                   capLastModified(tokenMap, epoch),
 +                                   capLastModified(placements, epoch),
 +                                   capLastModified(lockedRanges, epoch),
 +                                   capLastModified(inProgressSequences, 
epoch),
 +                                   capLastModified(extensions, epoch));
 +    }
 +
 +    public ClusterMetadata initializeClusterIdentifier(int clusterIdentifier)
 +    {
 +        if (this.metadataIdentifier != EMPTY_METADATA_IDENTIFIER)
 +            throw new IllegalStateException(String.format("Can only initialize cluster identifier once, but it was already set to %d", this.metadataIdentifier));
 +
 +        if (clusterIdentifier == EMPTY_METADATA_IDENTIFIER)
 +            throw new IllegalArgumentException("Can not initialize cluster with empty cluster identifier");
 +
 +        return new ClusterMetadata(clusterIdentifier,
 +                                   epoch,
 +                                   period,
 +                                   lastInPeriod,
 +                                   partitioner,
 +                                   schema,
 +                                   directory,
 +                                   tokenMap,
 +                                   placements,
 +                                   lockedRanges,
 +                                   inProgressSequences,
 +                                   extensions);
 +    }
 +
 +    public ClusterMetadata forcePeriod(long period)
 +    {
 +        return new ClusterMetadata(metadataIdentifier,
 +                                   epoch,
 +                                   period,
 +                                   false,
 +                                   partitioner,
 +                                   schema,
 +                                   directory,
 +                                   tokenMap,
 +                                   placements,
 +                                   lockedRanges,
 +                                   inProgressSequences,
 +                                   extensions);
 +    }
 +
 +    private static Map<ExtensionKey<?,?>, ExtensionValue<?>> capLastModified(Map<ExtensionKey<?,?>, ExtensionValue<?>> original, Epoch maxEpoch)
 +    {
 +        Map<ExtensionKey<?, ?>, ExtensionValue<?>> updated = new HashMap<>();
 +        original.forEach((key, value) -> {
 +            ExtensionValue<?> newValue = value == null || value.lastModified().isEqualOrBefore(maxEpoch)
 +                                         ? value
 +                                         : (ExtensionValue<?>)value.withLastModified(maxEpoch);
 +            updated.put(key, newValue);
 +        });
 +        return updated;
 +    }
 +
 +    @SuppressWarnings("unchecked")
 +    private static <V> V capLastModified(MetadataValue<V> value, Epoch maxEpoch)
 +    {
 +        return value == null || value.lastModified().isEqualOrBefore(maxEpoch)
 +               ? (V)value
 +               : value.withLastModified(maxEpoch);
 +    }
 +
 +    public Epoch nextEpoch()
 +    {
 +        return epoch.nextEpoch();
 +    }
 +
 +    public long nextPeriod()
 +    {
 +        return lastInPeriod ? period + 1 : period;
 +    }
 +
 +    public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm)
 +    {
 +        ClusterMetadata metadata = this;
 +        Iterator<MultiStepOperation<?>> iter = metadata.inProgressSequences.iterator();
 +        while (iter.hasNext())
 +        {
 +            Transformation.Result result = iter.next().applyTo(metadata);
 +            assert result.isSuccess();
 +            metadata = result.success().metadata;
 +        }
 +        return metadata.placements.get(ksm.params.replication);
 +    }
 +
 +    // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges
 +    public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token)
 +    {
 +        PlacementForRange writes = placements.get(ksm.params.replication).writes;
 +        PlacementForRange reads = placements.get(ksm.params.replication).reads;
 +        return !reads.forToken(token).equals(writes.forToken(token));
 +    }
 +
 +    // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges
 +    public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endpoint)
 +    {
 +        PlacementForRange writes = placements.get(ksm.params.replication).writes;
 +        PlacementForRange reads = placements.get(ksm.params.replication).reads;
 +        return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint));
 +    }
 +
 +    public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata)
 +    {
-         return placements.get(metadata.params.replication).writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()).ranges();
++        return writeRanges(metadata, FBUtilities.getBroadcastAddressAndPort());
++    }
++
++    public Collection<Range<Token>> writeRanges(KeyspaceMetadata metadata, InetAddressAndPort peer)
++    {
++        return placements.get(metadata.params.replication).writes.byEndpoint().get(peer).ranges();
 +    }
 +
 +    // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges
 +    public Map<Range<Token>, VersionedEndpoints.ForRange> pendingRanges(KeyspaceMetadata metadata)
 +    {
 +        Map<Range<Token>, VersionedEndpoints.ForRange> map = new HashMap<>();
 +        PlacementForRange writes = placements.get(metadata.params.replication).writes;
 +        PlacementForRange reads = placements.get(metadata.params.replication).reads;
 +
 +        // first, pending ranges as the result of range splitting or merging
 +        // i.e. new ranges being created through join/leave
 +        List<Range<Token>> pending = new ArrayList<>(writes.ranges());
 +        pending.removeAll(reads.ranges());
 +        for (Range<Token> p : pending)
 +            map.put(p, placements.get(metadata.params.replication).writes.forRange(p));
 +
 +        // next, ranges where the ranges themselves are not changing, but the replicas are
 +        // i.e. replacement or RF increase
 +        writes.replicaGroups().forEach((range, endpoints) -> {
 +            VersionedEndpoints.ForRange readGroup = reads.forRange(range);
 +            if (!readGroup.equals(endpoints))
 +                map.put(range, VersionedEndpoints.forRange(endpoints.lastModified(),
 +                                                           endpoints.get().filter(r -> !readGroup.get().contains(r))));
 +        });
 +
 +        return map;
 +    }
 +
 +    // TODO Remove this as it isn't really an equivalent to the previous concept of pending endpoints
 +    public VersionedEndpoints.ForToken pendingEndpointsFor(KeyspaceMetadata metadata, Token t)
 +    {
 +        VersionedEndpoints.ForToken writeEndpoints = placements.get(metadata.params.replication).writes.forToken(t);
 +        VersionedEndpoints.ForToken readEndpoints = placements.get(metadata.params.replication).reads.forToken(t);
 +        EndpointsForToken.Builder endpointsForToken = writeEndpoints.get().newBuilder(writeEndpoints.size() - readEndpoints.size());
 +
 +        for (Replica writeReplica : writeEndpoints.get())
 +        {
 +            if (!readEndpoints.get().contains(writeReplica))
 +                endpointsForToken.add(writeReplica);
 +        }
 +        return VersionedEndpoints.forToken(writeEndpoints.lastModified(), endpointsForToken.build());
 +    }
 +
 +    public static class Transformer
 +    {
 +        private final ClusterMetadata base;
 +        private final Epoch epoch;
 +        private final long period;
 +        private final boolean lastInPeriod;
 +        private final IPartitioner partitioner;
 +        private DistributedSchema schema;
 +        private Directory directory;
 +        private TokenMap tokenMap;
 +        private DataPlacements placements;
 +        private LockedRanges lockedRanges;
 +        private InProgressSequences inProgressSequences;
 +        private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions;
 +        private final Set<MetadataKey> modifiedKeys;
 +
 +        private Transformer(ClusterMetadata metadata, Epoch epoch, boolean lastInPeriod)
 +        {
 +            this.base = metadata;
 +            this.epoch = epoch;
 +            this.period = metadata.nextPeriod();
 +            this.lastInPeriod = lastInPeriod;
 +            this.partitioner = metadata.partitioner;
 +            this.schema = metadata.schema;
 +            this.directory = metadata.directory;
 +            this.tokenMap = metadata.tokenMap;
 +            this.placements = metadata.placements;
 +            this.lockedRanges = metadata.lockedRanges;
 +            this.inProgressSequences = metadata.inProgressSequences;
 +            extensions = new HashMap<>(metadata.extensions);
 +            modifiedKeys = new HashSet<>();
 +        }
 +
 +        public Transformer with(DistributedSchema schema)
 +        {
 +            this.schema = schema;
 +            return this;
 +        }
 +
 +        public Transformer with(Directory directory)
 +        {
 +            this.directory = directory;
 +            return this;
 +        }
 +
 +        public Transformer register(NodeAddresses addresses, Location location, NodeVersion version)
 +        {
 +            directory = directory.with(addresses, location, version);
 +            return this;
 +        }
 +
 +        public Transformer unregister(NodeId nodeId)
 +        {
 +            directory = directory.without(nodeId);
 +            return this;
 +        }
 +
 +        public Transformer withNewAddresses(NodeId nodeId, NodeAddresses addresses)
 +        {
 +            directory = directory.withNodeAddresses(nodeId, addresses);
 +            return this;
 +        }
 +
 +        public Transformer withVersion(NodeId nodeId, NodeVersion version)
 +        {
 +            directory = directory.withNodeVersion(nodeId, version);
 +            return this;
 +        }
 +
 +        public Transformer withNodeState(NodeId id, NodeState state)
 +        {
 +            directory = directory.withNodeState(id, state);
 +            return this;
 +        }
 +
 +        public Transformer proposeToken(NodeId nodeId, Collection<Token> tokens)
 +        {
 +            tokenMap = tokenMap.assignTokens(nodeId, tokens);
 +            return this;
 +        }
 +
 +        public Transformer addToRackAndDC(NodeId nodeId)
 +        {
 +            directory = directory.withRackAndDC(nodeId);
 +            return this;
 +        }
 +
 +        public Transformer unproposeTokens(NodeId nodeId)
 +        {
 +            tokenMap = tokenMap.unassignTokens(nodeId);
 +            directory = directory.withoutRackAndDC(nodeId);
 +            return this;
 +        }
 +
 +        public Transformer moveTokens(NodeId nodeId, Collection<Token> tokens)
 +        {
 +            tokenMap = tokenMap.unassignTokens(nodeId)
 +                               .assignTokens(nodeId, tokens);
 +            return this;
 +        }
 +
 +        public Transformer join(NodeId nodeId)
 +        {
 +            directory = directory.withNodeState(nodeId, NodeState.JOINED);
 +            return this;
 +        }
 +
 +        public Transformer replaced(NodeId replaced, NodeId replacement)
 +        {
 +            Collection<Token> transferringTokens = tokenMap.tokens(replaced);
 +            tokenMap = tokenMap.unassignTokens(replaced)
 +                               .assignTokens(replacement, transferringTokens);
 +            directory = directory.without(replaced)
 +                                 .withRackAndDC(replacement)
 +                                 .withNodeState(replacement, NodeState.JOINED);
 +            return this;
 +        }
 +
 +        public Transformer proposeRemoveNode(NodeId id)
 +        {
 +            tokenMap = tokenMap.unassignTokens(id);
 +            return this;
 +        }
 +
 +        public Transformer left(NodeId id)
 +        {
 +            tokenMap = tokenMap.unassignTokens(id);
 +            directory = directory.withNodeState(id, NodeState.LEFT)
 +                                 .withoutRackAndDC(id);
 +            return this;
 +        }
 +
 +        public Transformer with(DataPlacements placements)
 +        {
 +            this.placements = placements;
 +            return this;
 +        }
 +
 +        public Transformer with(LockedRanges lockedRanges)
 +        {
 +            this.lockedRanges = lockedRanges;
 +            return this;
 +        }
 +
 +        public Transformer with(InProgressSequences sequences)
 +        {
 +            this.inProgressSequences = sequences;
 +            return this;
 +        }
 +
 +        public Transformer with(ExtensionKey<?, ?> key, ExtensionValue<?> obj)
 +        {
 +            if (MetadataKeys.CORE_METADATA.contains(key))
 +                throw new IllegalArgumentException("Core cluster metadata 
objects should be addressed directly, " +
 +                                                   "not using the associated 
MetadataKey");
 +
 +            if (!key.valueType.isInstance(obj))
 +                throw new IllegalArgumentException("Value of type " + 
obj.getClass() +
 +                                                   " is incompatible with 
type for key " + key +
 +                                                   " (" + key.valueType + 
")");
 +
 +            extensions.put(key, obj);
 +            modifiedKeys.add(key);
 +            return this;
 +        }
 +
 +        public Transformer withIfAbsent(ExtensionKey<?, ?> key, ExtensionValue<?> obj)
 +        {
 +            if (extensions.containsKey(key))
 +                return this;
 +            return with(key, obj);
 +        }
 +
 +        public Transformer without(ExtensionKey<?, ?> key)
 +        {
 +            if (MetadataKeys.CORE_METADATA.contains(key))
 +                throw new IllegalArgumentException("Core cluster metadata 
objects should be addressed directly, " +
 +                                                   "not using the associated 
MetadataKey");
 +            if (extensions.remove(key) != null)
 +                modifiedKeys.add(key);
 +            return this;
 +        }
 +
 +        public Transformed build()
 +        {
 +            // Process extension first as a) these are actually mutable and b) they are added to the set of
 +            // modified keys when added/updated/removed
 +            for (MetadataKey key : modifiedKeys)
 +            {
 +                ExtensionValue<?> mutable = extensions.get(key);
 +                if (null != mutable)
 +                    mutable.withLastModified(epoch);
 +            }
 +
 +            if (schema != base.schema)
 +            {
 +                modifiedKeys.add(MetadataKeys.SCHEMA);
 +                schema = schema.withLastModified(epoch);
 +            }
 +
 +            if (directory != base.directory)
 +            {
 +                modifiedKeys.add(MetadataKeys.NODE_DIRECTORY);
 +                directory = directory.withLastModified(epoch);
 +            }
 +
 +            if (tokenMap != base.tokenMap)
 +            {
 +                modifiedKeys.add(MetadataKeys.TOKEN_MAP);
 +                tokenMap = tokenMap.withLastModified(epoch);
 +            }
 +
 +            if (placements != base.placements)
 +            {
 +                modifiedKeys.add(MetadataKeys.DATA_PLACEMENTS);
 +                // sort all endpoint lists to preserve primary replica
 +                if (CassandraRelevantProperties.TCM_SORT_REPLICA_GROUPS.getBoolean())
 +                {
 +                    PrimaryRangeComparator comparator = new PrimaryRangeComparator(tokenMap, directory);
 +                    placements = DataPlacements.sortReplicaGroups(placements, comparator);
 +                }
 +                placements = placements.withLastModified(epoch);
 +            }
 +
 +            if (lockedRanges != base.lockedRanges)
 +            {
 +                modifiedKeys.add(MetadataKeys.LOCKED_RANGES);
 +                lockedRanges = lockedRanges.withLastModified(epoch);
 +            }
 +
 +            if (inProgressSequences != base.inProgressSequences)
 +            {
 +                modifiedKeys.add(MetadataKeys.IN_PROGRESS_SEQUENCES);
 +                inProgressSequences = inProgressSequences.withLastModified(epoch);
 +            }
 +
 +            return new Transformed(new ClusterMetadata(base.metadataIdentifier,
 +                                                       epoch,
 +                                                       period,
 +                                                       lastInPeriod,
 +                                                       partitioner,
 +                                                       schema,
 +                                                       directory,
 +                                                       tokenMap,
 +                                                       placements,
 +                                                       lockedRanges,
 +                                                       inProgressSequences,
 +                                                       extensions),
 +                                   ImmutableSet.copyOf(modifiedKeys));
 +        }
 +
 +        public ClusterMetadata buildForGossipMode()
 +        {
 +            return new ClusterMetadata(base.metadataIdentifier,
 +                                       Epoch.UPGRADE_GOSSIP,
 +                                       Period.EMPTY,
 +                                       true,
 +                                       partitioner,
 +                                       schema,
 +                                       directory,
 +                                       tokenMap,
 +                                       placements,
 +                                       lockedRanges,
 +                                       inProgressSequences,
 +                                       extensions);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "Transformer{" +
 +                   "baseEpoch=" + base.epoch +
 +                   ", epoch=" + epoch +
 +                   ", lastInPeriod=" + lastInPeriod +
 +                   ", partitioner=" + partitioner +
 +                   ", schema=" + schema +
 +                   ", directory=" + schema +
 +                   ", tokenMap=" + tokenMap +
 +                   ", placement=" + placements +
 +                   ", lockedRanges=" + lockedRanges +
 +                   ", inProgressSequences=" + inProgressSequences +
 +                   ", extensions=" + extensions +
 +                   ", modifiedKeys=" + modifiedKeys +
 +                   '}';
 +        }
 +
 +        public static class Transformed
 +        {
 +            public final ClusterMetadata metadata;
 +            public final ImmutableSet<MetadataKey> modifiedKeys;
 +
 +            public Transformed(ClusterMetadata metadata, ImmutableSet<MetadataKey> modifiedKeys)
 +            {
 +                this.metadata = metadata;
 +                this.modifiedKeys = modifiedKeys;
 +            }
 +        }
 +    }
 +
 +    public String legacyToString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        Set<Pair<Token, InetAddressAndPort>> normal = new HashSet<>();
 +        Set<Pair<Token, InetAddressAndPort>> bootstrapping = new HashSet<>();
 +        Set<InetAddressAndPort> leaving = new HashSet<>();
 +
 +        for (Map.Entry<NodeId, NodeState> entry : directory.states.entrySet())
 +        {
 +            InetAddressAndPort endpoint = directory.endpoint(entry.getKey());
 +            switch (entry.getValue())
 +            {
 +                case BOOTSTRAPPING:
 +                    for (Token t : tokenMap.tokens(entry.getKey()))
 +                        bootstrapping.add(Pair.create(t, endpoint));
 +                    break;
 +                case LEAVING:
 +                    leaving.add(endpoint);
 +                    break;
 +                case JOINED:
 +                    for (Token t : tokenMap.tokens(entry.getKey()))
 +                        normal.add(Pair.create(t, endpoint));
 +                    break;
 +                case MOVING:
 +                    // todo when adding MOVE
 +                    break;
 +            }
 +        }
 +
 +        if (!normal.isEmpty())
 +        {
 +            sb.append("Normal Tokens:");
 +            sb.append(LINE_SEPARATOR.getString());
 +            for (Pair<Token, InetAddressAndPort> ep : normal)
 +            {
 +                sb.append(ep.right);
 +                sb.append(':');
 +                sb.append(ep.left);
 +                sb.append(LINE_SEPARATOR.getString());
 +            }
 +        }
 +
 +        if (!bootstrapping.isEmpty())
 +        {
 +            sb.append("Bootstrapping Tokens:" );
 +            sb.append(LINE_SEPARATOR.getString());
 +            for (Pair<Token, InetAddressAndPort> entry : bootstrapping)
 +            {
 +                sb.append(entry.right).append(':').append(entry.left);
 +                sb.append(LINE_SEPARATOR.getString());
 +            }
 +        }
 +
 +        if (!leaving.isEmpty())
 +        {
 +            sb.append("Leaving Endpoints:");
 +            sb.append(LINE_SEPARATOR.getString());
 +            for (InetAddressAndPort ep : leaving)
 +            {
 +                sb.append(ep);
 +                sb.append(LINE_SEPARATOR.getString());
 +            }
 +        }
 +        return sb.toString();
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return "ClusterMetadata{" +
 +               "epoch=" + epoch +
 +               ", schema=" + schema +
 +               ", directory=" + directory +
 +               ", tokenMap=" + tokenMap +
 +               ", placements=" + placements +
 +               ", lockedRanges=" + lockedRanges +
 +               '}';
 +    }
 +
 +    @Override
 +    public boolean equals(Object o)
 +    {
 +        if (this == o) return true;
 +        if (!(o instanceof ClusterMetadata)) return false;
 +        ClusterMetadata that = (ClusterMetadata) o;
 +        return epoch.equals(that.epoch) &&
 +               lastInPeriod == that.lastInPeriod &&
 +               schema.equals(that.schema) &&
 +               directory.equals(that.directory) &&
 +               tokenMap.equals(that.tokenMap) &&
 +               placements.equals(that.placements) &&
 +               lockedRanges.equals(that.lockedRanges) &&
 +               inProgressSequences.equals(that.inProgressSequences) &&
 +               extensions.equals(that.extensions);
 +    }
 +
 +    private static final Logger logger = LoggerFactory.getLogger(ClusterMetadata.class);
 +
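 +    /**
 +     * Logs, component by component, how this metadata differs from {@code other}; useful when
 +     * equals() fails and the single-line toString() is too coarse to show why.
 +     */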
 +    public void dumpDiff(ClusterMetadata other)
 +    {
 +        if (!epoch.equals(other.epoch))
 +        {
 +            logger.warn("Epoch {} != {}", epoch, other.epoch);
 +        }
 +        if (lastInPeriod != other.lastInPeriod)
 +        {
 +            logger.warn("lastInPeriod {} != {}", lastInPeriod, other.lastInPeriod);
 +        }
 +        if (!schema.equals(other.schema))
 +        {
 +            Keyspaces.KeyspacesDiff diff = Keyspaces.diff(schema.getKeyspaces(), other.schema.getKeyspaces());
 +            logger.warn("Schemas differ {}", diff);
 +        }
 +        if (!directory.equals(other.directory))
 +        {
 +            logger.warn("Directories differ:");
 +            directory.dumpDiff(other.directory);
 +        }
 +        if (!tokenMap.equals(other.tokenMap))
 +        {
 +            logger.warn("Token maps differ:");
 +            tokenMap.dumpDiff(other.tokenMap);
 +        }
 +        if (!placements.equals(other.placements))
 +        {
 +            logger.warn("Placements differ:");
 +            placements.dumpDiff(other.placements);
 +        }
 +        if (!lockedRanges.equals(other.lockedRanges))
 +        {
 +            logger.warn("Locked ranges differ: {} != {}", lockedRanges, other.lockedRanges);
 +        }
 +        if (!inProgressSequences.equals(other.inProgressSequences))
 +        {
 +            logger.warn("In progress sequences differ: {} != {}", inProgressSequences, other.inProgressSequences);
 +        }
 +        if (!extensions.equals(other.extensions))
 +        {
 +            logger.warn("Extensions differ: {} != {}", extensions, other.extensions);
 +        }
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        return Objects.hash(epoch, lastInPeriod, schema, directory, tokenMap, placements, lockedRanges, inProgressSequences, extensions);
 +    }
 +
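 +    /** Convenience accessor; assumes the ClusterMetadataService singleton is initialized (see currentNullable()). */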
 +    public static ClusterMetadata current()
 +    {
 +        return ClusterMetadataService.instance().metadata();
 +    }
 +
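 +    /**
 +     * Validates that a peer belongs to the same cluster by comparing metadata identifiers, throwing
 +     * CMSIdentifierMismatchException on mismatch. Either side not yet fully joined to the CMS
 +     * (EMPTY_METADATA_IDENTIFIER) is given a pass.
 +     */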
 +    public static void checkIdentifier(int remoteIdentifier)
 +    {
 +        ClusterMetadata metadata = currentNullable();
 +        if (metadata != null)
 +        {
 +            int currentIdentifier = metadata.metadataIdentifier;
 +            // We haven't yet joined CMS fully
 +            if (currentIdentifier == EMPTY_METADATA_IDENTIFIER)
 +                return;
 +
 +            // Peer hasn't yet joined CMS fully
 +            if (remoteIdentifier == EMPTY_METADATA_IDENTIFIER)
 +                return;
 +
 +            if (currentIdentifier != remoteIdentifier)
 +                throw new CMSIdentifierMismatchException(String.format("Cluster Metadata Identifier mismatch. Node is attempting to communicate with a node from a different cluster. Current identifier %d. Remote identifier: %d", currentIdentifier, remoteIdentifier));
 +        }
 +    }
 +
 +    /**
 +     * Startup of some services may race with cluster metadata initialization. We allow those services to
 +     * gracefully handle scenarios where it is not yet initialized by returning null instead of throwing.
 +     */
 +    public static ClusterMetadata currentNullable()
 +    {
 +        ClusterMetadataService service = ClusterMetadataService.instance();
 +        if (service == null)
 +            return null;
 +        return service.metadata();
 +    }
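 +
 +    // Illustrative usage (not part of this patch): services racing with startup should prefer
 +    // currentNullable() over current() and tolerate null, e.g.
 +    //     ClusterMetadata metadata = ClusterMetadata.currentNullable();
 +    //     if (metadata == null)
 +    //         return; // cluster metadata not yet initialized; try again later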
 +
 +    public NodeId myNodeId()
 +    {
 +        return directory.peerId(FBUtilities.getBroadcastAddressAndPort());
 +    }
 +
 +    public NodeState myNodeState()
 +    {
 +        NodeId nodeId = myNodeId();
 +        if (nodeId != null)
 +            return directory.peerState(nodeId);
 +        return null;
 +    }
 +
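 +    /**
 +     * True while nodes in the cluster are on different metadata serialization versions, i.e. a
 +     * serialization format upgrade has started but not yet completed everywhere.
 +     */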
 +    public boolean metadataSerializationUpgradeInProgress()
 +    {
 +        return !directory.clusterMaxVersion.serializationVersion().equals(directory.clusterMinVersion.serializationVersion());
 +    }
 +
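 +    // Wire layout (see serialize/deserialize below): partitioner class name (V1+; pre-V1 writes it
 +    // after lastInPeriod), metadata identifier (V2+), epoch, period, lastInPeriod, schema, directory,
 +    // token map, placements, locked ranges, in-progress sequences, then the extensions map.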
 +    public static class Serializer implements MetadataSerializer<ClusterMetadata>
 +    {
 +        @Override
 +        public void serialize(ClusterMetadata metadata, DataOutputPlus out, Version version) throws IOException
 +        {
 +            if (version.isAtLeast(Version.V1))
 +                out.writeUTF(metadata.partitioner.getClass().getCanonicalName());
 +
 +            if (version.isAtLeast(Version.V2))
 +                out.writeUnsignedVInt32(metadata.metadataIdentifier);
 +
 +            Epoch.serializer.serialize(metadata.epoch, out);
 +            out.writeUnsignedVInt(metadata.period);
 +            out.writeBoolean(metadata.lastInPeriod);
 +
 +            if (version.isBefore(Version.V1))
 +                out.writeUTF(metadata.partitioner.getClass().getCanonicalName());
 +
 +            DistributedSchema.serializer.serialize(metadata.schema, out, version);
 +            Directory.serializer.serialize(metadata.directory, out, version);
 +            TokenMap.serializer.serialize(metadata.tokenMap, out, version);
 +            DataPlacements.serializer.serialize(metadata.placements, out, version);
 +            LockedRanges.serializer.serialize(metadata.lockedRanges, out, version);
 +            InProgressSequences.serializer.serialize(metadata.inProgressSequences, out, version);
 +            out.writeInt(metadata.extensions.size());
 +            for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : metadata.extensions.entrySet())
 +            {
 +                ExtensionKey<?, ?> key = entry.getKey();
 +                ExtensionValue<?> value = entry.getValue();
 +                ExtensionKey.serializer.serialize(key, out, version);
 +                assert key.valueType.isInstance(value);
 +                value.serialize(out, version);
 +            }
 +        }
 +
 +        @Override
 +        public ClusterMetadata deserialize(DataInputPlus in, Version version) throws IOException
 +        {
 +            IPartitioner partitioner = null;
 +            if (version.isAtLeast(Version.V1))
 +                partitioner = FBUtilities.newPartitioner(in.readUTF());
 +
 +            int clusterIdentifier = EMPTY_METADATA_IDENTIFIER;
 +            if (version.isAtLeast(Version.V2))
 +            {
 +                clusterIdentifier = in.readUnsignedVInt32();
 +                checkIdentifier(clusterIdentifier);
 +            }
 +
 +            Epoch epoch = Epoch.serializer.deserialize(in);
 +            long period = in.readUnsignedVInt();
 +            boolean lastInPeriod = in.readBoolean();
 +
 +            if (version.isBefore(Version.V1))
 +                partitioner = FBUtilities.newPartitioner(in.readUTF());
 +
 +            DistributedSchema schema = DistributedSchema.serializer.deserialize(in, version);
 +            Directory dir = Directory.serializer.deserialize(in, version);
 +            TokenMap tokenMap = TokenMap.serializer.deserialize(in, version);
 +            DataPlacements placements = DataPlacements.serializer.deserialize(in, version);
 +            LockedRanges lockedRanges = LockedRanges.serializer.deserialize(in, version);
 +            InProgressSequences ips = InProgressSequences.serializer.deserialize(in, version);
 +            int items = in.readInt();
 +            Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new HashMap<>(items);
 +            for (int i = 0; i < items; i++)
 +            {
 +                ExtensionKey<?, ?> key = ExtensionKey.serializer.deserialize(in, version);
 +                ExtensionValue<?> value = key.newValue();
 +                value.deserialize(in, version);
 +                extensions.put(key, value);
 +            }
 +            return new ClusterMetadata(clusterIdentifier,
 +                                       epoch,
 +                                       period,
 +                                       lastInPeriod,
 +                                       partitioner,
 +                                       schema,
 +                                       dir,
 +                                       tokenMap,
 +                                       placements,
 +                                       lockedRanges,
 +                                       ips,
 +                                       extensions);
 +        }
 +
 +        @Override
 +        public long serializedSize(ClusterMetadata metadata, Version version)
 +        {
 +            long size = TypeSizes.INT_SIZE;
 +            for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : metadata.extensions.entrySet())
 +                size += ExtensionKey.serializer.serializedSize(entry.getKey(), version) +
 +                        entry.getValue().serializedSize(version);
 +
 +            if (version.isAtLeast(Version.V2))
 +                size += TypeSizes.sizeofUnsignedVInt(metadata.metadataIdentifier);
 +
 +            size += Epoch.serializer.serializedSize(metadata.epoch) +
 +                    VIntCoding.computeUnsignedVIntSize(metadata.period) +
 +                    TypeSizes.BOOL_SIZE +
 +                    sizeof(metadata.partitioner.getClass().getCanonicalName()) +
 +                    DistributedSchema.serializer.serializedSize(metadata.schema, version) +
 +                    Directory.serializer.serializedSize(metadata.directory, version) +
 +                    TokenMap.serializer.serializedSize(metadata.tokenMap, version) +
 +                    DataPlacements.serializer.serializedSize(metadata.placements, version) +
 +                    LockedRanges.serializer.serializedSize(metadata.lockedRanges, version) +
 +                    InProgressSequences.serializer.serializedSize(metadata.inProgressSequences, version);
 +
 +            return size;
 +        }
 +
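 +        /**
 +         * Peeks only the partitioner from a serialized ClusterMetadata. For pre-V1 streams the
 +         * partitioner is written after epoch/period/lastInPeriod, so those fields are read and
 +         * discarded first.
 +         */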
 +        public static IPartitioner getPartitioner(DataInputPlus in, Version version) throws IOException
 +        {
 +            if (version.isAtLeast(Version.V1))
 +                return FBUtilities.newPartitioner(in.readUTF());
 +
 +            Epoch.serializer.deserialize(in);
 +            in.readUnsignedVInt();
 +            in.readBoolean();
 +            return FBUtilities.newPartitioner(in.readUTF());
 +        }
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
index 6ec42d40b5,86b8a5199d..8e3db2039f
--- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
@@@ -57,8 -56,13 +57,9 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.IInvokableInstance;
  import org.apache.cassandra.distributed.api.IMessageFilters;
  import org.apache.cassandra.gms.FailureDetector;
 -import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.gms.ApplicationState;
 -import org.apache.cassandra.gms.EndpointState;
 -import org.apache.cassandra.gms.VersionedValue;
  import org.apache.cassandra.repair.RepairParallelism;
+ import org.apache.cassandra.repair.SharedContext;
  import org.apache.cassandra.repair.messages.RepairOption;
  import org.apache.cassandra.schema.Schema;
  import org.apache.cassandra.schema.TableMetadata;
diff --cc test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index 0c26b3f279,81b341bed9..ff205554f8
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@@ -56,9 -57,7 +56,8 @@@ import accord.utils.DefaultRandom
  import accord.utils.Gen;
  import accord.utils.Gens;
  import accord.utils.RandomSource;
- import org.agrona.collections.Long2ObjectHashMap;
  import org.agrona.collections.LongHashSet;
 +import org.apache.cassandra.ServerTestUtils;
  import org.apache.cassandra.concurrent.ExecutorBuilder;
  import org.apache.cassandra.concurrent.ExecutorBuilderFactory;
  import org.apache.cassandra.concurrent.ExecutorFactory;
@@@ -119,7 -118,15 +118,14 @@@ import org.apache.cassandra.schema.Tabl
  import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.schema.Tables;
  import org.apache.cassandra.service.ActiveRepairService;
 -import org.apache.cassandra.service.PendingRangeCalculatorService;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupComplete;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupHistory;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosFinishPrepareCleanup;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosStartPrepareCleanup;
  import org.apache.cassandra.streaming.StreamEventHandler;
  import org.apache.cassandra.streaming.StreamReceiveException;
  import org.apache.cassandra.streaming.StreamSession;
@@@ -1195,6 -1312,18 +1308,12 @@@ public abstract class FuzzTestBase exte
              {
                  return streamExecutor;
              }
+ 
 -            @Override
 -            public PendingRangeCalculatorService pendingRangeCalculator()
 -            {
 -                return PendingRangeCalculatorService.instance;
 -            }
 -
+             @Override
+             public PaxosRepairState paxosRepairState()
+             {
+                 return paxosRepairState;
+             }
          }
  
          private Message serde(Message msg)
@@@ -1282,15 -1411,13 +1401,20 @@@
                          next = it.next();
                      }
                      if (FuzzTestBase.class.getName().equals(next.getClassName())) return Access.MAIN_THREAD_ONLY;
-                     if (next.getClassName().startsWith("org.apache.cassandra.db.") ||
-                         next.getClassName().startsWith("org.apache.cassandra.gms.") ||
-                         next.getClassName().startsWith("org.apache.cassandra.cql3.") ||
-                         next.getClassName().startsWith("org.apache.cassandra.metrics.") ||
-                         next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.") ||
-                         next.getClassName().startsWith("org.apache.cassandra.tcm") ||
-                         next.getClassName().startsWith("org.apache.cassandra.utils.TimeUUID") ||
-                         next.getClassName().startsWith("org.apache.cassandra.schema") ||
-                         next.getClassName().startsWith(PendingAntiCompaction.class.getName()))
++
+                     // this is non-deterministic... but since the scope of the work is testing repair and not paxos... this is unblocked for now...
+                     if (("org.apache.cassandra.service.paxos.Paxos".equals(next.getClassName()) && "newBallot".equals(next.getMethodName()))
+                         || ("org.apache.cassandra.service.paxos.uncommitted.PaxosBallotTracker".equals(next.getClassName()) && "updateLowBound".equals(next.getMethodName())))
+                         return Access.MAIN_THREAD_ONLY;
 -                    if (next.getClassName().startsWith("org.apache.cassandra.db.") || next.getClassName().startsWith("org.apache.cassandra.gms.") || next.getClassName().startsWith("org.apache.cassandra.cql3.") || next.getClassName().startsWith("org.apache.cassandra.metrics.") || next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.")
++                    if (next.getClassName().startsWith("org.apache.cassandra.db.")
++                        || next.getClassName().startsWith("org.apache.cassandra.gms.")
++                        || next.getClassName().startsWith("org.apache.cassandra.cql3.")
++                        || next.getClassName().startsWith("org.apache.cassandra.metrics.")
++                        || next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.")
++                        || next.getClassName().startsWith("org.apache.cassandra.tcm")
+                         || next.getClassName().startsWith("org.apache.cassandra.utils.TimeUUID") // this would be good to solve
++                        || next.getClassName().startsWith("org.apache.cassandra.schema")
+                         || next.getClassName().startsWith(PendingAntiCompaction.class.getName()))
                          return Access.IGNORE;
                      if (next.getClassName().startsWith("org.apache.cassandra.repair") || ActiveRepairService.class.getName().startsWith(next.getClassName()))
                          return Access.REJECT;
diff --cc test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 20bd0c14a5,872ee99abb..36c17855e3
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@@ -40,7 -40,7 +40,8 @@@ import com.google.common.util.concurren
  
  import org.apache.cassandra.repair.messages.SyncResponse;
  import org.apache.cassandra.repair.messages.ValidationResponse;
 +import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
  import org.assertj.core.api.Assertions;
  import org.junit.After;
  import org.junit.Before;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
