This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new d8174c7105 CEP-15 (C*) Integrate accord with repair
d8174c7105 is described below

commit d8174c71054ade99b0d5209c3ca81f7300c5899e
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Thu Feb 15 10:52:55 2024 -0800

    CEP-15 (C*) Integrate accord with repair
    
    Patch by Blake Eggleston; Reviewed by Ariel Weisberg and David Capwell for 
CASSANDRA-19472
---
 .gitmodules                                        |   4 +-
 modules/accord                                     |   2 +-
 .../config/CassandraRelevantProperties.java        |   1 +
 .../db/streaming/CassandraStreamReceiver.java      |   2 +-
 .../org/apache/cassandra/metrics/TableMetrics.java |  12 +-
 .../apache/cassandra/repair/AbstractRepairJob.java |  66 ----
 .../cassandra/repair/AbstractRepairTask.java       |   3 +-
 .../apache/cassandra/repair/RepairCoordinator.java |  18 +
 .../{CassandraRepairJob.java => RepairJob.java}    | 103 ++++-
 .../org/apache/cassandra/repair/RepairSession.java |  50 +--
 .../cassandra/repair/messages/RepairOption.java    |  51 ++-
 .../cassandra/repair/state/CoordinatorState.java   |   4 -
 .../org/apache/cassandra/schema/TableMetadata.java |   7 +
 .../cassandra/service/ActiveRepairService.java     |   5 +-
 .../cassandra/service/accord/AccordService.java    |  69 +++-
 .../cassandra/service/accord/IAccordService.java   |   8 +
 .../accord/repair/AccordRepair.java}               | 104 ++++--
 .../accord/repair/RepairSyncPointAdapter.java      |  79 ++++
 .../accord/repair/RequiredResponseTracker.java     |  79 ++++
 .../accord/serializers/ReadDataSerializers.java    |   3 +
 .../migration/ConsensusMigrationRepairResult.java  |  25 +-
 .../migration/ConsensusMigrationRepairType.java    |   5 +-
 .../migration/ConsensusMigrationTarget.java        |  11 +
 .../migration/ConsensusTableMigration.java         |   7 +-
 ...beFinishConsensusMigrationForTableAndRange.java |   8 +-
 .../apache/cassandra/tools/nodetool/Repair.java    |   4 +
 .../test/OptimiseStreamsRepairTest.java            |   4 +-
 .../test/accord/AccordIncrementalRepairTest.java   | 415 +++++++++++++++++++++
 .../test/accord/AccordMigrationTest.java           |   2 +-
 .../simulator/cluster/OnInstanceRepair.java        |   2 +-
 .../cassandra/repair/FailingRepairFuzzTest.java    |   7 +-
 .../org/apache/cassandra/repair/FuzzTestBase.java  |  18 +-
 .../org/apache/cassandra/repair/RepairJobTest.java |  58 +--
 .../apache/cassandra/repair/RepairSessionTest.java |   2 +-
 .../accord/AccordFastPathCoordinatorTest.java      |   1 -
 .../cassandra/service/accord/AccordTestUtils.java  |  19 +
 .../service/accord/AccordTopologyTest.java         |  95 +----
 .../service/accord/AccordTopologyUtils.java        | 131 +++++++
 .../accord/repair/RequiredResponseTrackerTest.java |  97 +++++
 39 files changed, 1212 insertions(+), 369 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 616dacf610..da620237a6 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = https://github.com/apache/cassandra-accord.git
-       branch = trunk
+       url = ../cassandra-accord.git
+       branch = accord-IR
diff --git a/modules/accord b/modules/accord
index 8b4f3895cb..3aaec7566e 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 8b4f3895cb926f937450676b1db2e23d01a8b820
+Subproject commit 3aaec7566e389a0037b93b748867886fb68a0fd0
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 85847d2158..a78e01c83e 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.utils.StorageCompatibilityMode;
 /** A class that extracts system properties for the cassandra node it runs 
within. */
 public enum CassandraRelevantProperties
 {
+    ACCORD_AGENT_CLASS("cassandra.test.accord.agent"),
     
ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL("cassandra.accord.repair.range_step_update_interval",
 "100"),
     ACQUIRE_RETRY_SECONDS("cassandra.acquire_retry_seconds", "60"),
     ACQUIRE_SLEEP_MS("cassandra.acquire_sleep_ms", "1000"),
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
index e75b6be269..409a25c7bb 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -246,7 +246,7 @@ public class CassandraStreamReceiver implements 
StreamReceiver
         checkNotNull(minVersion, "Unable to determine minimum cluster 
version");
         IAccordService accordService = AccordService.instance();
         if (session.streamOperation().requiresBarrierTransaction()
-            && cfs.metadata().isAccordEnabled()
+            && cfs.metadata().requiresAccordSupport()
             && CassandraVersion.CASSANDRA_5_0.compareTo(minVersion) >= 0)
             accordService.postStreamReceivingBarrier(cfs, ranges);
 
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java 
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 93f55008d8..8d1b3e52ff 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -186,9 +186,9 @@ public class TableMetrics
     /** Latency for locally run key migrations **/
     public final LatencyMetrics keyMigration;
     /** Latency for range migrations run by locally coordinated Accord repairs 
**/
-    public final LatencyMetrics rangeMigration;
-    public final TableMeter rangeMigrationUnexpectedFailures;
-    public final TableMeter rangeMigrationDependencyLimitFailures;
+    public final LatencyMetrics accordRepair;
+    public final TableMeter accordRepairUnexpectedFailures;
+    public final TableMeter accordRepairDependencyLimitFailures;
     /** percent of the data that is repaired */
     public final Gauge<Double> percentRepaired;
     /** Reports the size of sstables in repaired, unrepaired, and any ongoing 
repair buckets */
@@ -813,9 +813,9 @@ public class TableMetrics
         casPropose = createLatencyMetrics("CasPropose", 
cfs.keyspace.metric.casPropose);
         casCommit = createLatencyMetrics("CasCommit", 
cfs.keyspace.metric.casCommit);
         keyMigration = createLatencyMetrics("KeyMigration", 
cfs.keyspace.metric.keyMigration, GLOBAL_KEY_MIGRATION_LATENCY);
-        rangeMigration = createLatencyMetrics("RangeMigration", 
cfs.keyspace.metric.rangeMigration, GLOBAL_RANGE_MIGRATION_LATENCY);
-        rangeMigrationUnexpectedFailures = 
createTableMeter("RangeMigrationUnexpectedFailures", 
cfs.keyspace.metric.rangeMigrationUnexpectedFailures);
-        rangeMigrationDependencyLimitFailures = 
createTableMeter("RangeMigrationDependencyLimitFaiures", 
cfs.keyspace.metric.rangeMigrationDependencyLimitFailures);
+        accordRepair = createLatencyMetrics("AccordRepair", 
cfs.keyspace.metric.rangeMigration, GLOBAL_RANGE_MIGRATION_LATENCY);
+        accordRepairUnexpectedFailures = 
createTableMeter("AccordRepairUnexpectedFailures", 
cfs.keyspace.metric.rangeMigrationUnexpectedFailures);
+        accordRepairDependencyLimitFailures = 
createTableMeter("AccordRepairDependencyLimitFaiures", 
cfs.keyspace.metric.rangeMigrationDependencyLimitFailures);
 
         repairsStarted = createTableCounter("RepairJobsStarted");
         repairsCompleted = createTableCounter("RepairJobsCompleted");
diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairJob.java 
b/src/java/org/apache/cassandra/repair/AbstractRepairJob.java
deleted file mode 100644
index df3a67dbc9..0000000000
--- a/src/java/org/apache/cassandra/repair/AbstractRepairJob.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.concurrent.Executor;
-import javax.annotation.Nullable;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.repair.state.JobState;
-import org.apache.cassandra.utils.concurrent.AsyncFuture;
-
-public abstract class AbstractRepairJob extends AsyncFuture<RepairResult> 
implements Runnable
-{
-    protected final SharedContext ctx;
-    public final JobState state;
-    protected final RepairJobDesc desc;
-    protected final RepairSession session;
-    protected final Executor taskExecutor;
-
-    protected final Keyspace ks;
-    protected final ColumnFamilyStore cfs;
-
-    /**
-     * Create repair job to run on specific columnfamily
-     * @param session RepairSession that this RepairJob belongs
-     * @param columnFamily name of the ColumnFamily to repair
-     */
-    public AbstractRepairJob(RepairSession session, String columnFamily)
-    {
-        this.ctx = session.ctx;
-        this.session = session;
-        this.taskExecutor = session.taskExecutor;
-        this.desc = new RepairJobDesc(session.state.parentRepairSession, 
session.getId(), session.state.keyspace, columnFamily, 
session.state.commonRange.ranges);
-        this.state = new JobState(ctx.clock(), desc, 
session.state.commonRange.endpoints);
-        this.ks = Keyspace.open(desc.keyspace);
-        this.cfs = ks.getColumnFamilyStore(columnFamily);
-    }
-
-    public void run()
-    {
-        state.phase.start();
-        cfs.metric.repairsStarted.inc();
-        runRepair();
-    }
-
-    abstract protected void runRepair();
-
-    abstract void abort(@Nullable Throwable reason);
-}
diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java 
b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
index 67ac2cd287..9bf3fe2f84 100644
--- a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
@@ -79,7 +79,8 @@ public abstract class AbstractRepairTask implements RepairTask
                                                                                
  options.optimiseStreams(),
                                                                                
  options.repairPaxos(),
                                                                                
  options.paxosOnly(),
-                                                                               
  options.accordRepair(),
+                                                                               
  options.accordOnly(),
+                                                                               
  options.isConsensusMigration(),
                                                                                
  executor,
                                                                                
  validationScheduler,
                                                                                
  cfnames);
diff --git a/src/java/org/apache/cassandra/repair/RepairCoordinator.java 
b/src/java/org/apache/cassandra/repair/RepairCoordinator.java
index dadbfcf6f2..f49530b5ba 100644
--- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java
+++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java
@@ -66,6 +66,7 @@ import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.repair.state.CoordinatorState;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 import org.apache.cassandra.service.ClientState;
@@ -282,12 +283,29 @@ public class RepairCoordinator implements Runnable, 
ProgressEventNotifier, Repai
         }
     }
 
+    private void validate(RepairOption options)
+    {
+        if (options.paxosOnly() && options.accordOnly())
+            throw new IllegalArgumentException("Cannot specify a repair as 
both paxos only and accord only");
+
+        for (ColumnFamilyStore cfs : columnFamilies)
+        {
+            TableMetadata metadata = cfs.metadata();
+            if (options.paxosOnly() && !metadata.supportsPaxosOperations())
+                throw new IllegalArgumentException(String.format("Cannot run 
paxos only repair on %s.%s, which isn't configured for paxos operations", 
cfs.keyspace.getName(), cfs.name));
+
+            if (options.accordOnly() && !metadata.requiresAccordSupport())
+                throw new IllegalArgumentException(String.format("Cannot run 
accord only repair on %s.%s, which isn't configured for accord operations", 
cfs.keyspace.getName(), cfs.name));
+        }
+    }
+
     private void runMayThrow() throws Throwable
     {
         state.phase.setup();
         ctx.repair().recordRepairStatus(state.cmd, 
ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
 
         populateColumnFamilies();
+        validate(state.options);
 
         this.traceState = maybeCreateTraceState(columnFamilies);
         notifyStarting();
diff --git a/src/java/org/apache/cassandra/repair/CassandraRepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
similarity index 85%
rename from src/java/org/apache/cassandra/repair/CassandraRepairJob.java
rename to src/java/org/apache/cassandra/repair/RepairJob.java
index 80daeff5d5..4ad1f803fb 100644
--- a/src/java/org/apache/cassandra/repair/CassandraRepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -40,6 +40,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -47,9 +50,10 @@ import 
org.apache.cassandra.repair.asymmetric.DifferenceHolder;
 import org.apache.cassandra.repair.asymmetric.HostDifferences;
 import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
 import org.apache.cassandra.repair.asymmetric.ReduceHelper;
-import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.repair.state.JobState;
 import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.repair.AccordRepair;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult;
 import org.apache.cassandra.service.paxos.cleanup.PaxosCleanup;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -59,6 +63,7 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -70,11 +75,14 @@ import static 
org.apache.cassandra.service.paxos.Paxos.useV2;
 /**
  * RepairJob runs repair on given ColumnFamily.
  */
-public class CassandraRepairJob extends AbstractRepairJob
+public class RepairJob extends AsyncFuture<RepairResult> implements Runnable
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(CassandraRepairJob.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(RepairJob.class);
 
+    protected final Keyspace ks;
+    protected final ColumnFamilyStore cfs;
     private final SharedContext ctx;
+    public final JobState state;
     private final RepairJobDesc desc;
     private final RepairSession session;
     private final RepairParallelism parallelismDegree;
@@ -91,14 +99,24 @@ public class CassandraRepairJob extends AbstractRepairJob
      *  @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public CassandraRepairJob(RepairSession session, String columnFamily)
+    public RepairJob(RepairSession session, String columnFamily)
     {
-        super(session, columnFamily);
         this.ctx = session.ctx;
         this.session = session;
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
         this.desc = new RepairJobDesc(session.state.parentRepairSession, 
session.getId(), session.state.keyspace, columnFamily, 
session.state.commonRange.ranges);
+        this.ks = Keyspace.open(desc.keyspace);
+        this.cfs = ks.getColumnFamilyStore(columnFamily);
+        this.state = new JobState(ctx.clock(), desc, 
session.state.commonRange.endpoints);
+
+        TableMetadata metadata = this.cfs.metadata();
+        if (session.paxosOnly && !metadata.supportsPaxosOperations())
+            throw new IllegalArgumentException(String.format("Cannot run paxos 
only repair on %s.%s, which isn't configured for paxos operations", 
cfs.keyspace.getName(), cfs.name));
+
+        if (session.accordOnly && !metadata.requiresAccordSupport())
+            throw new IllegalArgumentException(String.format("Cannot run 
accord only repair on %s.%s, which isn't configured for accord operations", 
cfs.keyspace.getName(), cfs.name));
+
     }
 
     public long getNowInSeconds()
@@ -114,25 +132,39 @@ public class CassandraRepairJob extends AbstractRepairJob
         }
     }
 
+    @Override
+    public void run()
+    {
+        state.phase.start();
+        cfs.metric.repairsStarted.inc();
+        runRepair();
+    }
+
     /**
      * Runs repair job.
      * <p/>
      * This sets up necessary task and runs them on given {@code taskExecutor}.
      * After submitting all tasks, waits until validation with replica 
completes.
      */
-    @Override
     protected void runRepair()
     {
         List<InetAddressAndPort> allEndpoints = new 
ArrayList<>(session.state.commonRange.endpoints);
         allEndpoints.add(ctx.broadcastAddressAndPort());
 
+        TableMetadata metadata = cfs.metadata();
         Future<Void> paxosRepair;
         Epoch repairStartingEpoch = ClusterMetadata.current().epoch;
-        boolean doPaxosRepair = paxosRepairEnabled() && (((useV2() || 
isMetadataKeyspace()) && session.repairPaxos) || session.paxosOnly);
+
+        Preconditions.checkArgument(!session.paxosOnly || !session.accordOnly);
+        boolean doPaxosRepair = paxosRepairEnabled()
+                                && (((useV2() || isMetadataKeyspace()) && 
session.repairPaxos) || session.paxosOnly)
+                                && metadata.supportsPaxosOperations()
+                                && !session.accordOnly;
+        boolean doAccordRepair = metadata.requiresAccordSupport() && 
!session.paxosOnly;
+
         if (doPaxosRepair)
         {
             logger.info("{} {}.{} starting paxos repair", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
-            TableMetadata metadata = 
Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily);
             paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, 
desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
         }
         else
@@ -141,6 +173,7 @@ public class CassandraRepairJob extends AbstractRepairJob
             paxosRepair = ImmediateFuture.success(null);
         }
 
+
         if (session.paxosOnly)
         {
             paxosRepair.addCallback(new FutureCallback<>()
@@ -148,12 +181,9 @@ public class CassandraRepairJob extends AbstractRepairJob
                 public void onSuccess(Void ignored)
                 {
                     logger.info("{} {}.{} paxos repair completed", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
-                    trySuccess(new RepairResult(desc, Collections.emptyList(), 
ConsensusMigrationRepairResult.fromCassandraRepair(repairStartingEpoch, 
false)));
+                    trySuccess(new RepairResult(desc, Collections.emptyList(), 
ConsensusMigrationRepairResult.fromPaxosOnlyRepair(repairStartingEpoch, 
session.excludedDeadNodes)));
                 }
 
-                /**
-                 * Snapshot, validation and sync failures are all handled here
-                 */
                 public void onFailure(Throwable t)
                 {
                     logger.warn("{} {}.{} paxos repair failed", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
@@ -163,6 +193,43 @@ public class CassandraRepairJob extends AbstractRepairJob
             return;
         }
 
+        Future<Void> accordRepair;
+        if (doAccordRepair)
+        {
+            accordRepair = paxosRepair.flatMap(unused -> {
+                logger.info("{} {}.{} starting accord repair", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
+                IPartitioner partitioner = metadata.partitioner;
+                AccordRepair repair = new AccordRepair(ctx, cfs, partitioner, 
desc.keyspace, desc.ranges, session.isConsensusMigration && session.accordOnly, 
allEndpoints);
+                return repair.repair(taskExecutor);
+            }, taskExecutor);
+        }
+        else
+        {
+            accordRepair = paxosRepair.flatMap(unused -> {
+                logger.info("{} {}.{} not running accord repair", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
+                return ImmediateFuture.success(null);
+            });
+        }
+
+        if (session.accordOnly)
+        {
+            accordRepair.addCallback(new FutureCallback<Void>()
+            {
+                public void onSuccess(Void ignored)
+                {
+                    logger.info("{} {}.{} accord repair completed", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
+                    trySuccess(new RepairResult(desc, Collections.emptyList(), 
ConsensusMigrationRepairResult.fromAccordOnlyRepair(repairStartingEpoch, 
session.excludedDeadNodes)));
+                }
+
+                public void onFailure(Throwable t)
+                {
+                    logger.warn("{} {}.{} accord repair failed", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
+                    tryFailure(t);
+                }
+            }, taskExecutor);
+            return;
+        }
+
         // Create a snapshot at all nodes unless we're using pure parallel 
repairs
         final Future<?> allSnapshotTasks;
         if (parallelismDegree != RepairParallelism.PARALLEL)
@@ -170,12 +237,12 @@ public class CassandraRepairJob extends AbstractRepairJob
             if (session.isIncremental)
             {
                 // consistent repair does it's own "snapshotting"
-                allSnapshotTasks = paxosRepair.map(input -> allEndpoints);
+                allSnapshotTasks = accordRepair.map(input -> allEndpoints);
             }
             else
             {
                 // Request snapshot to all replica
-                allSnapshotTasks = paxosRepair.flatMap(input -> {
+                allSnapshotTasks = accordRepair.flatMap(input -> {
                     List<Future<InetAddressAndPort>> snapshotTasks = new 
ArrayList<>(allEndpoints.size());
                     state.phase.snapshotsSubmitted();
                     for (InetAddressAndPort endpoint : allEndpoints)
@@ -198,7 +265,7 @@ public class CassandraRepairJob extends AbstractRepairJob
 
         // Run validations and the creation of sync tasks in the scheduler, so 
it can limit the number of Merkle trees
         // that there are in memory at once. When all validations complete, 
submit sync tasks out of the scheduler.
-        Future<List<SyncStat>> syncResults = 
session.validationScheduler.schedule(() -> createSyncTasks(paxosRepair, 
allSnapshotTasks, allEndpoints), taskExecutor)
+        Future<List<SyncStat>> syncResults = 
session.validationScheduler.schedule(() -> createSyncTasks(accordRepair, 
allSnapshotTasks, allEndpoints), taskExecutor)
                                                                         
.flatMap(this::executeTasks, taskExecutor);
 
         // When all sync complete, set the final result
@@ -215,7 +282,7 @@ public class CassandraRepairJob extends AbstractRepairJob
                 }
                 cfs.metric.repairsCompleted.inc();
                 logger.info("Completing repair with excludedDeadNodes {}", 
session.excludedDeadNodes);
-                trySuccess(new RepairResult(desc, stats, 
ConsensusMigrationRepairResult.fromCassandraRepair(repairStartingEpoch, 
doPaxosRepair && !session.excludedDeadNodes)));
+                trySuccess(new RepairResult(desc, stats, 
ConsensusMigrationRepairResult.fromRepair(repairStartingEpoch, doPaxosRepair, 
doAccordRepair, session.excludedDeadNodes)));
             }
 
             /**
@@ -240,7 +307,7 @@ public class CassandraRepairJob extends AbstractRepairJob
         }, taskExecutor);
     }
 
-    private Future<List<SyncTask>> createSyncTasks(Future<Void> paxosRepair, 
Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints)
+    private Future<List<SyncTask>> createSyncTasks(Future<Void> accordRepair, 
Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints)
     {
         Future<List<TreeResponse>> treeResponses;
         if (allSnapshotTasks != null)
@@ -256,7 +323,7 @@ public class CassandraRepairJob extends AbstractRepairJob
         else
         {
             // If not sequential, just send validation request to all replica
-            treeResponses = paxosRepair.flatMap(input -> 
sendValidationRequest(allEndpoints));
+            treeResponses = accordRepair.flatMap(input -> 
sendValidationRequest(allEndpoints));
         }
 
         treeResponses = treeResponses.map(a -> {
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 99b877a96b..230d77b52b 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -74,7 +74,7 @@ import org.apache.cassandra.utils.concurrent.AsyncFuture;
  *
  * A given RepairSession repairs a set of replicas for a given set of ranges 
on a list
  * of column families. For each of the column family to repair, RepairSession
- * creates a {@link AbstractRepairJob} that handles the repair of that CF.
+ * creates a {@link RepairJob} that handles the repair of that CF.
  *
  * A given RepairJob has the 3 main phases:
  * <ol>
@@ -120,8 +120,12 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
     /** Range to repair */
     public final boolean isIncremental;
     public final PreviewKind previewKind;
-    public final boolean repairPaxos;
+    public final boolean repairPaxos; // TODO (now): rename to 
repairPaxosIfSupported
     public final boolean paxosOnly;
+
+    public final boolean accordOnly;
+    public final boolean isConsensusMigration;
+
     public final boolean excludedDeadNodes;
 
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
@@ -136,22 +140,24 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
     public final boolean optimiseStreams;
     public final SharedContext ctx;
     public final Scheduler validationScheduler;
-    private volatile List<AbstractRepairJob> jobs = Collections.emptyList();
-    private final boolean accordRepair;
+    private volatile List<RepairJob> jobs = Collections.emptyList();
 
     private volatile boolean terminated = false;
 
     /**
      * Create new repair session.
-     * @param parentRepairSession the parent sessions id
-     * @param commonRange ranges to repair
-     * @param excludedDeadNodes Was the repair started for --force and were 
dead nodes excluded as a result
-     * @param keyspace name of keyspace
-     * @param parallelismDegree specifies the degree of parallelism when 
calculating the merkle trees
-     * @param pullRepair true if the repair should be one way (from remote 
host to this host and only applicable between two hosts--see RepairOption)
-     * @param repairPaxos true if incomplete paxos operations should be 
completed as part of repair
-     * @param paxosOnly true if we should only complete paxos operations, not 
run a normal repair
-     * @param cfnames names of columnfamilies
+     *
+     * @param parentRepairSession  the parent sessions id
+     * @param commonRange          ranges to repair
+     * @param excludedDeadNodes    Was the repair started for --force and were 
dead nodes excluded as a result
+     * @param keyspace             name of keyspace
+     * @param parallelismDegree    specifies the degree of parallelism when 
calculating the merkle trees
+     * @param pullRepair           true if the repair should be one way (from 
remote host to this host and only applicable between two hosts--see 
RepairOption)
+     * @param repairPaxos          true if incomplete paxos operations should 
be completed as part of repair
+     * @param paxosOnly            true if we should only complete paxos 
operations, not run a normal repair
+     * @param accordOnly           true if we should only complete accord 
operations, not run a normal repair
+     * @param isConsensusMigration true if this repair is being run by the 
consensus migration tool (affects accord repair availability requirements)
+     * @param cfnames              names of columnfamilies
      */
     public RepairSession(SharedContext ctx,
                          Scheduler validationScheduler,
@@ -166,13 +172,15 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
                          boolean optimiseStreams,
                          boolean repairPaxos,
                          boolean paxosOnly,
-                         boolean accordRepair,
+                         boolean accordOnly,
+                         boolean isConsensusMigration,
                          String... cfnames)
     {
         this.ctx = ctx;
         this.validationScheduler = validationScheduler;
         this.repairPaxos = repairPaxos;
         this.paxosOnly = paxosOnly;
+        this.isConsensusMigration = isConsensusMigration;
         assert cfnames.length > 0 : "Repairing no column families seems 
pointless, doesn't it";
         this.state = new SessionState(ctx.clock(), parentRepairSession, 
keyspace, cfnames, commonRange);
         this.parallelismDegree = parallelismDegree;
@@ -181,7 +189,7 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
         this.pullRepair = pullRepair;
         this.optimiseStreams = optimiseStreams;
         this.taskExecutor = new SafeExecutor(createExecutor(ctx));
-        this.accordRepair = accordRepair;
+        this.accordOnly = accordOnly;
         this.excludedDeadNodes = excludedDeadNodes;
     }
 
@@ -305,7 +313,7 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
         logger.info("{} parentSessionId = {}: new session: will sync {} on 
range {} for {}.{}",
                     previewKind.logPrefix(getId()), state.parentRepairSession, 
repairedNodes(), state.commonRange, state.keyspace, 
Arrays.toString(state.cfnames));
         Tracing.traceRepair("Syncing range {}", state.commonRange);
-        if (!previewKind.isPreview() && !paxosOnly)
+        if (!previewKind.isPreview() && !paxosOnly && !accordOnly)
         {
             SystemDistributedKeyspace.startRepairs(getId(), 
state.parentRepairSession, state.keyspace, state.cfnames, state.commonRange);
         }
@@ -343,12 +351,10 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
 
         // Create and submit RepairJob for each ColumnFamily
         state.phase.jobsSubmitted();
-        List<AbstractRepairJob> jobs = new ArrayList<>(state.cfnames.length);
+        List<RepairJob> jobs = new ArrayList<>(state.cfnames.length);
         for (String cfname : state.cfnames)
         {
-            AbstractRepairJob job = accordRepair ?
-                                    new AccordRepairJob(this, cfname) :
-                                    new CassandraRepairJob(this, cfname);
+            RepairJob job = new RepairJob(this, cfname);
             // Repairs can drive forward progress for consensus migration so 
always check
             job.addCallback(ConsensusTableMigration.completedRepairJobHandler);
             state.register(job.state);
@@ -390,10 +396,10 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
     public synchronized void terminate(@Nullable Throwable reason)
     {
         terminated = true;
-        List<AbstractRepairJob> jobs = this.jobs;
+        List<RepairJob> jobs = this.jobs;
         if (jobs != null)
         {
-            for (AbstractRepairJob job : jobs)
+            for (RepairJob job : jobs)
                 job.abort(reason);
         }
         this.jobs = null;
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java 
b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index f7b202dafb..e7ff9bc6c9 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -59,8 +59,7 @@ public class RepairOption
     public static final String IGNORE_UNREPLICATED_KS = 
"ignoreUnreplicatedKeyspaces";
     public static final String REPAIR_PAXOS_KEY = "repairPaxos";
     public static final String PAXOS_ONLY_KEY = "paxosOnly";
-
-    public static final String ACCORD_REPAIR_KEY = "accordRepair";
+    public static final String ACCORD_ONLY_KEY = "accordOnly";
 
     // we don't want to push nodes too much for repair
     public static final int MAX_JOB_THREADS = 4;
@@ -199,21 +198,16 @@ public class RepairOption
         boolean ignoreUnreplicatedKeyspaces = 
Boolean.parseBoolean(options.get(IGNORE_UNREPLICATED_KS));
         boolean repairPaxos = 
Boolean.parseBoolean(options.get(REPAIR_PAXOS_KEY));
         boolean paxosOnly = Boolean.parseBoolean(options.get(PAXOS_ONLY_KEY));
-        boolean accordRepair = 
Boolean.parseBoolean(options.get(ACCORD_REPAIR_KEY));
+        boolean accordOnly = 
Boolean.parseBoolean(options.get(ACCORD_ONLY_KEY));
+
+        if (paxosOnly && accordOnly)
+            throw new IllegalArgumentException("Cannot repair paxos and repair 
only");
 
         if (previewKind != PreviewKind.NONE)
         {
             Preconditions.checkArgument(!repairPaxos, "repairPaxos must be set 
to false for preview repairs");
             Preconditions.checkArgument(!paxosOnly, "paxosOnly must be set to 
false for preview repairs");
-            Preconditions.checkArgument(!accordRepair, "accordRepair must be 
set to false for preview repairs");
-        }
-
-        if (accordRepair)
-        {
-            Preconditions.checkArgument(!paxosOnly, "paxosOnly must be set to 
false for Accord repairs");
-            Preconditions.checkArgument(previewKind == PreviewKind.NONE, 
"Can't perform preview repair with an Accord repair");
-            Preconditions.checkArgument(!force, "Accord repair only requires a 
quorum to work so force is not supported");
-            incremental = false;
+            Preconditions.checkArgument(!accordOnly, "accordOnly must be set 
to false for preview repairs");
         }
 
         int jobThreads = 1;
@@ -231,7 +225,7 @@ public class RepairOption
 
         boolean asymmetricSyncing = 
Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY));
 
-        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges, pullRepair, force, previewKind, 
asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, 
accordRepair);
+        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges, pullRepair, force, previewKind, 
asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, 
accordOnly, false);
 
         // data centers
         String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -313,14 +307,15 @@ public class RepairOption
     private final boolean repairPaxos;
     private final boolean paxosOnly;
 
-    private final boolean accordRepair;
+    private final boolean accordOnly;
+    private final boolean isConsensusMigration;
 
     private final Collection<String> columnFamilies = new HashSet<>();
     private final Collection<String> dataCenters = new HashSet<>();
     private final Collection<String> hosts = new HashSet<>();
     private final Collection<Range<Token>> ranges = new HashSet<>();
 
-    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, 
boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces, boolean 
repairPaxos, boolean paxosOnly, boolean accordRepair)
+    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, 
boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces, boolean 
repairPaxos, boolean paxosOnly, boolean accordOnly, boolean 
isConsensusMigration)
     {
 
         this.parallelism = parallelism;
@@ -328,6 +323,7 @@ public class RepairOption
         this.incremental = incremental;
         this.trace = trace;
         this.jobThreads = jobThreads;
+        this.isConsensusMigration = isConsensusMigration;
         this.ranges.addAll(ranges);
         this.pullRepair = pullRepair;
         this.forceRepair = forceRepair;
@@ -336,17 +332,7 @@ public class RepairOption
         this.ignoreUnreplicatedKeyspaces = ignoreUnreplicatedKeyspaces;
         this.repairPaxos = repairPaxos;
         this.paxosOnly = paxosOnly;
-        this.accordRepair = accordRepair;
-    }
-
-    public RepairOption withAccordRepair(boolean accordRepair)
-    {
-        RepairOption repairOption = new RepairOption(parallelism, 
primaryRange, incremental, trace, jobThreads, ranges, pullRepair, forceRepair, 
previewKind, optimiseStreams, ignoreUnreplicatedKeyspaces, repairPaxos, 
paxosOnly, accordRepair);
-        repairOption.columnFamilies.addAll(columnFamilies);
-        repairOption.dataCenters.addAll(dataCenters);
-        repairOption.hosts.addAll(hosts);
-        repairOption.ranges.addAll(ranges);
-        return repairOption;
+        this.accordOnly = accordOnly;
     }
 
     public RepairParallelism getParallelism()
@@ -457,9 +443,14 @@ public class RepairOption
         return paxosOnly;
     }
 
-    public boolean accordRepair()
+    public boolean accordOnly()
+    {
+        return accordOnly;
+    }
+
+    public boolean isConsensusMigration()
     {
-        return accordRepair;
+        return isConsensusMigration;
     }
 
     @Override
@@ -481,7 +472,7 @@ public class RepairOption
                ", ignore unreplicated keyspaces: "+ 
ignoreUnreplicatedKeyspaces +
                ", repairPaxos: " + repairPaxos +
                ", paxosOnly: " + paxosOnly +
-               ", accordRepair: " + accordRepair +
+               ", accordOnly: " + accordOnly +
                ')';
     }
 
@@ -503,7 +494,7 @@ public class RepairOption
         options.put(OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams));
         options.put(REPAIR_PAXOS_KEY, Boolean.toString(repairPaxos));
         options.put(PAXOS_ONLY_KEY, Boolean.toString(paxosOnly));
-        options.put(ACCORD_REPAIR_KEY, Boolean.toString(accordRepair));
+        options.put(ACCORD_ONLY_KEY, Boolean.toString(accordOnly));
         return options;
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/state/CoordinatorState.java 
b/src/java/org/apache/cassandra/repair/state/CoordinatorState.java
index fbd184737e..35fbbf6bc7 100644
--- a/src/java/org/apache/cassandra/repair/state/CoordinatorState.java
+++ b/src/java/org/apache/cassandra/repair/state/CoordinatorState.java
@@ -77,10 +77,6 @@ public class CoordinatorState extends 
AbstractState<CoordinatorState.State, Time
                 default: throw new AssertionError("Unknown preview kind: " + 
options.getPreviewKind());
             }
         }
-        else if (options.accordRepair())
-        {
-            return "accord repair";
-        }
         else if (options.isIncremental())
         {
             return "incremental";
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java 
b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 338b7dbc18..9838f38299 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -66,6 +66,7 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.fastpath.FastPathStrategy;
+import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.tcm.Epoch;
 import 
org.apache.cassandra.tcm.serialization.UDTAndFunctionsAwareMetadataSerializer;
@@ -340,6 +341,12 @@ public class TableMetadata implements SchemaElement
         return isAccordEnabled() || migratingFromAccord();
     }
 
+    public boolean supportsPaxosOperations()
+    {
+        return params.transactionalMode == TransactionalMode.off
+               || params.transactionalMigrationFrom.from == 
TransactionalMode.off;
+    }
+
     public ImmutableCollection<ColumnMetadata> columns()
     {
         return columns.values();
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 8fcee2610f..a2a89e1074 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -454,7 +454,8 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                                              boolean optimiseStreams,
                                              boolean repairPaxos,
                                              boolean paxosOnly,
-                                             boolean accordRepair,
+                                             boolean accordOnly,
+                                             boolean isConsensusMigration,
                                              ExecutorPlus executor,
                                              Scheduler validationScheduler,
                                              String... cfnames)
@@ -472,7 +473,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                                                         range, 
excludedDeadNodes, keyspace,
                                                         parallelismDegree, 
isIncremental, pullRepair,
                                                         previewKind, 
optimiseStreams, repairPaxos, paxosOnly,
-                                                        accordRepair, cfnames);
+                                                        accordOnly, 
isConsensusMigration, cfnames);
         repairs.getIfPresent(parentRepairSession).register(session.state);
 
         sessions.put(session.getId(), session);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 6e136029a0..54f8c369ff 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -20,10 +20,14 @@ package org.apache.cassandra.service.accord;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 
 import javax.annotation.concurrent.GuardedBy;
@@ -32,12 +36,18 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 
+import accord.coordinate.Barrier;
+import accord.coordinate.CoordinateSyncPoint;
 import accord.coordinate.TopologyMismatch;
 import accord.impl.CoordinateDurabilityScheduling;
+import accord.primitives.SyncPoint;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.statements.RequestValidations;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import 
org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.service.accord.repair.RepairSyncPointAdapter;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.service.accord.api.*;
 import org.apache.cassandra.utils.*;
@@ -150,6 +160,12 @@ public class AccordService implements IAccordService, 
Shutdownable
             throw new UnsupportedOperationException("No accord barriers should 
be executed when accord.enabled = false in cassandra.yaml");
         }
 
+        @Override
+        public long repair(@Nonnull Seekables keysOrRanges, long epoch, long 
queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean 
isForWrite, List<InetAddressAndPort> allEndpoints)
+        {
+            throw new UnsupportedOperationException("No accord repairs should 
be executed when accord.enabled = false in cassandra.yaml");
+        }
+
         @Override
         public @Nonnull TxnResult coordinate(@Nonnull Txn txn, @Nonnull 
ConsistencyLevel consistencyLevel, @Nonnull long queryStartNanos)
         {
@@ -287,7 +303,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     {
         Invariants.checkState(localId != null, "static localId must be set 
before instantiating AccordService");
         logger.info("Starting accord with nodeId {}", localId);
-        AccordAgent agent = new AccordAgent();
+        AccordAgent agent = 
FBUtilities.construct(CassandraRelevantProperties.ACCORD_AGENT_CLASS.getString(AccordAgent.class.getName()),
 "AccordAgent");
         this.configService = new AccordConfigurationService(localId);
         this.fastPathCoordinator = AccordFastPathCoordinator.create(localId, 
configService);
         this.messageSink = new AccordMessageSink(agent, configService);
@@ -341,8 +357,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         return requestHandler;
     }
 
-    @Override
-    public long barrier(@Nonnull Seekables keysOrRanges, long epoch, long 
queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean isForWrite)
+    private <S extends Seekables<?, ?>> long barrier(@Nonnull S keysOrRanges, 
long epoch, long queryStartNanos, long timeoutNanos, BarrierType barrierType, 
boolean isForWrite, BiFunction<Node, S, AsyncResult<SyncPoint<S>>> syncPoint)
     {
         AccordClientRequestMetrics metrics = isForWrite ? accordWriteMetrics : 
accordReadMetrics;
         TxnId txnId = null;
@@ -350,7 +365,9 @@ public class AccordService implements IAccordService, 
Shutdownable
         {
             logger.debug("Starting barrier key: {} epoch: {} barrierType: {} 
isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite);
             txnId = node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain());
-            AsyncResult<Timestamp> asyncResult = node.barrier(keysOrRanges, 
epoch, barrierType);
+            AsyncResult<Timestamp> asyncResult = syncPoint == null
+                                                 ? Barrier.barrier(node, 
keysOrRanges, epoch, barrierType)
+                                                 : Barrier.barrier(node, 
keysOrRanges, epoch, barrierType, syncPoint);
             long deadlineNanos = queryStartNanos + timeoutNanos;
             Timestamp barrierExecuteAt = AsyncChains.getBlocking(asyncResult, 
deadlineNanos - nanoTime(), NANOSECONDS);
             logger.debug("Completed in {}ms barrier key: {} epoch: {} 
barrierType: {} isForWrite {}",
@@ -394,6 +411,23 @@ public class AccordService implements IAccordService, 
Shutdownable
         }
     }
 
+    @Override
+    public long barrier(@Nonnull Seekables keysOrRanges, long epoch, long 
queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean isForWrite)
+    {
+        return barrier(keysOrRanges, epoch, queryStartNanos, timeoutNanos, 
barrierType, isForWrite, null);
+    }
+
+    public static <S extends Seekables<?, ?>> BiFunction<Node, S, 
AsyncResult<SyncPoint<S>>> repairSyncPoint(Set<Node.Id> allNodes)
+    {
+        return (node, seekables) -> CoordinateSyncPoint.coordinate(node, 
Kind.SyncPoint, seekables, RepairSyncPointAdapter.create(allNodes));
+    }
+
+    public long repair(@Nonnull Seekables keysOrRanges, long epoch, long 
queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean 
isForWrite, List<InetAddressAndPort> allEndpoints)
+    {
+        Set<Node.Id> allNodes = 
allEndpoints.stream().map(configService::mappedId).collect(Collectors.toUnmodifiableSet());
+        return barrier(keysOrRanges, epoch, queryStartNanos, timeoutNanos, 
barrierType, isForWrite, repairSyncPoint(allNodes));
+    }
+
     private static ReadTimeoutException newBarrierTimeout(TxnId txnId, boolean 
global)
     {
         return new ReadTimeoutException(global ? ConsistencyLevel.ANY : 
ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString());
@@ -404,14 +438,13 @@ public class AccordService implements IAccordService, 
Shutdownable
         return new ReadPreemptedException(global ? ConsistencyLevel.ANY : 
ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString());
     }
 
-    @Override
-    public long barrierWithRetries(Seekables keysOrRanges, long minEpoch, 
BarrierType barrierType, boolean isForWrite) throws InterruptedException
+    private long doWithRetries(LongSupplier action, int retryAttempts, long 
initialBackoffMillis, long maxBackoffMillis) throws InterruptedException
     {
         // Since we could end up having the barrier transaction or the 
transaction it listens to invalidated
         CoordinationFailed existingFailures = null;
         Long success = null;
         long backoffMillis = 0;
-        for (int attempt = 0; attempt < 
DatabaseDescriptor.getAccordBarrierRetryAttempts(); attempt++)
+        for (int attempt = 0; attempt < retryAttempts; attempt++)
         {
             try
             {
@@ -423,10 +456,10 @@ public class AccordService implements IAccordService, 
Shutdownable
                     e.addSuppressed(existingFailures);
                 throw e;
             }
-            backoffMillis = backoffMillis == 0 ? 
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis() : 
Math.min(backoffMillis * 2, 
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
+            backoffMillis = backoffMillis == 0 ? initialBackoffMillis : 
Math.min(backoffMillis * 2, maxBackoffMillis);
             try
             {
-                success = AccordService.instance().barrier(keysOrRanges, 
minEpoch, Clock.Global.nanoTime(), 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, 
isForWrite);
+                success = action.getAsLong();
                 break;
             }
             catch (CoordinationFailed newFailures)
@@ -442,6 +475,24 @@ public class AccordService implements IAccordService, 
Shutdownable
         return success;
     }
 
+    @Override
+    public long barrierWithRetries(Seekables keysOrRanges, long minEpoch, 
BarrierType barrierType, boolean isForWrite) throws InterruptedException
+    {
+        return doWithRetries(() -> 
AccordService.instance().barrier(keysOrRanges, minEpoch, 
Clock.Global.nanoTime(), 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, 
isForWrite),
+                      DatabaseDescriptor.getAccordBarrierRetryAttempts(),
+                      
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(),
+                      
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
+    }
+
+    @Override
+    public long repairWithRetries(Seekables keysOrRanges, long minEpoch, 
BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> 
allEndpoints) throws InterruptedException
+    {
+        return doWithRetries(() -> 
AccordService.instance().repair(keysOrRanges, minEpoch, 
Clock.Global.nanoTime(), 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, 
isForWrite, allEndpoints),
+                             
DatabaseDescriptor.getAccordBarrierRetryAttempts(),
+                             
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(),
+                             
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
+    }
+
     @Override
     public long currentEpoch()
     {
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java 
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 42b957ba6a..ae0d08c48e 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
@@ -60,6 +61,13 @@ public interface IAccordService
 
     long barrier(@Nonnull Seekables keysOrRanges, long minEpoch, long 
queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean 
isForWrite);
 
+    default long repairWithRetries(Seekables keysOrRanges, long minEpoch, 
BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> 
allEndpoints) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    long repair(@Nonnull Seekables keysOrRanges, long epoch, long 
queryStartNanos, long timeoutNanos, BarrierType barrierType, boolean 
isForWrite, List<InetAddressAndPort> allEndpoints);
+
     default void postStreamReceivingBarrier(ColumnFamilyStore cfs, 
List<Range<Token>> ranges)
     {
         String ks = cfs.keyspace.getName();
diff --git a/src/java/org/apache/cassandra/repair/AccordRepairJob.java 
b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
similarity index 67%
rename from src/java/org/apache/cassandra/repair/AccordRepairJob.java
rename to src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
index d82e4407fa..924662b38d 100644
--- a/src/java/org/apache/cassandra/repair/AccordRepairJob.java
+++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
@@ -16,12 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.repair;
+package org.apache.cassandra.service.accord.repair;
 
 import java.math.BigInteger;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
 
-import org.apache.cassandra.service.accord.AccordTopology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,67 +31,88 @@ import accord.api.BarrierType;
 import accord.api.RoutingKey;
 import accord.primitives.Ranges;
 import accord.primitives.Seekables;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.AccordSplitter;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.AccordTopology;
 import org.apache.cassandra.service.accord.TokenRange;
-import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
 
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.Collections.emptyList;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL;
 
 /*
  * Accord repair consists of creating a barrier transaction for all the ranges 
which ensure that all Accord transactions
  * before the Epoch and point in time at which the repair started have their 
side effects visible to Paxos and regular quorum reads.
  */
-public class AccordRepairJob extends AbstractRepairJob
+public class AccordRepair
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(AccordRepairJob.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordRepair.class);
 
     public static final BigInteger TWO = BigInteger.valueOf(2);
 
+    private final SharedContext ctx;
+    private final ColumnFamilyStore cfs;
+
     private final Ranges ranges;
 
     private final AccordSplitter splitter;
+    private final boolean requireAllEndpoints;
+    private final List<InetAddressAndPort> endpoints;
 
     private BigInteger rangeStep;
 
-    private Epoch minEpoch = ClusterMetadata.current().epoch;
+    private final Epoch minEpoch = ClusterMetadata.current().epoch;
 
     private volatile Throwable shouldAbort = null;
 
-    public AccordRepairJob(RepairSession repairSession, String cfname)
+    public AccordRepair(SharedContext ctx, ColumnFamilyStore cfs, IPartitioner 
partitioner, String keyspace, Collection<Range<Token>> ranges, boolean 
requireAllEndpoints, List<InetAddressAndPort> endpoints)
     {
-        super(repairSession, cfname);
-        IPartitioner partitioner = 
desc.ranges.iterator().next().left.getPartitioner();
-        this.ranges = AccordTopology.toAccordRanges(desc.keyspace, 
desc.ranges);
-        this.splitter = partitioner.accordSplitter().apply(ranges);
+        this.ctx = ctx;
+        this.cfs = cfs;
+        this.requireAllEndpoints = requireAllEndpoints;
+        this.endpoints = endpoints;
+        this.ranges = AccordTopology.toAccordRanges(keyspace, ranges);
+        this.splitter = partitioner.accordSplitter().apply(this.ranges);
     }
 
-    @Override
-    protected void runRepair()
+    public Epoch minEpoch()
     {
-        try
-        {
-            for (accord.primitives.Range range : ranges)
-                repairRange((TokenRange)range);
-            state.phase.success();
-            cfs.metric.repairsCompleted.inc();
-            trySuccess(new RepairResult(desc, emptyList(), 
ConsensusMigrationRepairResult.fromAccordRepair(minEpoch)));
-        }
-        catch (Throwable t)
-        {
-            state.phase.fail(t);
-            cfs.metric.repairsCompleted.inc();
-            tryFailure(t);
-        }
+        return minEpoch;
     }
 
-    @Override
-    void abort(@Nullable Throwable reason)
+    public void repair() throws Throwable
+    {
+        for (accord.primitives.Range range : ranges)
+            repairRange((TokenRange)range);
+    }
+
+    public Future<Void> repair(Executor executor)
+    {
+        AsyncPromise<Void> future = new AsyncPromise<>();
+        executor.execute(() -> {
+            try
+            {
+                repair();
+                future.trySuccess(null);
+            }
+            catch (Throwable e)
+            {
+                future.tryFailure(e);
+            }
+        });
+        return future;
+    }
+
+    protected void abort(@Nullable Throwable reason)
     {
         shouldAbort = reason == null ? new RuntimeException("Abort") : reason;
     }
@@ -145,25 +168,32 @@ public class AccordRepairJob extends AbstractRepairJob
                 checkState(!toRepair.equals(lastRepaired), "Shouldn't repair 
the same range twice");
                 checkState(lastRepaired == null || 
toRepair.start().equals(lastRepaired.end()), "Next range should directly follow 
previous range");
                 lastRepaired = toRepair;
-                
AccordService.instance().barrierWithRetries(Seekables.of(toRepair), 
minEpoch.getEpoch(), BarrierType.global_sync, false);
+
+                if (requireAllEndpoints)
+                {
+                    
AccordService.instance().repairWithRetries(Seekables.of(toRepair), 
minEpoch.getEpoch(), BarrierType.global_sync, false, endpoints);
+                }
+                else
+                {
+                    
AccordService.instance().barrierWithRetries(Seekables.of(toRepair), 
minEpoch.getEpoch(), BarrierType.global_sync, false);
+                }
+
                 remainingStart = toRepair.end();
             }
             catch (RuntimeException e)
             {
-                // TODO Placeholder for dependency limit overflow
-//                dependencyOverflow = true;
-                cfs.metric.rangeMigrationDependencyLimitFailures.mark();
+                cfs.metric.accordRepairUnexpectedFailures.mark();
                 throw e;
             }
             catch (Throwable t)
             {
-                // unexpected error
-                cfs.metric.rangeMigrationUnexpectedFailures.mark();
+                cfs.metric.accordRepairUnexpectedFailures.mark();
                 throw new RuntimeException(t);
             }
             finally
             {
-                cfs.metric.rangeMigration.addNano(start);
+                long end = ctx.clock().nanoTime();
+                cfs.metric.accordRepair.addNano(end - start);
             }
 
             // TODO when dependency limits are added to Accord need to test 
repair overflow
diff --git 
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
 
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
new file mode 100644
index 0000000000..76e29adbc5
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.repair;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+import com.google.common.collect.ImmutableSet;
+
+import accord.api.Result;
+import accord.coordinate.CoordinationAdapter;
+import accord.coordinate.ExecutePath;
+import accord.coordinate.ExecuteSyncPoint;
+import accord.local.Node;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Seekables;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.topology.Topologies;
+
+/**
+ * Sync point adapter used for accord-only repairs.
+ *
+ * Repair has the requirement that all client writes begun before the repair 
will be fully replicated once repair
+ * has completed. In the case of accord, repairs that compare data on disk 
satisfy this requirement by running
+ * a sync point as part of streaming if differences are found. For accord-only 
repairs, the barrier used by normal
+ * repairs is not sufficient since it only requires a quorum of nodes to 
respond before completing. This sync point
+ * adapter requires responses from all of the supplied endpoints before 
completing. Note that shards only block on the
+ * intersection of the provided replicas and their own endpoints.
+ */
+public class RepairSyncPointAdapter<S extends Seekables<?, ?>> extends 
CoordinationAdapter.Adapters.AbstractSyncPointAdapter<S>
+{
+    private final ImmutableSet<Node.Id> requiredResponses;
+
+    public RepairSyncPointAdapter(Collection<Node.Id> requiredResponses)
+    {
+        this.requiredResponses = ImmutableSet.copyOf(requiredResponses);
+    }
+
+    @Override
+    public void execute(Node node, Topologies all, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
BiConsumer<? super SyncPoint<S>, Throwable> callback)
+    {
+        RequiredResponseTracker tracker = new 
RequiredResponseTracker(requiredResponses, all);
+        ExecuteSyncPoint.ExecuteBlocking<S> execute = new 
ExecuteSyncPoint.ExecuteBlocking<>(node, tracker, new SyncPoint<>(txnId, deps, 
(S) txn.keys(), route), executeAt);
+        execute.addCallback(callback);
+        execute.start();
+    }
+
+    @Override
+    public void persist(Node node, Topologies all, FullRoute<?> route, TxnId 
txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, 
BiConsumer<? super SyncPoint<S>, Throwable> callback)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public static <S extends Seekables<?, ?>> 
CoordinationAdapter<SyncPoint<S>> create(Collection<Node.Id> requiredResponses)
+    {
+        return new RepairSyncPointAdapter<>(requiredResponses);
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java
 
b/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java
new file mode 100644
index 0000000000..130e914969
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.repair;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import accord.coordinate.tracking.AbstractSimpleTracker;
+import accord.coordinate.tracking.RequestStatus;
+import accord.coordinate.tracking.ShardTracker;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static 
accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
+
/**
 * Response tracker that only marks a shard successful once every "required" node within that
 * shard has acknowledged. Used by {@link RepairSyncPointAdapter} so that accord-only repairs
 * wait on all supplied endpoints rather than a mere quorum.
 *
 * NOTE(review): a failure reported by ANY node fails a shard that still has outstanding required
 * responses — onFailure ignores node identity. Confirm this strictness is intended.
 */
public class RequiredResponseTracker extends AbstractSimpleTracker<RequiredResponseTracker.RequiredResponseShardTracker>
{
    public static class RequiredResponseShardTracker extends ShardTracker
    {
        // Required nodes in this shard that have not yet responded successfully.
        private final Set<Node.Id> outstandingResponses;

        public RequiredResponseShardTracker(Set<Node.Id> requiredResponses, Shard shard)
        {
            super(shard);
            // Only block on the intersection of the required responses and this shard's own nodes.
            this.outstandingResponses = new HashSet<>();
            for (Node.Id id : shard.nodes)
            {
                if (requiredResponses.contains(id))
                    outstandingResponses.add(id);
            }
        }

        public ShardOutcomes onSuccess(Node.Id node)
        {
            // Success only when this response empties the outstanding set; responses from
            // non-required nodes are NoChange.
            // NOTE(review): if the intersection starts empty this shard never returns Success —
            // presumably required responses always intersect every shard; verify at call sites.
            return outstandingResponses.remove(node) && outstandingResponses.isEmpty() ? Success : NoChange;
        }

        public ShardOutcomes onFailure(Object ignore)
        {
            // Any failure is fatal while required responses are still outstanding.
            return !outstandingResponses.isEmpty() ? Fail : NoChange;
        }
    }

    public RequiredResponseTracker(Set<Node.Id> requiredResponses, Topologies topologies)
    {
        super(topologies, RequiredResponseShardTracker[]::new, shard -> new RequiredResponseShardTracker(requiredResponses, shard));
    }

    @Override
    public RequestStatus recordSuccess(Node.Id node)
    {
        return recordResponse(this, node, RequiredResponseShardTracker::onSuccess, node);
    }

    @Override
    public RequestStatus recordFailure(Node.Id node)
    {
        // Node identity is irrelevant to the per-shard failure decision, hence null.
        return recordResponse(this, node, RequiredResponseShardTracker::onFailure, null);
    }
}
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 1b48a53554..163e8f65f2 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -82,6 +82,7 @@ public class ReadDataSerializers
             CommandSerializers.txnId.serialize(msg.txnId, out, version);
             KeySerializers.participants.serialize(msg.readScope, out, version);
             out.writeUnsignedVInt(msg.executeAtEpoch);
+            CommandSerializers.timestamp.serialize(msg.executeAt, out, 
version);
             KeySerializers.fullRoute.serialize(msg.route, out, version);
             CommandSerializers.partialTxn.serialize(msg.txn, out, version);
             DepsSerializer.partialDeps.serialize(msg.deps, out, version);
@@ -97,6 +98,7 @@ public class ReadDataSerializers
             CommandSerializers.txnId.deserialize(in, version),
             KeySerializers.participants.deserialize(in, version),
             in.readUnsignedVInt(),
+            CommandSerializers.timestamp.deserialize(in, version),
             KeySerializers.fullRoute.deserialize(in, version),
             CommandSerializers.partialTxn.deserialize(in, version),
             DepsSerializer.partialDeps.deserialize(in, version),
@@ -111,6 +113,7 @@ public class ReadDataSerializers
             return CommandSerializers.txnId.serializedSize(msg.txnId, version)
                    + KeySerializers.participants.serializedSize(msg.readScope, 
version)
                    + TypeSizes.sizeofUnsignedVInt(msg.executeAtEpoch)
+                   + 
CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
                    + KeySerializers.fullRoute.serializedSize(msg.route, 
version)
                    + CommandSerializers.partialTxn.serializedSize(msg.txn, 
version)
                    + DepsSerializer.partialDeps.serializedSize(msg.deps, 
version)
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairResult.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairResult.java
index 9233667b5a..2215c734c8 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairResult.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairResult.java
@@ -24,6 +24,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 public class ConsensusMigrationRepairResult
 {
+    private static final ConsensusMigrationRepairResult INELIGIBLE = new 
ConsensusMigrationRepairResult(ConsensusMigrationRepairType.ineligible, 
Epoch.EMPTY);
     public final ConsensusMigrationRepairType type;
     public final Epoch minEpoch;
 
@@ -33,18 +34,24 @@ public class ConsensusMigrationRepairResult
         this.minEpoch = minEpoch;
     }
 
-    public static ConsensusMigrationRepairResult fromCassandraRepair(Epoch 
minEpoch, boolean migrationEligibleRepair)
+    public static ConsensusMigrationRepairResult fromRepair(Epoch minEpoch, 
boolean paxosAndDataRepaired, boolean accordRepaired, boolean deadNodesExcluded)
     {
-        checkArgument(!migrationEligibleRepair || 
minEpoch.isAfter(Epoch.EMPTY), "Epoch should not be empty if Paxos and regular 
repairs were performed");
-        if (migrationEligibleRepair)
-            return new 
ConsensusMigrationRepairResult(ConsensusMigrationRepairType.paxos, minEpoch);
-        else
-            return new 
ConsensusMigrationRepairResult(ConsensusMigrationRepairType.ineligible, 
Epoch.EMPTY);
+        checkArgument((!paxosAndDataRepaired && !accordRepaired) || 
minEpoch.isAfter(Epoch.EMPTY), "Epoch should not be empty if Paxos and regular 
repairs were performed");
+
+        if (deadNodesExcluded) return INELIGIBLE;
+        if (paxosAndDataRepaired && accordRepaired) return new 
ConsensusMigrationRepairResult(ConsensusMigrationRepairType.either, minEpoch);
+        if (paxosAndDataRepaired) return new 
ConsensusMigrationRepairResult(ConsensusMigrationRepairType.paxos, minEpoch);
+        if (accordRepaired) return new 
ConsensusMigrationRepairResult(ConsensusMigrationRepairType.accord, minEpoch);
+        return INELIGIBLE;
+    }
+
+    public static ConsensusMigrationRepairResult fromPaxosOnlyRepair(Epoch 
minEpoch, boolean deadNodesExcluded)
+    {
+        return fromRepair(minEpoch, false, false, deadNodesExcluded);
     }
 
-    public static ConsensusMigrationRepairResult fromAccordRepair(Epoch 
minEpoch)
+    public static ConsensusMigrationRepairResult fromAccordOnlyRepair(Epoch 
minEpoch, boolean deadNodesExcluded)
     {
-        checkArgument(minEpoch.isAfter(Epoch.EMPTY), "Accord repairs should 
always occur at an Epoch");
-        return new 
ConsensusMigrationRepairResult(ConsensusMigrationRepairType.accord, minEpoch);
+        return fromRepair(minEpoch, false, true, deadNodesExcluded);
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairType.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairType.java
index 694b7fcd44..7bce208739 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairType.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationRepairType.java
@@ -24,7 +24,8 @@ public enum ConsensusMigrationRepairType
 {
     ineligible(0),
     paxos(1),
-    accord(2);
+    accord(2),
+    either(3);
 
     public final byte value;
 
@@ -50,6 +51,8 @@ public enum ConsensusMigrationRepairType
                 return ConsensusMigrationRepairType.paxos;
             case 2:
                 return ConsensusMigrationRepairType.accord;
+            case 3:
+                return ConsensusMigrationRepairType.either;
         }
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationTarget.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationTarget.java
index 534ad1e99e..45c2e5fa12 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationTarget.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationTarget.java
@@ -34,6 +34,17 @@ public enum ConsensusMigrationTarget
         this.value = SignedBytes.checkedCast(value);
     }
 
    /**
     * Returns true if a repair of the given type advances consensus migration toward this
     * target protocol. A paxos (key + data) repair is what completes migration of a range to
     * accord, and an accord repair is what completes migration back to paxos; an "either"
     * repair satisfies both directions, while ineligible repairs never migrate anything.
     */
    public boolean isMigratedBy(ConsensusMigrationRepairType repairType)
    {
        switch (repairType)
        {
            case either: return true;
            case paxos: return this == accord;
            case accord: return this == paxos;
            default: return false;
        }
    }
+
     public static ConsensusMigrationTarget fromString(String targetProtocol)
     {
         return ConsensusMigrationTarget.valueOf(targetProtocol.toLowerCase());
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusTableMigration.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusTableMigration.java
index 3f25922a3e..ae6e2f0cb0 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusTableMigration.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusTableMigration.java
@@ -90,9 +90,7 @@ public abstract class ConsensusTableMigration
             if (tms == null || !Range.intersects(tms.migratingRanges, 
desc.ranges))
                 return;
 
-            if (tms.targetProtocol == ConsensusMigrationTarget.paxos && 
repairResult.consensusMigrationRepairResult.type != 
ConsensusMigrationRepairType.accord)
-                return;
-            if (tms.targetProtocol == ConsensusMigrationTarget.accord && 
repairResult.consensusMigrationRepairResult.type != 
ConsensusMigrationRepairType.paxos)
+            if 
(!tms.targetProtocol.isMigratedBy(repairResult.consensusMigrationRepairResult.type))
                 return;
 
             ClusterMetadataService.instance().commit(
@@ -310,7 +308,8 @@ public abstract class ConsensusTableMigration
         boolean ignoreUnreplicatedKeyspaces = true;
         boolean repairPaxos = !accordRepair;
         boolean paxosOnly = false;
-        RepairOption repairOption = new 
RepairOption(RepairParallelism.PARALLEL, primaryRange, incremental, trace, 
numJobThreads, intersectingRanges, pullRepair, forceRepair, PreviewKind.NONE, 
optimiseStreams, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, 
accordRepair);
+        boolean accordOnly = false;
+        RepairOption repairOption = new 
RepairOption(RepairParallelism.PARALLEL, primaryRange, incremental, trace, 
numJobThreads, intersectingRanges, pullRepair, forceRepair, PreviewKind.NONE, 
optimiseStreams, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, 
accordOnly, true);
         tables.forEach(table -> 
repairOption.getColumnFamilies().add(table.tableName));
         return repairOption;
     }
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
 
b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
index 7dff0111ef..1c1f3985fa 100644
--- 
a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
+++ 
b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableParams;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairType;
-import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusTableMigration;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.service.consensus.migration.TableMigrationState;
@@ -138,11 +137,8 @@ public class MaybeFinishConsensusMigrationForTableAndRange 
implements Transforma
         if (tms == null)
             return new Rejected(INVALID, format("Table %s is not currently 
performing consensus migration", ksAndCF));
 
-        if (tms.targetProtocol == ConsensusMigrationTarget.accord && 
repairType != ConsensusMigrationRepairType.paxos)
-            return new Rejected(INVALID, format("Table %s is not currently 
performing consensus migration to Accord and the repair was a Paxos repair", 
ksAndCF));
-
-        if (tms.targetProtocol == ConsensusMigrationTarget.paxos && repairType 
!= ConsensusMigrationRepairType.accord)
-            return new Rejected(INVALID, format("Table %s is not currently 
performing consensus migration to Paxos and the repair was an Accord repair", 
ksAndCF));
+        if (!tms.targetProtocol.isMigratedBy(repairType))
+            return new Rejected(INVALID, format("Table %s is not currently 
performing consensus migration to %s and the repair was a %s repair", ksAndCF, 
tms.targetProtocol, repairType));
 
         List<Range<Token>> normalizedRepairedRanges = 
normalize(repairedRanges);
 
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java 
b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index ff7eace231..d5e24caa57 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -105,6 +105,9 @@ public class Repair extends NodeToolCmd
     @Option(title = "paxos-only", name = {"-paxos-only", "--paxos-only"}, 
description = "If the --paxos-only flag is included, no table data is repaired, 
only paxos operations..")
     private boolean paxosOnly = false;
 
+    @Option(title = "accord-only", name = {"-accord-only", "--accord-only"}, 
description = "If the --accord-only flag is included, no table data is 
repaired, only accord operations..")
+    private boolean accordOnly = false;
+
     @Option(title = "ignore_unreplicated_keyspaces", name = 
{"-iuk","--ignore-unreplicated-keyspaces"}, description = "Use 
--ignore-unreplicated-keyspaces to ignore keyspaces which are not replicated, 
otherwise the repair will fail")
     private boolean ignoreUnreplicatedKeyspaces = false;
 
@@ -187,6 +190,7 @@ public class Repair extends NodeToolCmd
         options.put(RepairOption.IGNORE_UNREPLICATED_KS, 
Boolean.toString(ignoreUnreplicatedKeyspaces));
         options.put(RepairOption.REPAIR_PAXOS_KEY, Boolean.toString(!skipPaxos 
&& getPreviewKind() == PreviewKind.NONE));
         options.put(RepairOption.PAXOS_ONLY_KEY, Boolean.toString(paxosOnly && 
getPreviewKind() == PreviewKind.NONE));
+        options.put(RepairOption.ACCORD_ONLY_KEY, Boolean.toString(accordOnly 
&& getPreviewKind() == PreviewKind.NONE));
 
         if (!startToken.isEmpty() || !endToken.isEmpty())
         {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java
index edb749fb21..cec5fdb2e5 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.AsymmetricRemoteSyncTask;
-import org.apache.cassandra.repair.CassandraRepairJob;
+import org.apache.cassandra.repair.RepairJob;
 import org.apache.cassandra.repair.LocalSyncTask;
 import org.apache.cassandra.repair.SyncTask;
 import org.apache.cassandra.repair.TreeResponse;
@@ -107,7 +107,7 @@ public class OptimiseStreamsRepairTest extends TestBaseImpl
     {
         public static void install(ClassLoader cl, int id)
         {
-            new ByteBuddy().rebase(CassandraRepairJob.class)
+            new ByteBuddy().rebase(RepairJob.class)
                            
.method(named("createOptimisedSyncingSyncTasks").and(takesArguments(1)))
                            .intercept(MethodDelegation.to(BBHelper.class))
                            .make()
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
new file mode 100644
index 0000000000..66cf1c0e00
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.local.CommandsForKey;
+import accord.impl.SimpleProgressLog;
+import accord.local.Node;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.Status;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.async.AsyncChains;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordSafeCommandStore;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.api.AccordAgent;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+
+import static java.lang.String.format;
+
+public class AccordIncrementalRepairTest extends AccordTestBase
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordIncrementalRepairTest.class);
+
+    public static class BarrierRecordingAgent extends AccordAgent
+    {
+        static class ExecutedBarrier
+        {
+            final Seekables<?, ?> keysOrRanges;
+            final @Nonnull Timestamp executeAt;
+
+            public ExecutedBarrier(Seekables<?, ?> keysOrRanges, @Nonnull 
Timestamp executeAt)
+            {
+                this.keysOrRanges = keysOrRanges;
+                this.executeAt = executeAt;
+            }
+
+            @Override
+            public String toString()
+            {
+                return "ExecutedBarrier{" +
+                       "keysOrRanges=" + keysOrRanges +
+                       ", executeAt=" + executeAt +
+                       '}';
+            }
+        }
+
+        private final List<ExecutedBarrier> barriers = new ArrayList<>();
+
+        @Override
+        public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, 
@Nonnull Timestamp executeAt)
+        {
+            super.onLocalBarrier(keysOrRanges, executeAt);
+            synchronized (barriers)
+            {
+                barriers.add(new ExecutedBarrier(keysOrRanges, executeAt));
+            }
+        }
+
+        public List<ExecutedBarrier> executedBarriers()
+        {
+            synchronized (barriers)
+            {
+                return ImmutableList.copyOf(barriers);
+            }
+        }
+
+        public void reset()
+        {
+            synchronized (barriers)
+            {
+                barriers.clear();
+            }
+        }
+
+    }
+
+    static BarrierRecordingAgent agent()
+    {
+        AccordService service = (AccordService) AccordService.instance();
+        return (BarrierRecordingAgent) service.node().agent();
+    }
+
+    static AccordService accordService()
+    {
+        return (AccordService) AccordService.instance();
+    }
+
    @Override
    protected Logger logger()
    {
        // Supplies this test's logger to the shared AccordTestBase infrastructure.
        return logger;
    }
+
+    @BeforeClass
+    public static void setupClass() throws Throwable
+    {
+        
CassandraRelevantProperties.ACCORD_AGENT_CLASS.setString(BarrierRecordingAgent.class.getName());
+//        setupCluster(opt -> opt.withConfig(conf -> 
conf.with(Feature.NETWORK, Feature.GOSSIP)), 3);
+        setupCluster(opt -> opt, 3);
+    }
+
    @After
    public void tearDown()
    {
        // Remove any message filters a test installed so later tests start unpartitioned.
        SHARED_CLUSTER.filters().reset();
    }
+
+    private static void await(IInvokableInstance instance, 
IIsolatedExecutor.SerializableCallable<Boolean> check, long duration, TimeUnit 
unit)
+    {
+        instance.runOnInstance(() -> {
+            long timeout = Clock.Global.currentTimeMillis() + 
unit.toMillis(duration);
+            while (Clock.Global.currentTimeMillis() < timeout)
+            {
+                if (check.call())
+                    return;
+
+                try
+                {
+                    Thread.sleep(1);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+            throw new AssertionError("Timed out waiting for node 3 to become 
alive");
+        });
+    }
+
+    private static void awaitEndpointUp(IInvokableInstance instance, 
IInvokableInstance waitOn)
+    {
+        InetAddressAndPort endpoint = 
InetAddressAndPort.getByAddress(waitOn.broadcastAddress());
+        await(instance, () -> FailureDetector.instance.isAlive(endpoint), 1, 
TimeUnit.MINUTES);
+    }
+
+    private static void awaitEndpointDown(IInvokableInstance instance, 
IInvokableInstance waitOn)
+    {
+        InetAddressAndPort endpoint = 
InetAddressAndPort.getByAddress(waitOn.broadcastAddress());
+        await(instance, () -> !FailureDetector.instance.isAlive(endpoint), 1, 
TimeUnit.MINUTES);
+    }
+
+    private static <V> V getUninterruptibly(Future<V> future, long timeout, 
TimeUnit units)
+    {
+        try
+        {
+            return future.get(timeout, units);
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+        catch (ExecutionException | TimeoutException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
    // Convenience overload: waits up to one minute for the future's result.
    private static <V> V getUninterruptibly(Future<V> future)
    {
        return getUninterruptibly(future, 1, TimeUnit.MINUTES);
    }
+
    /**
     * Looks up the most recent txn registered against {@code key} in this node's local command
     * store and blocks until that txn reaches the Applied status locally (fails after one minute).
     *
     * @return the TxnId that was waited on
     */
    private static TxnId awaitLocalApplyOnKey(PartitionKey key)
    {
        Node node = accordService().node();
        AtomicReference<TxnId> waitFor = new AtomicReference<>(null);
        // First pass: capture the id of the latest txn touching the key.
        AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(key), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
            AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore;
            CommandsForKey commands = store.maybeCommandsForKey(key).current();
            int size = commands.size();
            if (size < 1)
                return;
            // if txnId is an instance of CommandsForKey.TxnInfo, copying it into a
            // new txnId instance will prevent any issues related to TxnInfo#hashCode
            waitFor.set(new TxnId(commands.txnId(size - 1)));
        }));
        Assert.assertNotNull(waitFor.get());
        TxnId txnId = waitFor.get();
        long start = Clock.Global.currentTimeMillis();
        AtomicBoolean applied = new AtomicBoolean(false);
        // Second pass: poll the command store until the captured txn has been applied locally.
        while (!applied.get())
        {
            long now = Clock.Global.currentTimeMillis();
            if (now - start > TimeUnit.MINUTES.toMillis(1))
                throw new AssertionError("Timeout");
            AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
                SafeCommand command = safeStore.get(txnId, key.toUnseekable());
                Assert.assertNotNull(command.current());
                if (command.current().status().hasBeen(Status.Applied))
                    applied.set(true);
            }));
        }
        return txnId;
    }
+
+    @Test
+    public void txnRepairTest() throws Throwable
+    {
+        SHARED_CLUSTER.schemaChange(format("CREATE TABLE %s.%s (k int primary 
key, v int) WITH transactional_mode='full' AND fast_path={'size':2};", 
KEYSPACE, tableName));
+        final String keyspace = KEYSPACE;
+        final String table = tableName;
+
+        SHARED_CLUSTER.filters().allVerbs().to(3).drop();
+        awaitEndpointDown(SHARED_CLUSTER.get(1), SHARED_CLUSTER.get(3));
+
+        executeWithRetry(SHARED_CLUSTER, format("BEGIN TRANSACTION\n" +
+                                                "INSERT INTO %s (k, v) VALUES 
(1, 1);\n" +
+                                                "COMMIT TRANSACTION", 
qualifiedTableName));
+
+        SHARED_CLUSTER.get(1, 2).forEach(instance -> instance.runOnInstance(() 
-> {
+            TableMetadata metadata = 
Schema.instance.getTableMetadata(keyspace, table);
+            awaitLocalApplyOnKey(new PartitionKey(metadata.id, 
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(1))));
+        }));
+
+        SHARED_CLUSTER.forEach(instance -> instance.runOnInstance(() -> 
agent().reset()));
+
+        SHARED_CLUSTER.get(1, 2).forEach(instance -> {
+            instance.runOnInstance(() -> {
+                ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+                
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+                Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
+                cfs.getLiveSSTables().forEach(sstable -> {
+                    Assert.assertFalse(sstable.isRepaired());
+                    Assert.assertFalse(sstable.isPendingRepair());
+                });
+            });
+        });
+        SHARED_CLUSTER.get(3).runOnInstance(() -> {
+            ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+            cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+            Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
+        });
+
+        // heal partition and wait for node 1 to see node 3 again
+        for (IInvokableInstance instance : SHARED_CLUSTER)
+            instance.runOnInstance(() -> {
+                SimpleProgressLog.PAUSE_FOR_TEST = true;
+                Assert.assertTrue(agent().executedBarriers().isEmpty());
+            });
+        SHARED_CLUSTER.filters().reset();
+        awaitEndpointUp(SHARED_CLUSTER.get(1), SHARED_CLUSTER.get(3));
+        SHARED_CLUSTER.get(1).nodetool("repair", KEYSPACE);
+
+        SHARED_CLUSTER.forEach(instance -> {
+            instance.runOnInstance(() -> {
+                Assert.assertFalse( agent().executedBarriers().isEmpty());
+                ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+                Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
+                cfs.getLiveSSTables().forEach(sstable -> {
+                    Assert.assertTrue(sstable.isRepaired() || 
sstable.isPendingRepair());
+                });
+            });
+        });
+    }
+
+    private void testSingleNodeWrite(TransactionalMode mode)
+    {
+        SHARED_CLUSTER.schemaChange(format("CREATE TABLE %s.%s (k int primary 
key, v int) WITH transactional_mode='%s';", KEYSPACE, tableName, mode));
+        final String keyspace = KEYSPACE;
+        final String table = tableName;
+
+        SHARED_CLUSTER.get(3).runOnInstance(() -> {
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (1, 2);", keyspace, table));
+        });
+
+        SHARED_CLUSTER.get(3).runOnInstance(() -> {
+            UntypedResultSet result = 
QueryProcessor.executeInternal(format("SELECT * FROM %s.%s WHERE k=1", 
keyspace, table));
+            Assert.assertFalse(result.isEmpty());
+            UntypedResultSet.Row row = Iterables.getOnlyElement(result);
+            Assert.assertEquals(1, row.getInt("k"));
+            Assert.assertEquals(2, row.getInt("v"));
+
+
+
+            ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+            cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+            Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
+            cfs.getLiveSSTables().forEach(sstable -> {
+                Assert.assertFalse(sstable.isRepaired());
+                Assert.assertFalse(sstable.isPendingRepair());
+            });
+        });
+        SHARED_CLUSTER.get(1, 2).forEach(instance -> instance.runOnInstance(() 
-> {
+            UntypedResultSet result = 
QueryProcessor.executeInternal(format("SELECT * FROM %s.%s WHERE k=1", 
keyspace, table));
+            Assert.assertTrue(result.isEmpty());
+
+            ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+            cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+            Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
+        }));
+        SHARED_CLUSTER.forEach(instance -> instance.runOnInstance(() -> {
+            agent().reset();
+        }));
+
+        SHARED_CLUSTER.get(1).nodetool("repair", KEYSPACE);
+        SHARED_CLUSTER.forEach(instance -> instance.runOnInstance(() -> {
+            Assert.assertFalse( agent().executedBarriers().isEmpty());
+            ColumnFamilyStore cfs = 
Keyspace.open(keyspace).getColumnFamilyStore(table);
+            Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
+            cfs.getLiveSSTables().forEach(sstable -> {
+                Assert.assertTrue(sstable.isRepaired() || 
sstable.isPendingRepair());
+            });
+
+            UntypedResultSet result = 
QueryProcessor.executeInternal(format("SELECT * FROM %s.%s WHERE k=1", 
keyspace, table));
+            Assert.assertFalse(result.isEmpty());
+            UntypedResultSet.Row row = Iterables.getOnlyElement(result);
+            Assert.assertEquals(1, row.getInt("k"));
+            Assert.assertEquals(2, row.getInt("v"));
+        }));
+    }
+
+    /**
+     * a failed write at txn mode unsafe should be made visible by repair
+     */
+    @Test
+    public void unsafeRepairTest()
+    {
+        testSingleNodeWrite(TransactionalMode.unsafe);
+    }
+
+    /**
+     * Repair should repair (fully replicate _some_ state) any divergent state 
between replicas
+     */
+    @Test
+    public void fullRepairTest()
+    {
+        testSingleNodeWrite(TransactionalMode.full);
+    }
+
+    @Test
+    public void onlyAccordTest()
+    {
+        SHARED_CLUSTER.schemaChange(format("CREATE TABLE %s.%s (k int primary 
key, v int) WITH transactional_mode='full' AND fast_path={'size':2};", 
KEYSPACE, tableName));
+        final String keyspace = KEYSPACE;
+        final String table = tableName;
+
+        SHARED_CLUSTER.filters().allVerbs().to(3).drop();
+        awaitEndpointDown(SHARED_CLUSTER.get(1), SHARED_CLUSTER.get(3));
+        awaitEndpointDown(SHARED_CLUSTER.get(2), SHARED_CLUSTER.get(3));
+
+        executeWithRetry(SHARED_CLUSTER, format("BEGIN TRANSACTION\n" +
+                                                "INSERT INTO %s (k, v) VALUES 
(1, 1);\n" +
+                                                "COMMIT TRANSACTION", 
qualifiedTableName));
+
+        SHARED_CLUSTER.get(1, 2).forEach(instance -> instance.runOnInstance(() 
-> {
+            TableMetadata metadata = 
Schema.instance.getTableMetadata(keyspace, table);
+            awaitLocalApplyOnKey(new PartitionKey(metadata.id, 
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(1))));
+        }));
+
+        SHARED_CLUSTER.forEach(instance -> instance.runOnInstance(() -> 
agent().reset()));
+
+        SHARED_CLUSTER.filters().reset();
+        awaitEndpointUp(SHARED_CLUSTER.get(1), SHARED_CLUSTER.get(3));
+        SHARED_CLUSTER.get(1).nodetool("repair", "--accord-only", KEYSPACE);
+
+        SHARED_CLUSTER.forEach(instance -> {
+            logger().info("checking instance {}", instance.broadcastAddress());
+            instance.runOnInstance(() -> {
+                Assert.assertFalse( agent().executedBarriers().isEmpty());
+            });
+        });
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
index f568d982f4..519ea15f70 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
@@ -570,7 +570,7 @@ public class AccordMigrationTest extends AccordTestBase
                  assertTargetPaxosWrite(runCasNoApply, 1, 
paxosMigratingKeys.next(), 2, 1, 1, 1, 1);
 
                  // Repair the currently migrating range from when targets 
were switched, but it's not an Accord repair, this is to make sure the wrong 
repair type doesn't trigger progress
-                 nodetool(coordinator, "repair", "-st", 
upperMidToken.toString(), "-et", maxAlignedWithLocalRanges.toString());
+                 nodetool(coordinator, "repair", "-st", 
upperMidToken.toString(), "-et", maxAlignedWithLocalRanges.toString(), 
"--paxos-only");
                  assertMigrationState(tableName, 
ConsensusMigrationTarget.paxos, ImmutableList.of(new Range(minToken, midToken), 
new Range(maxToken, minToken)), ImmutableList.of(accordMigratingRange), 1);
 
                  // Paxos migrating keys should still need key migration after 
non-Accord repair
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java
 
b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java
index b04df4b154..bf75f920ed 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java
@@ -97,7 +97,7 @@ class OnInstanceRepair extends ClusterAction
     {
         Collection<Range<Token>> ranges = rangesSupplier.call();
         // no need to wait for completion, as we track all task submissions 
and message exchanges, and ensure they finish before continuing to next action
-        StorageService.instance.repair(keyspaceName, new 
RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, 
ranges, false, force, PreviewKind.NONE, false, true, repairPaxos, 
repairOnlyPaxos, false), singletonList((tag, event) -> {
+        StorageService.instance.repair(keyspaceName, new 
RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, 
ranges, false, force, PreviewKind.NONE, false, true, repairPaxos, 
repairOnlyPaxos, false, false), singletonList((tag, event) -> {
             if (event.getType() == ProgressEventType.COMPLETE)
                 listener.run();
         }));
diff --git a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java 
b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
index 6caec1e8ec..27152901c3 100644
--- a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
@@ -66,12 +66,7 @@ public class FailingRepairFuzzTest extends FuzzTestBase
                 Cluster.Node coordinator = coordinatorGen.next(rs);
 
                 // exclude accord repair as this test breaks validation/sync; 
which accord doesn't have
-                RepairOption options;
-                do
-                {
-                    options = repairOption(rs, coordinator, KEYSPACE, TABLES);
-                }
-                while (options.accordRepair());
+                RepairOption options = repairOption(rs, coordinator, KEYSPACE, 
TABLES);
                 RepairCoordinator repair = coordinator.repair(KEYSPACE, 
options, false);
                 repair.run();
                 InetAddressAndPort failingAddress = pickParticipant(rs, 
coordinator, repair);
diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java 
b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index ea57c1b790..3cf67a6024 100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@ -382,26 +382,14 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
             for (JobState job : session.getJobs())
             {
                 EnumSet<JobState.State> expected = 
EnumSet.allOf(JobState.State.class);
-                if (repair.state.options.accordRepair())
+                if (!shouldSnapshot)
                 {
-                    // accord doesn't do snapshot, validation, or streaming
                     expected.remove(JobState.State.SNAPSHOT_START);
                     expected.remove(JobState.State.SNAPSHOT_COMPLETE);
-                    expected.remove(JobState.State.VALIDATION_START);
-                    expected.remove(JobState.State.VALIDATION_COMPLETE);
-                    expected.remove(JobState.State.STREAM_START);
                 }
-                else
+                if (!shouldSync)
                 {
-                    if (!shouldSnapshot)
-                    {
-                        expected.remove(JobState.State.SNAPSHOT_START);
-                        expected.remove(JobState.State.SNAPSHOT_COMPLETE);
-                    }
-                    if (!shouldSync)
-                    {
-                        expected.remove(JobState.State.STREAM_START);
-                    }
+                    expected.remove(JobState.State.STREAM_START);
                 }
                 Set<JobState.State> actual = 
job.getStateTimesMillis().keySet();
                 Assertions.assertThat(actual).isEqualTo(expected);
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java 
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 1c26105909..2832368ecb 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -113,7 +113,7 @@ public class RepairJobTest
     private static InetAddressAndPort addr4;
     private static InetAddressAndPort addr5;
     private MeasureableRepairSession session;
-    private CassandraRepairJob job;
+    private RepairJob job;
     private RepairJobDesc sessionJobDesc;
 
     // So that threads actually get recycled and we can have accurate memory 
accounting while testing
@@ -125,11 +125,11 @@ public class RepairJobTest
         public MeasureableRepairSession(TimeUUID parentRepairSession, 
CommonRange commonRange, boolean excludedDeadNodes, String keyspace,
                                         RepairParallelism parallelismDegree, 
boolean isIncremental, boolean pullRepair,
                                         PreviewKind previewKind, boolean 
optimiseStreams, boolean repairPaxos, boolean paxosOnly,
-                                        boolean accordRepair, String... 
cfnames)
+                                        String... cfnames)
         {
             super(SharedContext.Global.instance, new Scheduler.NoopScheduler(),
                   parentRepairSession, commonRange, excludedDeadNodes, 
keyspace, parallelismDegree, isIncremental, pullRepair,
-                  previewKind, optimiseStreams, repairPaxos, paxosOnly, 
accordRepair, cfnames);
+                  previewKind, optimiseStreams, repairPaxos, paxosOnly, false, 
false, cfnames);
         }
 
         @Override
@@ -195,9 +195,9 @@ public class RepairJobTest
         this.session = new MeasureableRepairSession(parentRepairSession,
                                                     new CommonRange(neighbors, 
emptySet(), FULL_RANGE), false,
                                                     KEYSPACE, SEQUENTIAL, 
false, false,
-                                                    NONE, false, true, false, 
false, CF);
+                                                    NONE, false, true, false, 
CF);
 
-        this.job = new CassandraRepairJob(session, CF);
+        this.job = new RepairJob(session, CF);
         this.sessionJobDesc = new 
RepairJobDesc(session.state.parentRepairSession, session.getId(),
                                                 session.state.keyspace, CF, 
session.ranges());
 
@@ -267,7 +267,7 @@ public class RepairJobTest
 
         // Use addr4 instead of one of the provided trees to force everything 
to be remote sync tasks as
         // LocalSyncTasks try to reach over the network.
-        List<SyncTask> syncTasks = 
CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
sessionJobDesc, mockTreeResponses,
+        List<SyncTask> syncTasks = 
RepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
sessionJobDesc, mockTreeResponses,
                                                                      addr4, // 
local
                                                                      
noTransient(),
                                                                      
session.isIncremental,
@@ -367,7 +367,7 @@ public class RepairJobTest
                                                          treeResponse(addr2, 
RANGE_1, "different", RANGE_2, "same", RANGE_3, "different"),
                                                          treeResponse(addr3, 
RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"));
 
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC,
                                                                                
     treeResponses,
                                                                                
     addr1, // local
                                                                                
     noTransient(), // transient
@@ -403,7 +403,7 @@ public class RepairJobTest
         List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"),
                                                          treeResponse(addr2, 
RANGE_1, "different", RANGE_2, "same", RANGE_3, "different"));
 
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC,
                                                                                
     treeResponses,
                                                                                
     addr1, // local
                                                                                
     transientPredicate(addr2),
@@ -433,7 +433,7 @@ public class RepairJobTest
         List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"),
                                                          treeResponse(addr2, 
RANGE_1, "different", RANGE_2, "same", RANGE_3, "different"));
 
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC,
                                                                                
     treeResponses,
                                                                                
     addr1, // local
                                                                                
     transientPredicate(addr1),
@@ -493,7 +493,7 @@ public class RepairJobTest
         List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"),
                                                          treeResponse(addr2, 
RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"));
 
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC,
                                                                                
     treeResponses,
                                                                                
     local, // local
                                                                                
     isTransient,
@@ -511,13 +511,13 @@ public class RepairJobTest
                                                          treeResponse(addr2, 
RANGE_1, "two", RANGE_2, "two", RANGE_3, "two"),
                                                          treeResponse(addr3, 
RANGE_1, "three", RANGE_2, "three", RANGE_3, "three"));
 
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC,
                                                                                
     treeResponses,
                                                                                
     addr1, // local
                                                                                
     ep -> ep.equals(addr3), // transient
-                                                                               
             false,
-                                                                               
             true,
-                                                                               
             PreviewKind.ALL));
+                                                                               
     false,
+                                                                               
     true,
+                                                                               
     PreviewKind.ALL));
 
         assertThat(tasks).hasSize(3);
 
@@ -542,7 +542,7 @@ public class RepairJobTest
                                                          treeResponse(addr5, 
RANGE_1, "five", RANGE_2, "five", RANGE_3, "five"));
 
         Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || 
ep.equals(addr5);
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC,
                                                                                
     treeResponses,
                                                                                
     addr1, // local
                                                                                
     isTransient, // transient
@@ -609,7 +609,7 @@ public class RepairJobTest
                                                          treeResponse(addr5, 
RANGE_1, "five", RANGE_2, "five", RANGE_3, "five"));
 
         Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || 
ep.equals(addr5);
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC,
                                                                                
     treeResponses,
                                                                                
     local, // local
                                                                                
     isTransient, // transient
@@ -658,13 +658,13 @@ public class RepairJobTest
                                                          treeResponse(addr4, 
RANGE_1, "four", RANGE_2, "four", RANGE_3, "four"),
                                                          treeResponse(addr5, 
RANGE_1, "five", RANGE_2, "five", RANGE_3, "five"));
 
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createStandardSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(SharedContext.Global.instance, JOB_DESC,
                                                                                
     treeResponses,
                                                                                
     addr4, // local
                                                                                
     ep -> ep.equals(addr4) || ep.equals(addr5), // transient
-                                                                               
             false,
-                                                                               
             pullRepair,
-                                                                               
             PreviewKind.ALL));
+                                                                               
     false,
+                                                                               
     pullRepair,
+                                                                               
     PreviewKind.ALL));
 
         assertThat(tasks.get(pair(addr4, addr5))).isNull();
     }
@@ -676,13 +676,13 @@ public class RepairJobTest
                                                          treeResponse(addr2, 
RANGE_1, "two", RANGE_2, "two", RANGE_3, "two"),
                                                          treeResponse(addr3, 
RANGE_1, "three", RANGE_2, "three", RANGE_3, "three"));
 
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance,
 JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
                                                                                
             treeResponses,
                                                                                
             addr1, // local
                                                                                
             noTransient(),
                                                                                
             addr -> "DC1",
-                                                                               
                     false,
-                                                                               
                     PreviewKind.ALL));
+                                                                               
             false,
+                                                                               
             PreviewKind.ALL));
 
         for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
                                                      pair(addr1, addr3),
@@ -711,13 +711,13 @@ public class RepairJobTest
                                                          treeResponse(addr2, 
RANGE_1, "one", RANGE_2, "two"),
                                                          treeResponse(addr3, 
RANGE_1, "three", RANGE_2, "two"));
 
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance,
 JOB_DESC,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, 
JOB_DESC,
                                                                                
             treeResponses,
                                                                                
             addr4, // local
                                                                                
             noTransient(),
                                                                                
             addr -> "DC1",
-                                                                               
                     false,
-                                                                               
                     PreviewKind.ALL));
+                                                                               
             false,
+                                                                               
             PreviewKind.ALL));
 
         
SyncTaskListAssert.assertThat(tasks.values()).areAllInstanceOf(AsymmetricRemoteSyncTask.class);
 
@@ -744,13 +744,13 @@ public class RepairJobTest
                                                          treeResponse(addr3, 
RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"));
 
         RepairJobDesc desc = new RepairJobDesc(nextTimeUUID(), nextTimeUUID(), 
"ks", "cf", Collections.emptyList());
-        Map<SyncNodePair, SyncTask> tasks = 
toMap(CassandraRepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance,
 desc,
+        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createOptimisedSyncingSyncTasks(SharedContext.Global.instance, 
desc,
                                                                                
             treeResponses,
                                                                                
             addr1, // local
                                                                                
             ep -> ep.equals(addr3),
                                                                                
             addr -> "DC1",
-                                                                               
                     false,
-                                                                               
                     PreviewKind.ALL));
+                                                                               
             false,
+                                                                               
             PreviewKind.ALL));
 
         SyncTask task = tasks.get(pair(addr1, addr2));
 
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java 
b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 5ef72a9e58..c59be70c7b 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -67,7 +67,7 @@ public class RepairSessionTest
                                                   new CommonRange(endpoints, 
Collections.emptySet(), Arrays.asList(repairRange)),
                                                   false, "Keyspace1", 
RepairParallelism.SEQUENTIAL,
                                                   false, false, 
PreviewKind.NONE, false,
-                                                  false, false, false, 
"Standard1");
+                                                  false, false, false, false, 
"Standard1");
 
         // perform convict
         session.convict(remote, Double.MAX_VALUE);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java
 
b/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java
index 1630b3c7c3..143f63a26c 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java
@@ -38,7 +38,6 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.cassandra.service.accord.AccordTestUtils.*;
-import static org.apache.cassandra.service.accord.AccordTopologyTest.token;
 
 public class AccordFastPathCoordinatorTest 
 {
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 6d37f962ac..a608f41f8d 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -81,6 +81,9 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.metrics.AccordStateCacheMetrics;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
@@ -478,4 +481,20 @@ public class AccordTestUtils
     {
         return 
Arrays.stream(ids).mapToObj(AccordTestUtils::id).collect(Collectors.toSet());
     }
+
+    public static Token token(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    public static Range<Token> range(Token left, Token right)
+    {
+        return new Range<>(left, right);
+    }
+
+    public static Range<Token> range(long left, long right)
+    {
+        return range(token(left), token(right));
+    }
+
 }
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
index e3ddf6eeb5..a0ada37645 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
@@ -18,15 +18,11 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,40 +36,20 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Keyspaces;
-import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.membership.Location;
-import org.apache.cassandra.tcm.membership.NodeAddresses;
-import org.apache.cassandra.tcm.membership.NodeId;
-import org.apache.cassandra.tcm.membership.NodeState;
-import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
-import org.apache.cassandra.tcm.ownership.DataPlacements;
 
 import static 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+import static org.apache.cassandra.service.accord.AccordTopologyUtils.*;
 
 public class AccordTopologyTest
 {
-    private static final Id ID1 = new Id(1);
-    private static final Id ID2 = new Id(2);
-    private static final Id ID3 = new Id(3);
-    private static final List<Id> NODE_LIST = ImmutableList.of(ID1, ID2, ID3);
-    private static final Set<Id> NODE_SET = ImmutableSet.copyOf(NODE_LIST);
-
-    private static final InetAddressAndPort EP1 = ep(1);
-    private static final InetAddressAndPort EP2 = ep(2);
-    private static final InetAddressAndPort EP3 = ep(3);
-
     private static final IPartitioner partitioner = 
Murmur3Partitioner.instance;
     private static TableId tableId = null;
     private static KeyspaceMetadata keyspace = null;
@@ -89,75 +65,6 @@ public class AccordTopologyTest
         keyspace = KeyspaceMetadata.create("ks", KeyspaceParams.simple(3), 
Tables.of(table));
     }
 
-    private static InetAddressAndPort ep(int i)
-    {
-        try
-        {
-            return 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(new 
byte[]{127, 0, 0, (byte)i}), 7012);
-        }
-        catch (UnknownHostException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static NodeId nodeId(int id)
-    {
-        return new NodeId(id);
-    }
-
-    static void addNode(ClusterMetadata.Transformer transformer, int node, 
Token token)
-    {
-        NodeId nodeId = nodeId(node);
-        InetAddressAndPort ep = ep(node);
-        NodeAddresses addresses = new NodeAddresses(nodeId.toUUID(), ep, ep, 
ep);
-        transformer.register(nodeId, addresses, LOCATION, NodeVersion.CURRENT);
-        transformer.withNodeState(nodeId, NodeState.JOINED);
-        transformer.proposeToken(nodeId, Collections.singleton(token));
-        transformer.addToRackAndDC(nodeId);
-    }
-
-    private static ClusterMetadata configureCluster(List<Range<Token>> ranges, 
Keyspaces keyspaces)
-    {
-        assert ranges.size() == 3;
-
-        IPartitioner partitioner = Murmur3Partitioner.instance;
-        ClusterMetadata empty = new ClusterMetadata(partitioner);
-        ClusterMetadata.Transformer transformer = empty.transformer();
-        transformer.with(new DistributedSchema(Keyspaces.of(keyspace)));
-        addNode(transformer, 1, ranges.get(0).right);
-        addNode(transformer, 2, ranges.get(1).right);
-        addNode(transformer, 3, ranges.get(2).right);
-        ClusterMetadata metadata = transformer.build().metadata;
-
-        for (KeyspaceMetadata keyspace : keyspaces)
-        {
-            ReplicationParams replication = keyspace.params.replication;
-            AbstractReplicationStrategy strategy = 
AbstractReplicationStrategy.createReplicationStrategy(keyspace.name, 
replication);
-            DataPlacements.Builder placements = metadata.placements.unbuild();
-            DataPlacement placement = 
strategy.calculateDataPlacement(Epoch.EMPTY, metadata.tokenMap.toRanges(), 
metadata);
-            placements.with(replication, placement);
-            metadata = transformer.with(placements.build()).build().metadata;
-        }
-
-        return metadata;
-    }
-
-    static Token token(long t)
-    {
-        return new Murmur3Partitioner.LongToken(t);
-    }
-
-    static Range<Token> range(Token left, Token right)
-    {
-        return new Range<>(left, right);
-    }
-
-    static Range<Token> range(long left, long right)
-    {
-        return range(token(left), token(right));
-    }
-
     /**
      * Check converter does the right thing if the ring is constructed with 
min and max tokens
      */
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
new file mode 100644
index 0000000000..c3c8e6b588
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import accord.local.Node;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+
+public class AccordTopologyUtils
+{
+    public static final Node.Id ID1 = new Node.Id(1);
+    public static final Node.Id ID2 = new Node.Id(2);
+    public static final Node.Id ID3 = new Node.Id(3);
+    public static final List<Node.Id> NODE_LIST = ImmutableList.of(ID1, ID2, 
ID3);
+    public static final Set<Node.Id> NODE_SET = ImmutableSet.copyOf(NODE_LIST);
+
+    private static final IPartitioner partitioner = 
Murmur3Partitioner.instance;
+    private static final Location LOCATION = new Location("DC1", "RACK1");
+
+    public static InetAddressAndPort ep(int i)
+    {
+        try
+        {
+            return 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(new 
byte[]{ 127, 0, 0, (byte)i}), 7012);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static NodeId nodeId(int id)
+    {
+        return new NodeId(id);
+    }
+
+    static void addNode(ClusterMetadata.Transformer transformer, int node, 
Token token)
+    {
+        NodeId nodeId = nodeId(node);
+        InetAddressAndPort ep = ep(node);
+        NodeAddresses addresses = new NodeAddresses(nodeId.toUUID(), ep, ep, 
ep);
+        transformer.register(nodeId, addresses, LOCATION, NodeVersion.CURRENT);
+        transformer.withNodeState(nodeId, NodeState.JOINED);
+        transformer.proposeToken(nodeId, Collections.singleton(token));
+        transformer.addToRackAndDC(nodeId);
+    }
+
+    public static ClusterMetadata configureCluster(List<Range<Token>> ranges, 
Keyspaces keyspaces)
+    {
+        assert ranges.size() == 3;
+
+        IPartitioner partitioner = Murmur3Partitioner.instance;
+        ClusterMetadata empty = new ClusterMetadata(partitioner);
+        ClusterMetadata.Transformer transformer = empty.transformer();
+        transformer.with(new DistributedSchema(keyspaces));
+        addNode(transformer, 1, ranges.get(0).right);
+        addNode(transformer, 2, ranges.get(1).right);
+        addNode(transformer, 3, ranges.get(2).right);
+        ClusterMetadata metadata = transformer.build().metadata;
+
+        for (KeyspaceMetadata keyspace : keyspaces)
+        {
+            ReplicationParams replication = keyspace.params.replication;
+            AbstractReplicationStrategy strategy = 
AbstractReplicationStrategy.createReplicationStrategy(keyspace.name, 
replication);
+            DataPlacements.Builder placements = metadata.placements.unbuild();
+            DataPlacement placement = 
strategy.calculateDataPlacement(Epoch.EMPTY, metadata.tokenMap.toRanges(), 
metadata);
+            placements.with(replication, placement);
+            metadata = transformer.with(placements.build()).build().metadata;
+        }
+
+        return metadata;
+    }
+
+    static Token token(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    static Range<Token> range(Token left, Token right)
+    {
+        return new Range<>(left, right);
+    }
+
+    public static Range<Token> range(long left, long right)
+    {
+        return range(token(left), token(right));
+    }
+
+}
diff --git 
a/test/unit/org/apache/cassandra/service/accord/repair/RequiredResponseTrackerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/repair/RequiredResponseTrackerTest.java
new file mode 100644
index 0000000000..022543bb3a
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/accord/repair/RequiredResponseTrackerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.repair;
+
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import accord.api.TopologySorter;
+import accord.coordinate.tracking.RequestStatus;
+import accord.local.Node;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.accord.AccordTopology;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Location;
+
+import static 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+import static org.apache.cassandra.service.accord.AccordTopologyUtils.*;
+
+public class RequiredResponseTrackerTest
+{
+    private static final IPartitioner partitioner = 
Murmur3Partitioner.instance;
+    private static TableId tableId = null;
+    private static KeyspaceMetadata keyspace = null;
+    private static Topology topology;
+    private static final Location LOCATION = new Location("DC1", "RACK1");
+
+    private static final List<Range<Token>> RANGES = 
ImmutableList.of(range(-100, 0), range(0, 100), range(100, -100));
+    private static final TopologySorter TOPOLOGY_SORTER = (node1, node2, 
shards) -> node1.compareTo(node2);
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+        TableMetadata table = parse("CREATE TABLE tbl (k int, c int, v int, 
primary key (k, c)) WITH transactional_mode='full'", "ks").build();
+        tableId = table.id;
+        keyspace = KeyspaceMetadata.create("ks", KeyspaceParams.simple(3), 
Tables.of(table));
+
+        ClusterMetadata metadata = configureCluster(RANGES, 
Keyspaces.of(keyspace));
+        topology = AccordTopology.createAccordTopology(metadata);
+
+    }
+
+    @Test
+    public void successCase()
+    {
+        Set<Node.Id> nodes = topology.nodes();
+        Assert.assertEquals(NODE_SET, nodes);
+        RequiredResponseTracker tracker = new RequiredResponseTracker(nodes, 
new Topologies.Single(TOPOLOGY_SORTER, topology));
+        Assert.assertEquals(RequestStatus.NoChange, 
tracker.recordSuccess(ID1));
+        Assert.assertEquals(RequestStatus.NoChange, 
tracker.recordSuccess(ID2));
+        Assert.assertEquals(RequestStatus.Success, tracker.recordSuccess(ID3));
+    }
+
+    @Test
+    public void failureCase()
+    {
+        Set<Node.Id> nodes = topology.nodes();
+        Assert.assertEquals(NODE_SET, nodes);
+        RequiredResponseTracker tracker = new RequiredResponseTracker(nodes, 
new Topologies.Single(TOPOLOGY_SORTER, topology));
+        Assert.assertEquals(RequestStatus.NoChange, 
tracker.recordSuccess(ID1));
+        Assert.assertEquals(RequestStatus.Failed, tracker.recordFailure(ID2));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to