This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 33918934c7 (Accord) Fix recovery when one shard is committed/stable 
and another is preaccepted Also fix:  - Topology slicing must declare whether 
we share/slice node ownership (to assist above)  - CFK.visit removes transitive 
dependencies too eagerly across epoch change  - apply cleanup to builder 
consistently, and construct the same value we would produce by purge (so that 
replay is idempotent)  - Invoke ExecuteTxn.LocalExecute callbacks on 
originating CommandStore  - misc other [...]
33918934c7 is described below

commit 33918934c702d4d3e1d846b236aacf6b9c795e8c
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Feb 13 00:36:59 2025 +0000

    (Accord) Fix recovery when one shard is committed/stable and another is preaccepted
    Also fix:
     - Topology slicing must declare whether we share/slice node ownership (to assist above)
     - CFK.visit removes transitive dependencies too eagerly across epoch change
     - apply cleanup to builder consistently, and construct the same value we would produce by purge (so that replay is idempotent)
     - Invoke ExecuteTxn.LocalExecute callbacks on originating CommandStore
     - misc other minor issues
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20325
---
 modules/accord                                     |   2 +-
 .../db/compaction/CompactionIterator.java          |  41 +++---
 src/java/org/apache/cassandra/dht/Token.java       |   2 +-
 .../cassandra/index/accord/RouteJournalIndex.java  |   4 +-
 .../service/accord/AccordCommandStore.java         |   2 +-
 .../cassandra/service/accord/AccordJournal.java    |  15 +--
 .../service/accord/AccordJournalTable.java         |   2 +-
 .../service/accord/AccordMessageSink.java          |  29 ++--
 .../cassandra/service/accord/AccordService.java    |   5 +-
 .../service/accord/RouteInMemoryIndex.java         |   6 +-
 .../cassandra/service/accord/api/AccordAgent.java  |  54 +++++++-
 .../service/accord/api/AccordRoutingKey.java       |   3 +-
 .../accord/interop/AccordInteropAdapter.java       |  29 ++--
 .../service/accord/interop/AccordInteropApply.java |   7 +-
 .../accord/interop/AccordInteropExecution.java     |   8 +-
 .../accord/interop/AccordInteropPersist.java       |   2 +-
 .../accord/repair/RepairSyncPointAdapter.java      |   4 +-
 .../accord/serializers/AcceptSerializers.java      | 120 +++++++----------
 .../serializers/BeginInvalidationSerializers.java  |   6 +-
 .../accord/serializers/CommandSerializers.java     |  70 +++++++++-
 .../service/accord/serializers/KeySerializers.java |  24 ++--
 .../accord/serializers/LatestDepsSerializers.java  |   5 +-
 .../accord/serializers/RecoverySerializers.java    |   6 +-
 .../cassandra/service/accord/txn/TxnNamedRead.java |  28 ++++
 .../cassandra/service/accord/txn/TxnRead.java      | 146 +++++++++++++++++----
 .../cassandra/service/accord/txn/TxnWrite.java     |   3 -
 .../accord/AccordMigrationWriteRaceTestBase.java   |   4 +-
 .../distributed/test/accord/AccordTestBase.java    |   4 +-
 .../cassandra/simulator/paxos/PaxosSimulation.java |   1 -
 test/unit/org/apache/cassandra/Util.java           |   3 +-
 .../service/accord/AccordCommandTest.java          |   2 +-
 .../service/accord/AccordJournalOrderTest.java     |   6 +-
 .../service/accord/AccordMessageSinkTest.java      |   4 +-
 .../accord/SimulatedAccordCommandStore.java        |   2 +-
 .../serializers/CommandsForKeySerializerTest.java  |   3 +-
 35 files changed, 435 insertions(+), 217 deletions(-)

diff --git a/modules/accord b/modules/accord
index 74a3b81ca9..1542da5d8a 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 74a3b81ca9d9e1ce7ddfd117682fc7c310f0cd99
+Subproject commit 1542da5d8acf08b8226227c91d4f81c64ed9762c
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 198f1941d2..dbee4e8893 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -40,6 +40,7 @@ import accord.local.CommandStores;
 import accord.local.DurableBefore;
 import accord.local.RedundantBefore;
 import accord.utils.Invariants;
+import accord.utils.UnhandledEnum;
 import org.agrona.collections.Int2ObjectHashMap;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -98,7 +99,9 @@ import 
org.apache.cassandra.service.paxos.uncommitted.PaxosRows;
 import org.apache.cassandra.utils.TimeUUID;
 
 import static accord.local.Cleanup.ERASE;
+import static accord.local.Cleanup.EXPUNGE;
 import static accord.local.Cleanup.Input.PARTIAL;
+import static accord.local.Cleanup.NO;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static org.apache.cassandra.config.Config.PaxosStatePurging.legacy;
@@ -912,22 +915,30 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                 RedundantBefore redundantBefore = 
redundantBefores.get(key.commandStoreId);
                 DurableBefore durableBefore = 
durableBefores.get(key.commandStoreId);
                 Cleanup cleanup = commandBuilder.shouldCleanup(PARTIAL, agent, 
redundantBefore, durableBefore);
-                if (cleanup == ERASE)
-                    return PartitionUpdate.fullPartitionDelete(metadata(), 
partition.partitionKey(), Long.MAX_VALUE, nowInSec).unfilteredIterator();
-
-                commandBuilder = commandBuilder.maybeCleanup(cleanup);
-                if (commandBuilder != builder)
+                if (cleanup != NO)
                 {
-                    if (commandBuilder == null)
-                        return null;
-
-                    PartitionUpdate.SimpleBuilder newVersion = 
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
-
-                    Row.SimpleBuilder rowBuilder = 
newVersion.row(firstClustering);
-                    rowBuilder.add("record", 
commandBuilder.asByteBuffer(redundantBefore, userVersion))
-                              .add("user_version", userVersion);
-
-                    return newVersion.build().unfilteredIterator();
+                    switch (cleanup)
+                    {
+                        default: throw new UnhandledEnum(cleanup);
+                        case EXPUNGE:
+                            return null;
+                        case ERASE:
+                            return 
PartitionUpdate.fullPartitionDelete(metadata(), partition.partitionKey(), 
Long.MAX_VALUE, nowInSec).unfilteredIterator();
+                        case TRUNCATE:
+                        case INVALIDATE:
+                        case TRUNCATE_WITH_OUTCOME:
+                        case VESTIGIAL:
+                            if (commandBuilder.maybeCleanup(PARTIAL, cleanup))
+                            {
+                                PartitionUpdate.SimpleBuilder newVersion = 
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
+
+                                Row.SimpleBuilder rowBuilder = 
newVersion.row(firstClustering);
+                                rowBuilder.add("record", 
commandBuilder.asByteBuffer(redundantBefore, userVersion))
+                                          .add("user_version", userVersion);
+
+                                return newVersion.build().unfilteredIterator();
+                            }
+                    }
                 }
 
                 return PartitionUpdate.multiRowUpdate(AccordKeyspace.Journal, 
partition.partitionKey(), rows)
diff --git a/src/java/org/apache/cassandra/dht/Token.java 
b/src/java/org/apache/cassandra/dht/Token.java
index 048d1a0394..f76300e80d 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -187,7 +187,7 @@ public abstract class Token implements RingPosition<Token>, 
Serializable
         }
     }
 
-    public static volatile boolean logPartitioner = false;
+    public static boolean logPartitioner = false;
     public static final Set<Class<? extends IPartitioner>> 
serializePartitioners = Sets.newSetFromMap(new ConcurrentHashMap<>());
     public static final Set<Class<? extends IPartitioner>> 
deserializePartitioners = Sets.newSetFromMap(new ConcurrentHashMap<>());
 
diff --git a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java 
b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
index c03f840533..d668f3f658 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
@@ -90,6 +90,8 @@ import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
 
+import static accord.primitives.Routable.Domain.Range;
+
 public class RouteJournalIndex implements Index, INotificationConsumer
 {
     public enum RegisterStatus
@@ -131,7 +133,7 @@ public class RouteJournalIndex implements Index, 
INotificationConsumer
 
     public static boolean allowed(TxnId id)
     {
-        return id.domain().isRange();
+        return id.is(Range);
     }
 
     private static void validateTargets(ColumnFamilyStore baseCfs, 
IndexMetadata indexMetadata)
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 8086cecdbc..4c723fa104 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -308,7 +308,7 @@ public class AccordCommandStore extends CommandStore
     }
 
     @Override
-    public <T> AsyncChain<T> submit(Callable<T> task)
+    public <T> AsyncChain<T> build(Callable<T> task)
     {
         return AsyncChains.ofCallable(taskExecutor(), task);
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 9a266788e2..4614ba24ba 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -544,7 +544,6 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                     continue;
                 }
 
-
                 switch (field)
                 {
                     case EXECUTE_AT:
@@ -557,7 +556,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                         
out.writeUnsignedVInt(command.waitingOn().minUniqueHlc());
                         break;
                     case SAVE_STATUS:
-                        out.writeShort(command.saveStatus().ordinal());
+                        out.writeByte(command.saveStatus().ordinal());
                         break;
                     case DURABILITY:
                         out.writeByte(command.durability().ordinal());
@@ -631,12 +630,6 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
             }
         }
 
-        public Builder maybeCleanup(Cleanup cleanup)
-        {
-            super.maybeCleanup(cleanup);
-            return this;
-        }
-
         public void serialize(DataOutputPlus out, RedundantBefore 
redundantBefore, int userVersion) throws IOException
         {
             Invariants.require(mask == 0);
@@ -697,7 +690,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                     minUniqueHlc = in.readUnsignedVInt();
                     break;
                 case SAVE_STATUS:
-                    saveStatus = SaveStatus.values()[in.readShort()];
+                    saveStatus = SaveStatus.values()[in.readByte()];
                     break;
                 case DURABILITY:
                     durability = Durability.values()[in.readByte()];
@@ -748,14 +741,14 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                     in.readUnsignedVInt();
                     break;
                 case SAVE_STATUS:
-                    in.readShort();
+                    in.readByte();
                     break;
                 case DURABILITY:
                     in.readByte();
                     break;
                 case ACCEPTED:
                 case PROMISED:
-                    CommandSerializers.ballot.skip(in, userVersion);
+                    CommandSerializers.ballot.skip(in);
                     break;
                 case PARTICIPANTS:
                     // TODO (expected): skip
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 5b16752e32..8eb144c4f5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -231,7 +231,7 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
         @Override
         public void accept(long segment, int position, K key, ByteBuffer 
buffer, int userVersion)
         {
-            if (!tableRecordConsumer.visited(segment)) //TODO: don't need this 
anymore
+            if (!tableRecordConsumer.visited(segment)) //TODO (required): 
don't need this anymore
                 delegate.accept(segment, position, key, buffer, userVersion);
         }
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index cf8f8c7515..bdb9f03584 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.api.Agent;
 import accord.api.MessageSink;
 import accord.impl.RequestCallbacks;
 import accord.local.AgentExecutor;
@@ -46,19 +45,16 @@ import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.messages.TxnRequest;
 import accord.primitives.TxnId;
-import org.apache.cassandra.config.AccordSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessageFlag;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ResponseContext;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.service.TimeoutStrategy;
-import org.apache.cassandra.service.TimeoutStrategy.LatencySourceFactory;
+import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.utils.Clock;
 
 import static accord.messages.MessageType.Kind.REMOTE;
@@ -211,29 +207,20 @@ public class AccordMessageSink implements MessageSink
         return null;
     }
 
-    private final Agent agent;
+    private final AccordAgent agent;
     private final MessageDelivery messaging;
     private final AccordEndpointMapper endpointMapper;
     private final RequestCallbacks callbacks;
-    // TODO (required): make hot property
-    private TimeoutStrategy slowPreaccept, slowRead;
 
-    public AccordMessageSink(Agent agent, MessageDelivery messaging, 
AccordEndpointMapper endpointMapper, RequestCallbacks callbacks)
+    public AccordMessageSink(AccordAgent agent, MessageDelivery messaging, 
AccordEndpointMapper endpointMapper, RequestCallbacks callbacks)
     {
-        AccordSpec config = DatabaseDescriptor.getAccord();
-        if (config != null)
-        {
-            // TODO (expected): introduce better metrics, esp. for preaccept, 
but also to disambiguate DC latencies
-            slowPreaccept = new TimeoutStrategy(config.slowPreAccept, 
LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
-            slowRead = new TimeoutStrategy(config.slowRead, 
LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
-        }
         this.agent = agent;
         this.messaging = messaging;
         this.endpointMapper = endpointMapper;
         this.callbacks = callbacks;
     }
 
-    public AccordMessageSink(Agent agent, AccordConfigurationService 
endpointMapper, RequestCallbacks callbacks)
+    public AccordMessageSink(AccordAgent agent, AccordConfigurationService 
endpointMapper, RequestCallbacks callbacks)
     {
         this(agent, MessagingService.instance(), endpointMapper, callbacks);
     }
@@ -285,17 +272,17 @@ public class AccordMessageSink implements MessageSink
         {
             case ACCORD_READ_REQ:
             case ACCORD_STABLE_THEN_READ_REQ:
-                if (slowRead == null || isRangeBarrier(request))
+                if (agent.slowRead() == null || isRangeBarrier(request))
                     break;
 
             case ACCORD_CHECK_STATUS_REQ:
-                delayedAtNanos = nowNanos + slowRead.computeWait(1, 
NANOSECONDS);
+                delayedAtNanos = nowNanos + agent.slowRead().computeWait(1, 
NANOSECONDS);
                 break;
 
             case ACCORD_PRE_ACCEPT_REQ:
-                if (slowPreaccept == null || isRangeBarrier(request))
+                if (agent.slowPreaccept() == null || isRangeBarrier(request))
                     break;
-                delayedAtNanos = nowNanos + slowPreaccept.computeWait(1, 
NANOSECONDS);
+                delayedAtNanos = nowNanos + 
agent.slowPreaccept().computeWait(1, NANOSECONDS);
         }
 
         Message<Request> message = Message.out(verb, request, expiresAtNanos);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 81f992eec1..931a8ecead 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -176,6 +176,7 @@ import static accord.messages.SimpleReply.Ok;
 import static accord.primitives.Routable.Domain.Key;
 import static accord.primitives.Routable.Domain.Range;
 import static accord.primitives.TxnId.Cardinality.cardinality;
+import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
 import static accord.utils.Invariants.require;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -957,8 +958,6 @@ public class AccordService implements IAccordService, 
Shutdownable
             }
             else if (cause instanceof RuntimeException)
                 throw (RuntimeException) cause;
-            else if (cause instanceof InvalidRequestException)
-                throw ((InvalidRequestException)cause);
             else
                 throw new RuntimeException(cause);
         }
@@ -1486,7 +1485,7 @@ public class AccordService implements IAccordService, 
Shutdownable
 
         private Await(Node node, SyncPoint<?> exclusiveSyncPoint)
         {
-            Topologies topologies = 
node.topology().forEpoch(exclusiveSyncPoint.route, 
exclusiveSyncPoint.syncId.epoch());
+            Topologies topologies = 
node.topology().forEpoch(exclusiveSyncPoint.route, 
exclusiveSyncPoint.syncId.epoch(), SHARE);
             this.node = node;
             this.tracker = new AllTracker(topologies);
             this.exclusiveSyncPoint = exclusiveSyncPoint;
diff --git 
a/src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java 
b/src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java
index 722d7d6f0c..e0861d661e 100644
--- a/src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java
+++ b/src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java
@@ -26,7 +26,6 @@ import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.function.Consumer;
 
-import accord.primitives.Routable;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -45,6 +44,8 @@ import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.RTree;
 import org.apache.cassandra.utils.RangeTree;
 
+import static accord.primitives.Routable.Domain.Range;
+
 public class RouteInMemoryIndex<V> implements RangeSearcher
 {
     private final Long2ObjectHashMap<SegmentIndex> segmentIndexes = new 
Long2ObjectHashMap<>();
@@ -144,8 +145,7 @@ public class RouteInMemoryIndex<V> implements RangeSearcher
 
         private void add(TxnId id, Unseekable keyOrRange)
         {
-            if (keyOrRange.domain() != Routable.Domain.Range)
-                throw new IllegalArgumentException("Unexpected domain: " + 
keyOrRange.domain());
+            Invariants.require(keyOrRange.domain() == Range);
             TokenRange ts = (TokenRange) keyOrRange;
             TableId tableId = ts.table();
             tableIndex.computeIfAbsent(tableId, i -> new TableIndex()).add(id, 
ts);
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 466914eae6..b640f47c37 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -51,9 +51,13 @@ import accord.utils.DefaultRandom;
 import accord.utils.Invariants;
 import accord.utils.RandomSource;
 import accord.utils.SortedList;
+import accord.utils.UnhandledEnum;
+import org.apache.cassandra.config.AccordSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.metrics.AccordMetrics;
+import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
 import org.apache.cassandra.net.ResponseContext;
+import org.apache.cassandra.service.TimeoutStrategy;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.txn.TxnQuery;
 import org.apache.cassandra.service.accord.txn.TxnRead;
@@ -73,12 +77,18 @@ public class AccordAgent implements Agent
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AccordAgent.class);
 
-    protected Node.Id self;
-
     // TODO (required): this should be configurable and have exponential 
back-off, escaping to operator input past a certain number of retries
     private long retryBootstrapDelayMicros = SECONDS.toMicros(1L);
     private final RandomSource random = new DefaultRandom();
 
+    // TODO (required): make hot property
+    private TimeoutStrategy slowPreaccept, slowRead;
+    protected Node.Id self;
+
+    public AccordAgent()
+    {
+    }
+
     public void setNodeId(Node.Id id)
     {
         self = id;
@@ -247,11 +257,23 @@ public class AccordAgent implements Agent
         return units.convert((1L << Math.min(retryCount, 4)), SECONDS);
     }
 
+    @Override
+    public long localSlowAt(TxnId txnId, Status.Phase phase, TimeUnit unit)
+    {
+        switch (phase)
+        {
+            default: throw new UnhandledEnum(phase);
+            case PreAccept: return 
unit.convert(slowPreaccept().computeWaitUntil(1), NANOSECONDS);
+            case Execute:   return 
unit.convert(slowRead().computeWaitUntil(1), NANOSECONDS);
+        }
+    }
+
     @Override
     public long localExpiresAt(TxnId txnId, Status.Phase phase, TimeUnit unit)
     {
         // TODO (expected): make this configurable
-        return txnId.is(Write) ? DatabaseDescriptor.getWriteRpcTimeout(unit) : 
DatabaseDescriptor.getReadRpcTimeout(unit);
+        return txnId.is(Write) ? DatabaseDescriptor.getWriteRpcTimeout(unit)
+                               : DatabaseDescriptor.getReadRpcTimeout(unit);
     }
 
     @Override
@@ -265,4 +287,30 @@ public class AccordAgent implements Agent
     {
         logger.error(message);
     }
+
+    public TimeoutStrategy slowRead()
+    {
+        if (slowRead == null)
+        {
+            synchronized (this)
+            {
+                AccordSpec config = DatabaseDescriptor.getAccord();
+                slowRead = new TimeoutStrategy(config.slowRead, 
TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
+            }
+        }
+        return slowRead;
+    }
+
+    public TimeoutStrategy slowPreaccept()
+    {
+        if (slowPreaccept == null)
+        {
+            synchronized (this)
+            {
+                AccordSpec config = DatabaseDescriptor.getAccord();
+                slowPreaccept = new TimeoutStrategy(config.slowPreAccept, 
TimeoutStrategy.LatencySourceFactory.of(ClientRequestsMetricsHolder.accordReadMetrics));
+            }
+        }
+        return slowPreaccept;
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 61454eab64..f3db1790f3 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -33,6 +33,7 @@ import accord.local.ShardDistributor;
 import accord.primitives.Range;
 import accord.primitives.RangeFactory;
 import accord.primitives.Ranges;
+import accord.utils.Invariants;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.dht.IPartitioner;
@@ -257,6 +258,7 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
         public void serialize(T key, DataOutputPlus out, int version) throws 
IOException
         {
             key.table.serializeCompact(out);
+            Invariants.require(key.token.getPartitioner() == getPartitioner());
             Token.compactSerializer.serialize(key.token, out, version);
         }
 
@@ -264,7 +266,6 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
         public void skip(DataInputPlus in, int version) throws IOException
         {
             TableId.skipCompact(in);
-            // TODO (expected): should we be using the TableId partitioner 
here?
             Token.compactSerializer.skip(in, getPartitioner(), version);
         }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
index fb3d8a606c..435c02f26c 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
@@ -26,9 +26,8 @@ import org.slf4j.LoggerFactory;
 import accord.api.Result;
 import accord.api.Update;
 import accord.coordinate.CoordinationAdapter;
-import accord.coordinate.CoordinationAdapter.Adapters.AbstractTxnAdapter;
+import accord.coordinate.CoordinationAdapter.Adapters.TxnAdapter;
 import accord.coordinate.ExecutePath;
-import accord.coordinate.PersistTxn;
 import accord.local.Node;
 import accord.messages.Apply;
 import accord.primitives.Deps;
@@ -39,6 +38,7 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.topology.Topologies;
+import accord.topology.Topologies.SelectNodeOwnership;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.service.accord.AccordEndpointMapper;
 import org.apache.cassandra.service.accord.api.AccordAgent;
@@ -49,7 +49,7 @@ import org.apache.cassandra.service.accord.txn.TxnRead;
 import static accord.messages.Apply.Kind.Maximal;
 import static accord.messages.Apply.Kind.Minimal;
 
-public class AccordInteropAdapter extends AbstractTxnAdapter
+public class AccordInteropAdapter extends TxnAdapter
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AccordInteropAdapter.class);
     public static final class AccordInteropFactory implements 
CoordinationAdapter.Factory
@@ -76,29 +76,29 @@ public class AccordInteropAdapter extends AbstractTxnAdapter
 
     private AccordInteropAdapter(InteropExecutor executor, 
AccordEndpointMapper endpointMapper, Apply.Kind applyKind)
     {
+        super(Minimal);
         this.executor = executor;
         this.endpointMapper = endpointMapper;
         this.applyKind = applyKind;
     }
 
     @Override
-    public void execute(Node node, Topologies all, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
BiConsumer<? super Result, Throwable> callback)
+    public void execute(Node node, Topologies any, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps, 
Deps sendDeps, BiConsumer<? super Result, Throwable> callback)
     {
-        if (!doInteropExecute(node, route, txnId, txn, executeAt, deps, 
callback))
-            super.execute(node, all, route, path, txnId, txn, executeAt, deps, 
callback);
+        if (!doInteropExecute(node, route, txnId, txn, executeAt, stableDeps, 
callback))
+            super.execute(node, any, route, path, txnId, txn, executeAt, 
stableDeps, sendDeps, callback);
     }
 
     @Override
-    public void persist(Node node, Topologies all, FullRoute<?> fullRoute, 
Route<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result, BiConsumer<? super Result, Throwable> callback)
+    public void persist(Node node, Topologies any, Route<?> require, Route<?> 
sendTo, SelectNodeOwnership selectSendTo, FullRoute<?> route, TxnId txnId, Txn 
txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? 
super Result, Throwable> callback)
     {
-        if (applyKind == Minimal && doInteropPersist(node, all, sendTo, txnId, 
txn, executeAt, deps, writes, result, fullRoute, callback))
+        if (applyKind == Minimal && doInteropPersist(node, any, require, 
sendTo, selectSendTo, txnId, txn, executeAt, deps, writes, result, route, 
callback))
             return;
 
-        if (callback != null) callback.accept(result, null);
-        new PersistTxn(node, all, txnId, sendTo, txn, executeAt, deps, writes, 
result, fullRoute)
-            .start(Apply.FACTORY, applyKind, all, writes, result);
+        super.persist(node, any, require, sendTo, selectSendTo, route, txnId, 
txn, executeAt, deps, writes, result, callback);
     }
 
+
     private boolean doInteropExecute(Node node, FullRoute<?> route, TxnId 
txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super Result, 
Throwable> callback)
     {
         // Unrecoverable repair always needs to be run by 
AccordInteropExecution
@@ -112,15 +112,16 @@ public class AccordInteropAdapter extends 
AbstractTxnAdapter
         return true;
     }
 
-    private static boolean doInteropPersist(Node node, Topologies all, 
Route<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result, FullRoute<?> fullRoute, BiConsumer<? super Result, 
Throwable> callback)
+    private boolean doInteropPersist(Node node, Topologies any, Route<?> 
require, Route<?> sendTo, SelectNodeOwnership selectSendTo, TxnId txnId, Txn 
txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> 
fullRoute, BiConsumer<? super Result, Throwable> callback)
     {
         Update update = txn.update();
         ConsistencyLevel consistencyLevel = update instanceof AccordUpdate ? 
((AccordUpdate) update).cassandraCommitCL() : null;
         if (consistencyLevel == null || consistencyLevel == 
ConsistencyLevel.ANY || writes.isEmpty())
             return false;
 
-        new AccordInteropPersist(node, all, txnId, sendTo, txn, executeAt, 
deps, writes, result, fullRoute, consistencyLevel, callback)
-            .start(AccordInteropApply.FACTORY, Minimal, all, writes, result);
+        Topologies all = execution(node, any, sendTo, selectSendTo, fullRoute, 
txnId, executeAt);
+        new AccordInteropPersist(node, all, txnId, require, txn, executeAt, 
deps, writes, result, fullRoute, consistencyLevel, callback)
+            .start(Minimal, any, writes, result);
         return true;
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index edc5fb9629..241c8acd01 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -48,9 +48,11 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.accord.AccordMessageSink.AccordMessageType;
 import 
org.apache.cassandra.service.accord.serializers.ApplySerializers.ApplySerializer;
 import org.apache.cassandra.service.accord.txn.AccordUpdate;
+import org.apache.cassandra.tcm.ClusterMetadata;
 
 import static accord.utils.Invariants.requireArgument;
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * Apply that waits until the transaction is actually applied before sending a 
response
@@ -100,6 +102,8 @@ public class AccordInteropApply extends Apply implements 
LocalListeners.ComplexL
     @Override
     public ApplyReply apply(SafeCommandStore safeStore, StoreParticipants 
participants)
     {
+        ClusterMetadata cm = ClusterMetadata.current();
+        checkState(cm.epoch.getEpoch() >= minEpoch, "TCM epoch %d is < 
minEpoch %d", cm.epoch.getEpoch(), minEpoch);
         ApplyReply reply = super.apply(safeStore, participants);
         requireArgument(reply == ApplyReply.Redundant || reply == 
ApplyReply.Applied || reply == ApplyReply.Insufficient, "Unexpected 
ApplyReply");
 
@@ -198,7 +202,8 @@ public class AccordInteropApply extends Apply implements 
LocalListeners.ComplexL
             listeners = this.listeners;
             this.listeners = null;
         }
-        listeners.forEach((i, v) -> v.cancel());
+        if (listeners != null)
+            listeners.forEach((i, v) -> v.cancel());
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
index 0bc6cdbe32..56966df9d3 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
@@ -95,6 +95,7 @@ import org.apache.cassandra.transport.Dispatcher;
 
 import static accord.coordinate.CoordinationAdapter.Factory.Kind.Standard;
 import static accord.primitives.Txn.Kind.Write;
+import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
 import static accord.utils.Invariants.requireArgument;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadMetrics;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordWriteMetrics;
@@ -130,7 +131,7 @@ public class AccordInteropExecution implements 
ReadCoordinator, MaximalCommitSen
         }
 
         @Override
-        public <T> AsyncChain<T> submit(Callable<T> task)
+        public <T> AsyncChain<T> build(Callable<T> task)
         {
             try
             {
@@ -181,9 +182,10 @@ public class AccordInteropExecution implements 
ReadCoordinator, MaximalCommitSen
         this.consistencyLevel = consistencyLevel;
         this.endpointMapper = endpointMapper;
 
-        this.executes = node.topology().forEpoch(route, executeAt.epoch());
+        // TODO (required): compare this to latest logic in Accord, make sure 
it makes sense
+        this.executes = node.topology().forEpoch(route, executeAt.epoch(), 
SHARE);
         this.allTopologies = txnId.epoch() != executeAt.epoch()
-                             ? node.topology().preciseEpochs(route, 
txnId.epoch(), executeAt.epoch())
+                             ? node.topology().preciseEpochs(route, 
txnId.epoch(), executeAt.epoch(), SHARE)
                              : executes;
         this.executeTopology = executes.getEpoch(executeAt.epoch());
         this.coordinateTopology = allTopologies.getEpoch(txnId.epoch());
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
index 1a5d74b28d..e61d3934b5 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
@@ -110,7 +110,7 @@ public class AccordInteropPersist extends Persist
 
     public AccordInteropPersist(Node node, Topologies topologies, TxnId txnId, 
Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result 
result, FullRoute<?> fullRoute, ConsistencyLevel consistencyLevel, BiConsumer<? 
super Result, Throwable> clientCallback)
     {
-        super(node, topologies, txnId, sendTo, txn, executeAt, deps, writes, 
result, fullRoute);
+        super(node, topologies, txnId, sendTo, txn, executeAt, deps, writes, 
result, fullRoute, AccordInteropApply.FACTORY);
         Invariants.requireArgument(consistencyLevel == ConsistencyLevel.QUORUM 
|| consistencyLevel == ConsistencyLevel.ALL || consistencyLevel == 
ConsistencyLevel.SERIAL || consistencyLevel == ConsistencyLevel.ONE);
         this.consistencyLevel = consistencyLevel;
         registerClientCallback(result, clientCallback);
diff --git 
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
 
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
index c0b1938fd3..c31971883e 100644
--- 
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
+++ 
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
@@ -58,10 +58,10 @@ public class RepairSyncPointAdapter<U extends Unseekable> 
extends CoordinationAd
     }
 
     @Override
-    public void execute(Node node, Topologies all, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
BiConsumer<? super SyncPoint<U>, Throwable> callback)
+    public void execute(Node node, Topologies all, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps, 
Deps sendDeps, BiConsumer<? super SyncPoint<U>, Throwable> callback)
     {
         RequiredResponseTracker tracker = new 
RequiredResponseTracker(requiredResponses, all);
-        ExecuteSyncPoint.ExecuteInclusive<U> execute = new 
ExecuteSyncPoint.ExecuteInclusive<>(node, new SyncPoint<>(txnId, executeAt, 
deps, (FullRoute<U>) route), tracker, executeAt);
+        ExecuteSyncPoint.ExecuteInclusive<U> execute = new 
ExecuteSyncPoint.ExecuteInclusive<>(node, new SyncPoint<>(txnId, executeAt, 
stableDeps, (FullRoute<U>) route), tracker, executeAt);
         execute.addCallback(callback);
         execute.start();
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index 9454cae12c..aa325da041 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -20,13 +20,15 @@ package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
 
+import accord.local.Commands.AcceptOutcome;
 import accord.messages.Accept;
 import accord.messages.Accept.AcceptReply;
 import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Participants;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
-import accord.utils.Invariants;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -34,20 +36,21 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import 
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
 
 import static accord.messages.Accept.SerializerSupport.create;
-import static accord.utils.Invariants.illegalState;
 
 public class AcceptSerializers
 {
     private AcceptSerializers() {}
 
-    public static final IVersionedSerializer<Accept> request = new 
TxnRequestSerializer.WithUnsyncedSerializer<>()
+    public static final IVersionedSerializer<Accept> request = new 
RequestSerializer();
+    public static class RequestSerializer extends 
TxnRequestSerializer.WithUnsyncedSerializer<Accept>
     {
-        final IVersionedSerializer<Accept.Kind> kindSerializer = new 
EnumSerializer<>(Accept.Kind.class);
+        private static final Accept.Kind[] kinds = Accept.Kind.values();
+        private static final int IS_PARTIAL = 1;
 
         @Override
         public void serializeBody(Accept accept, DataOutputPlus out, int 
version) throws IOException
         {
-            kindSerializer.serialize(accept.kind, out, version);
+            out.writeByte((accept.kind.ordinal() << 1) | 
(accept.isPartialAccept ? IS_PARTIAL : 0));
             CommandSerializers.ballot.serialize(accept.ballot, out, version);
             ExecuteAtSerializer.serialize(accept.txnId, accept.executeAt, out);
             DepsSerializers.partialDeps.serialize(accept.partialDeps, out, 
version);
@@ -56,17 +59,20 @@ public class AcceptSerializers
         @Override
         public Accept deserializeBody(DataInputPlus in, int version, TxnId 
txnId, Route<?> scope, long waitForEpoch, long minEpoch) throws IOException
         {
+            int flags = in.readByte();
+            Accept.Kind kind = kinds[(flags >>> 1) & 1];
             return create(txnId, scope, waitForEpoch, minEpoch,
-                          kindSerializer.deserialize(in, version),
+                          kind,
                           CommandSerializers.ballot.deserialize(in, version),
                           ExecuteAtSerializer.deserialize(txnId, in),
-                          DepsSerializers.partialDeps.deserialize(in, 
version));
+                          DepsSerializers.partialDeps.deserialize(in, version),
+                          (flags & IS_PARTIAL) != 0);
         }
 
         @Override
         public long serializedBodySize(Accept accept, int version)
         {
-            return kindSerializer.serializedSize(accept.kind, version)
+            return 1
                    + CommandSerializers.ballot.serializedSize(accept.ballot, 
version)
                    + ExecuteAtSerializer.serializedSize(accept.txnId, 
accept.executeAt)
                    + 
DepsSerializers.partialDeps.serializedSize(accept.partialDeps, version);
@@ -103,84 +109,58 @@ public class AcceptSerializers
         }
     };
 
-    public static final IVersionedSerializer<AcceptReply> reply = new 
IVersionedSerializer<>()
+    public static final IVersionedSerializer<AcceptReply> reply = new 
ReplySerializer();
+    public static class ReplySerializer implements 
IVersionedSerializer<AcceptReply>
     {
+        private static final int SUPERSEDED_BY        = 0x08;
+        private static final int COMMITTED_EXECUTE_AT = 0x10;
+        private static final int SUCCESSFUL           = 0x20;
+        private static final int DEPS                 = 0x40;
         @Override
         public void serialize(AcceptReply reply, DataOutputPlus out, int 
version) throws IOException
         {
-            switch (reply.outcome())
-            {
-                default: throw new AssertionError();
-                case Retired:
-                case Truncated:
-                    throw illegalState("AcceptReply with invalid 
AcceptOutcome: " + reply.outcome);
-                case Success:
-                    if (reply.deps != null)
-                    {
-                        out.writeByte(1);
-                        DepsSerializers.deps.serialize(reply.deps, out, 
version);
-                    }
-                    else
-                    {
-                        Invariants.require(reply == AcceptReply.SUCCESS);
-                        out.writeByte(2);
-                    }
-                    break;
-                case RejectedBallot:
-                    out.writeByte(3);
-                    CommandSerializers.ballot.serialize(reply.supersededBy, 
out, version);
-                    break;
-                case Redundant:
-                    int flags = 4 | (reply.supersededBy != null ? 0x8 : 0) | 
(reply.committedExecuteAt != null ? 0x10 : 0);
-                    out.writeByte(flags);
-                    if (reply.supersededBy != null)
-                        
CommandSerializers.ballot.serialize(reply.supersededBy, out, version);
-                    if (reply.committedExecuteAt != null)
-                        
ExecuteAtSerializer.serialize(reply.committedExecuteAt, out);
-            }
+            int flags =  reply.outcome.ordinal()
+                      | (reply.supersededBy != null       ? SUPERSEDED_BY      
  : 0)
+                      | (reply.committedExecuteAt != null ? 
COMMITTED_EXECUTE_AT : 0)
+                      | (reply.successful != null         ? SUCCESSFUL         
  : 0)
+                      | (reply.deps != null               ? DEPS               
  : 0);
+
+            out.writeByte(flags);
+            if (reply.supersededBy != null)
+                CommandSerializers.ballot.serialize(reply.supersededBy, out, 
version);
+            if (reply.committedExecuteAt != null)
+                ExecuteAtSerializer.serialize(reply.committedExecuteAt, out);
+            if (reply.successful != null)
+                KeySerializers.participants.serialize(reply.successful, out, 
version);
+            if (reply.deps != null)
+                DepsSerializers.deps.serialize(reply.deps, out, version);
         }
 
+        private final AcceptOutcome[] outcomes = AcceptOutcome.values();
         @Override
         public AcceptReply deserialize(DataInputPlus in, int version) throws 
IOException
         {
             int flags = in.readByte();
-            switch (flags & 0x7)
-            {
-                default: throw new IllegalStateException("Unexpected 
AcceptNack type: " + (flags & 0x7));
-                case 1:
-                    return new 
AcceptReply(DepsSerializers.deps.deserialize(in, version));
-                case 2:
-                    return AcceptReply.SUCCESS;
-                case 3:
-                    return new 
AcceptReply(CommandSerializers.ballot.deserialize(in, version));
-                case 4:
-                    Ballot supersededBy = (flags & 0x8) == 0 ? null : 
CommandSerializers.ballot.deserialize(in, version);
-                    Timestamp committedExecuteAt = (flags & 0x10) == 0 ? null 
: ExecuteAtSerializer.deserialize(in);
-                    return new AcceptReply(supersededBy, committedExecuteAt);
-            }
+            AcceptOutcome outcome = outcomes[flags & 3];
+            Ballot supersededBy = (flags & SUPERSEDED_BY) == 0 ? null : 
CommandSerializers.ballot.deserialize(in, version);
+            Timestamp committedExecuteAt = (flags & COMMITTED_EXECUTE_AT) == 0 
? null : ExecuteAtSerializer.deserialize(in);
+            Participants<?> successful = (flags & SUCCESSFUL) == 0 ? null : 
KeySerializers.participants.deserialize(in, version);
+            Deps deps = (flags & DEPS) == 0 ? null : 
DepsSerializers.deps.deserialize(in, version);
+            return new AcceptReply(outcome, supersededBy, successful, deps, 
committedExecuteAt);
         }
 
         @Override
         public long serializedSize(AcceptReply reply, int version)
         {
             long size = TypeSizes.BYTE_SIZE;
-            switch (reply.outcome())
-            {
-                default: throw new AssertionError();
-                case Retired:
-                case Truncated:
-                    throw illegalState("AcceptReply with invalid 
AcceptOutcome: " + reply.outcome);
-                case Success:
-                    if (reply.deps != null)
-                        size += 
DepsSerializers.deps.serializedSize(reply.deps, version);
-                    break;
-                case RejectedBallot:
-                    size += 
CommandSerializers.ballot.serializedSize(reply.supersededBy, version);
-                    break;
-                case Redundant:
-                    if (reply.supersededBy != null) size += 
CommandSerializers.ballot.serializedSize(reply.supersededBy, version);
-                    if (reply.committedExecuteAt != null) size += 
ExecuteAtSerializer.serializedSize(reply.committedExecuteAt);
-            }
+            if (reply.supersededBy != null)
+                size += 
CommandSerializers.ballot.serializedSize(reply.supersededBy, version);
+            if (reply.committedExecuteAt != null)
+                size += 
ExecuteAtSerializer.serializedSize(reply.committedExecuteAt);
+            if (reply.successful != null)
+                size += 
KeySerializers.participants.serializedSize(reply.successful, version);
+            if (reply.deps != null)
+                size += DepsSerializers.deps.serializedSize(reply.deps, 
version);
             return size;
         }
     };
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index 44fff87469..8eeed391a7 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -66,7 +66,7 @@ public class BeginInvalidationSerializers
         @Override
         public void serialize(InvalidateReply reply, DataOutputPlus out, int 
version) throws IOException
         {
-            CommandSerializers.nullableBallot.serialize(reply.supersededBy, 
out, version);
+            CommandSerializers.ballot.serialize(reply.supersededBy, out, 
version);
             CommandSerializers.ballot.serialize(reply.accepted, out, version);
             CommandSerializers.saveStatus.serialize(reply.maxStatus, out, 
version);
             CommandSerializers.saveStatus.serialize(reply.maxKnowledgeStatus, 
out, version);
@@ -80,7 +80,7 @@ public class BeginInvalidationSerializers
         public InvalidateReply deserialize(DataInputPlus in, int version) 
throws IOException
         {
             // TODO (expected): use headers instead of nullable+bool 
serializers
-            Ballot supersededBy = 
CommandSerializers.nullableBallot.deserialize(in, version);
+            Ballot supersededBy = CommandSerializers.ballot.deserialize(in, 
version);
             Ballot accepted = CommandSerializers.ballot.deserialize(in, 
version);
             SaveStatus maxStatus = 
CommandSerializers.saveStatus.deserialize(in, version);
             SaveStatus maxKnowledgeStatus = 
CommandSerializers.saveStatus.deserialize(in, version);
@@ -94,7 +94,7 @@ public class BeginInvalidationSerializers
         @Override
         public long serializedSize(InvalidateReply reply, int version)
         {
-            return 
CommandSerializers.nullableBallot.serializedSize(reply.supersededBy, version)
+            return 
CommandSerializers.ballot.serializedSize(reply.supersededBy, version)
                  + CommandSerializers.ballot.serializedSize(reply.accepted, 
version)
                  + 
CommandSerializers.saveStatus.serializedSize(reply.maxStatus, version)
                  + 
CommandSerializers.saveStatus.serializedSize(reply.maxKnowledgeStatus, version)
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index d5a80d7b26..bf349b988b 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -75,8 +75,7 @@ public class CommandSerializers
     public static final IVersionedSerializer<TxnId> nullableTxnId = 
NullableSerializer.wrap(txnId);
     public static final TimestampSerializer<Timestamp> timestamp = new 
TimestampSerializer<>(Timestamp::fromBits);
     public static final IVersionedSerializer<Timestamp> nullableTimestamp = 
NullableSerializer.wrap(timestamp);
-    public static final TimestampSerializer<Ballot> ballot = new 
TimestampSerializer<>(Ballot::fromBits);
-    public static final IVersionedSerializer<Ballot> nullableBallot = 
NullableSerializer.wrap(ballot);
+    public static final BallotSerializer ballot = new BallotSerializer(); // 
permits null
     public static final EnumSerializer<Txn.Kind> kind = new 
EnumSerializer<>(Txn.Kind.class);
     public static final StoreParticipantsSerializer participants = new 
StoreParticipantsSerializer();
 
@@ -356,8 +355,11 @@ public class CommandSerializers
         public void skip(DataInputPlus in, int version) throws IOException
         {
             int flags = in.readByte();
-            if (0 != (flags & HAS_ROUTE))
-                KeySerializers.route.deserialize(in, version);
+            if (0 != (flags & HAS_ROUTE)) KeySerializers.route.skip(in, 
version);
+            if (0 == (flags & HAS_TOUCHED_EQUALS_ROUTE)) 
KeySerializers.participants.skip(in, version);
+            if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED)) 
KeySerializers.participants.skip(in, version);
+            if (0 == (flags & OWNS_EQUALS_TOUCHES)) 
KeySerializers.participants.skip(in, version);
+            if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL))) 
KeySerializers.participants.skip(in, version);
         }
 
         @Override
@@ -446,7 +448,7 @@ public class CommandSerializers
             TopologySerializers.nodeId.serialize(ts.node, out);
         }
 
-        public void skip(DataInputPlus in, int version) throws IOException
+        public void skip(DataInputPlus in) throws IOException
         {
             in.skipBytesFully(serializedSize());
         }
@@ -500,6 +502,64 @@ public class CommandSerializers
         }
     }
 
+    public static class BallotSerializer implements 
IVersionedSerializer<Ballot>
+    {
+        final TimestampSerializer<Ballot> wrapped = new 
TimestampSerializer<>(Ballot::fromBits);
+
+        @Override
+        public void serialize(Ballot t, DataOutputPlus out, int version) 
throws IOException
+        {
+            if (t == null || t.equals(Ballot.ZERO) || t.equals(Ballot.MAX))
+            {
+                out.writeByte(t == null ? 1 : t.equals(Ballot.ZERO) ? 2 : 3);
+            }
+            else
+            {
+                out.writeByte(0);
+                wrapped.serialize(t, out, version);
+            }
+        }
+
+        @Override
+        public Ballot deserialize(DataInputPlus in, int version) throws 
IOException
+        {
+            return deserialize(in);
+        }
+
+        public Ballot deserialize(DataInputPlus in) throws IOException
+        {
+            int flags = in.readByte();
+            switch (flags)
+            {
+                default: throw new IOException("Corrupted input: expected 
[0..3], received: " + flags);
+                case 0: return wrapped.deserialize(in);
+                case 1: return null;
+                case 2: return Ballot.ZERO;
+                case 3: return Ballot.MAX;
+            }
+        }
+
+        public void skip(DataInputPlus in) throws IOException
+        {
+            int flags = in.readByte();
+            if (flags == 0)
+                wrapped.skip(in);
+        }
+
+        @Override
+        public long serializedSize(Ballot t, int version)
+        {
+            return serializedSize(t);
+        }
+
+        public long serializedSize(Ballot t)
+        {
+            if (t == null || t.equals(Ballot.ZERO) || t.equals(Ballot.MAX))
+                return 1;
+            return 1 + wrapped.serializedSize();
+        }
+    }
+
     public static class PartialTxnSerializer extends AbstractWithKeysSerializer
     implements IVersionedSerializer<PartialTxn>
     {
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java 
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
index 6eb7a0fe6b..ffb36a2224 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
@@ -80,11 +80,11 @@ public class KeySerializers
     public static final IVersionedSerializer<Route<?>> nullableRoute;
     public static final IVersionedSerializer<PartialRoute<?>> partialRoute;
 
-    public static final IVersionedSerializer<FullRoute<?>> fullRoute;
+    public static final AbstractRoutablesSerializer<FullRoute<?>> fullRoute;
     public static final IVersionedSerializer<Seekables<?, ?>> seekables;
     public static final IVersionedSerializer<FullRoute<?>> nullableFullRoute;
-    public static final IVersionedSerializer<Unseekables<?>> unseekables;
-    public static final IVersionedSerializer<Participants<?>> participants;
+    public static final AbstractRoutablesSerializer<Unseekables<?>> 
unseekables;
+    public static final AbstractRoutablesSerializer<Participants<?>> 
participants;
     public static final IVersionedSerializer<Participants<?>> 
nullableParticipants;
 
     static
@@ -138,11 +138,11 @@ public class KeySerializers
         final IVersionedSerializer<Route<?>> nullableRoute;
         final IVersionedSerializer<PartialRoute<?>> partialRoute;
 
-        final IVersionedSerializer<FullRoute<?>> fullRoute;
-        final IVersionedSerializer<Seekables<?, ?>> seekables;
+        final AbstractRoutablesSerializer<FullRoute<?>> fullRoute;
+        final AbstractSeekablesSerializer seekables;
         final IVersionedSerializer<FullRoute<?>> nullableFullRoute;
-        final IVersionedSerializer<Unseekables<?>> unseekables;
-        final IVersionedSerializer<Participants<?>> participants;
+        final AbstractRoutablesSerializer<Unseekables<?>> unseekables;
+        final AbstractRoutablesSerializer<Participants<?>> participants;
         final IVersionedSerializer<Participants<?>> nullableParticipants;
         private Impl()
         {
@@ -285,19 +285,19 @@ public class KeySerializers
             this.route = (AbstractRoutablesSerializer<Route<?>>) 
factory.apply(EnumSet.of(UnseekablesKind.PartialKeyRoute, 
UnseekablesKind.FullKeyRoute, UnseekablesKind.PartialRangeRoute, 
UnseekablesKind.FullRangeRoute));
             this.nullableRoute = NullableSerializer.wrap(route);
 
-            this.partialRoute = (IVersionedSerializer<PartialRoute<?>>) 
factory.apply(EnumSet.of(UnseekablesKind.PartialKeyRoute, 
UnseekablesKind.PartialRangeRoute));
-            this.fullRoute = (IVersionedSerializer<FullRoute<?>>) 
factory.apply(EnumSet.of(UnseekablesKind.FullKeyRoute, 
UnseekablesKind.FullRangeRoute));
+            this.partialRoute = (AbstractRoutablesSerializer<PartialRoute<?>>) 
factory.apply(EnumSet.of(UnseekablesKind.PartialKeyRoute, 
UnseekablesKind.PartialRangeRoute));
+            this.fullRoute = (AbstractRoutablesSerializer<FullRoute<?>>) 
factory.apply(EnumSet.of(UnseekablesKind.FullKeyRoute, 
UnseekablesKind.FullRangeRoute));
             this.nullableFullRoute = NullableSerializer.wrap(fullRoute);
 
-            this.unseekables = (IVersionedSerializer<Unseekables<?>>) 
factory.apply(EnumSet.allOf(UnseekablesKind.class));
-            this.participants = (IVersionedSerializer<Participants<?>>) 
factory.apply(EnumSet.allOf(UnseekablesKind.class));
+            this.unseekables = (AbstractRoutablesSerializer<Unseekables<?>>) 
factory.apply(EnumSet.allOf(UnseekablesKind.class));
+            this.participants = (AbstractRoutablesSerializer<Participants<?>>) 
factory.apply(EnumSet.allOf(UnseekablesKind.class));
 
             this.nullableParticipants = NullableSerializer.wrap(participants);
             this.seekables = new AbstractSeekablesSerializer(keys, ranges);
         }
     }
 
-    static class AbstractRoutablesSerializer<RS extends Unseekables<?>> 
implements IVersionedSerializer<RS>
+    public static class AbstractRoutablesSerializer<RS extends Unseekables<?>> 
implements IVersionedSerializer<RS>
     {
         final EnumSet<UnseekablesKind> permitted;
         final AbstractKeysSerializer<RoutingKey, RoutingKeys> routingKeys;
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/LatestDepsSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/LatestDepsSerializers.java
index aa0019d211..ccbb40a460 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/LatestDepsSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/LatestDepsSerializers.java
@@ -38,7 +38,7 @@ import 
org.apache.cassandra.service.accord.serializers.CommandSerializers.Execut
 
 public class LatestDepsSerializers
 {
-    public static final IVersionedSerializer<LatestDeps> latestDeps = new 
IVersionedSerializer<LatestDeps>()
+    public static final IVersionedSerializer<LatestDeps> latestDeps = new 
IVersionedSerializer<>()
     {
         @Override
         public void serialize(LatestDeps t, DataOutputPlus out, int version) 
throws IOException
@@ -133,8 +133,9 @@ public class LatestDepsSerializers
         @Override
         public GetLatestDeps deserializeBody(DataInputPlus in, int version, 
TxnId txnId, Route<?> scope, long waitForEpoch, long minEpoch) throws 
IOException
         {
+            Ballot ballot = CommandSerializers.ballot.deserialize(in);
             Timestamp executeAt = ExecuteAtSerializer.deserialize(in);
-            return GetLatestDeps.SerializationSupport.create(txnId, scope, 
waitForEpoch, minEpoch, executeAt);
+            return GetLatestDeps.SerializationSupport.create(txnId, scope, 
waitForEpoch, minEpoch, ballot, executeAt);
         }
 
         @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index ee0000f1a4..55832dfee7 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -90,7 +90,7 @@ public class RecoverySerializers
         final RecoverReply.Kind[] kinds = RecoverReply.Kind.values();
         void serializeNack(RecoverNack recoverNack, DataOutputPlus out, int 
version) throws IOException
         {
-            
CommandSerializers.nullableBallot.serialize(recoverNack.supersededBy, out, 
version);
+            CommandSerializers.ballot.serialize(recoverNack.supersededBy, out, 
version); // NOTE(review): switched from nullableBallot to the non-nullable serializer; assumes supersededBy is never null — confirm
         }
 
         void serializeOk(RecoverOk recoverOk, DataOutputPlus out, int version) 
throws IOException
@@ -132,7 +132,7 @@ public class RecoverySerializers
         {
             RecoverReply.Kind kind = kinds[in.readByte()];
             if (kind != Ok)
-                return deserializeNack(kind, 
CommandSerializers.nullableBallot.deserialize(in, version), in, version);
+                return deserializeNack(kind, 
CommandSerializers.ballot.deserialize(in, version), in, version);
 
             TxnId id = CommandSerializers.txnId.deserialize(in, version);
             Status status = CommandSerializers.status.deserialize(in, version);
@@ -160,7 +160,7 @@ public class RecoverySerializers
 
         long serializedNackSize(RecoverNack recoverNack, int version)
         {
-            return 
CommandSerializers.nullableBallot.serializedSize(recoverNack.supersededBy, 
version);
+            return 
CommandSerializers.ballot.serializedSize(recoverNack.supersededBy, version); // must use the same ballot serializer as serializeNack, or sizes and bytes diverge
         }
 
         long serializedOkSize(RecoverOk recoverOk, int version)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index dd9486ca30..3a28cec6b0 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -29,9 +29,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Data;
+import accord.api.RoutingKey;
 import accord.primitives.Range;
 import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
+import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResults;
@@ -66,6 +68,7 @@ import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.serializers.KeySerializers;
 import org.apache.cassandra.service.accord.txn.TxnData.TxnDataNameKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Comparables;
 import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.ObjectSizes;
 
@@ -219,6 +222,31 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
         }
     }
 
+    public TxnNamedRead slice(Range range) // returns this range read restricted to `range`, which must lie inside this read's own range
+    {
+        Invariants.require(key.domain().isRange());
+        if (key.equals(range)) // nothing to narrow: reuse this instance
+            return this;
+
+        Invariants.require(((Range)key).contains(range));
+        return new TxnNamedRead(txnDataName(), range, bytes()); // same name and serialized command, narrower key
+    }
+
+    public TxnNamedRead merge(TxnNamedRead with) // combines two range reads into one read over the covering range [min start, max end)
+    {
+        Invariants.require(key.domain().isRange());
+        if (key.equals(with.key))
+            return this;
+
+        Range thisRange = key.asRange();
+        Range thatRange = with.key.asRange();
+        Invariants.require(thisRange.compareTouching(thatRange) == 0); // presumably 0 means overlapping or adjacent, so the covering range adds no new keys
+        RoutingKey start = Comparables.min(thisRange.start(), 
thatRange.start());
+        RoutingKey end = Comparables.max(thisRange.end(), thatRange.end());
+        Range range = thisRange.newRange(start, end);
+        return new TxnNamedRead(txnDataName(), range, bytes()); // NOTE(review): with's txnDataName/bytes are not compared; assumes both halves come from the same original read — confirm
+    }
+
     public static boolean readsWithoutReconciliation(ConsistencyLevel 
consistencyLevel)
     {
         boolean withoutReconciliation = consistencyLevel == null || 
consistencyLevel == ConsistencyLevel.ONE;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index d52cea531c..aa58aa7875 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import javax.annotation.Nonnull;
@@ -41,6 +40,8 @@ import accord.primitives.Routable.Domain;
 import accord.primitives.Seekable;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
+import accord.utils.Invariants;
+import accord.utils.UnhandledEnum;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -55,6 +56,7 @@ import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.ObjectSizes;
 
+import static accord.primitives.Routables.Slice.Minimal;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
@@ -112,6 +114,10 @@ public class TxnRead extends 
AbstractKeySorted<TxnNamedRead> implements Read
         checkArgument(cassandraConsistencyLevel == null || 
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), 
"Unsupported consistency level for read: %s", cassandraConsistencyLevel);
         this.cassandraConsistencyLevel = cassandraConsistencyLevel;
         this.domain = items[0].key().domain();
+        // TODO (expected): relax this condition, require only that it holds 
for each equal byte[]
+        //  right now this means we don't permit two different range queries 
in the same transaction touching adjacent ranges
+        //  this is a pretty weak restriction and doesn't interfere with 
current CQL capabilities, but should be addressed eventually
+        Invariants.require(domain == Domain.Key || 
((Ranges)keys()).mergeTouching() == keys());
     }
 
     private TxnRead(@Nonnull List<TxnNamedRead> items, @Nullable 
ConsistencyLevel cassandraConsistencyLevel)
@@ -120,6 +126,7 @@ public class TxnRead extends 
AbstractKeySorted<TxnNamedRead> implements Read
         checkArgument(cassandraConsistencyLevel == null || 
SUPPORTED_READ_CONSISTENCY_LEVELS.contains(cassandraConsistencyLevel), 
"Unsupported consistency level for read: %s", cassandraConsistencyLevel);
         this.cassandraConsistencyLevel = cassandraConsistencyLevel;
         this.domain = items.get(0).key().domain();
+        Invariants.require(domain == Domain.Key || 
((Ranges)keys()).mergeTouching() == keys());
     }
 
     private static void sortReads(List<TxnNamedRead> reads)
@@ -213,52 +220,147 @@ public class TxnRead extends 
AbstractKeySorted<TxnNamedRead> implements Read
     @Override
     public Read slice(Ranges ranges)
     {
-        return intersecting(itemKeys.slice(ranges));
+        return select(itemKeys.slice(ranges, Minimal));
     }
 
     @Override
     public Read intersecting(Participants<?> participants)
     {
-        return intersecting(itemKeys.intersecting(participants));
+        return select(itemKeys.intersecting(participants, Minimal));
     }
 
-    private Read intersecting(Seekables<?, ?> select)
+    private Read select(Seekables<?, ?> select) // builds a TxnRead holding only the reads covered by select (a subset of itemKeys, as produced by slice/intersecting above)
     {
-        // TODO (review): Why construct this keys at all and not just check 
against select?
-        Seekables<?, ?> keys = (Seekables<?, ?>)itemKeys.intersecting(select);
-        List<TxnNamedRead> reads = new ArrayList<>(keys.size());
+        if (select == keys()) // reference check: presumably slicing returns the same instance when nothing was removed
+            return this;
 
-        switch (keys.domain())
+        List<TxnNamedRead> reads = new ArrayList<>(select.size());
+        switch (select.domain())
         {
             case Key:
-                for (TxnNamedRead read : items)
-                    if (keys.contains((Key)read.key()))
+            {
+                Keys keys = (Keys) select;
+                int i = 0, j = 0; // i walks select, j walks items; both assumed sorted
+                while (i < select.size() && j < items.length)
+                {
+                    Key key = keys.get(i);
+                    TxnNamedRead read = items[j];
+                    int c = key.compareTo((Key)read.key());
+                    if (c < 0) ++i;
+                    else if (c > 0) ++j;
+                    else
+                    {
                         reads.add(read);
+                        ++j; // keep i so any further reads on the same key are also selected
+                    }
+                }
                 break;
+            }
             case Range:
-                for (TxnNamedRead read : items)
-                    if (keys.intersects((Range)read.key()))
-                        reads.add(read);
+            {
+                Ranges ranges = (Ranges) select;
+                int i = 0, j = 0; // i walks select, j walks items; both assumed sorted
+                while (i < select.size() && j < items.length)
+                {
+                    Range range = ranges.get(i);
+                    TxnNamedRead read = items[j];
+                    int c = range.compareIntersecting((Range) read.key());
+                    if (c < 0) ++i;
+                    else if (c > 0) ++j;
+                    else
+                    {
+                        reads.add(read.slice(range));
+                        ++i; // keep j: a Minimal slice may split one read's range into several disjoint pieces, all inside this same read (reads may not touch, per the constructor invariant)
+                    }
+                }
                 break;
+            }
             default:
-                throw new IllegalStateException("Unhandled domain " + 
keys.domain());
+                throw new UnhandledEnum(select.domain());
         }
 
-        return createTxnRead(reads, cassandraConsistencyLevel, keys.domain());
+        return createTxnRead(reads, cassandraConsistencyLevel, 
select.domain());
     }
 
     @Override
     public Read merge(Read read)
     {
-        TxnRead txnRead = (TxnRead)read;
+        TxnRead that = (TxnRead)read;
         List<TxnNamedRead> reads = new ArrayList<>(items.length);
-        Collections.addAll(reads, items);
-
-        for (TxnNamedRead namedRead : txnRead)
-            if (!reads.contains(namedRead))
-                reads.add(namedRead);
 
-        return createTxnRead(reads, cassandraConsistencyLevel, txnRead.domain);
+        switch (domain)
+        {
+            default: throw new UnhandledEnum(domain);
+            case Key:
+            {
+                int i = 0, j = 0; // i walks this.items, j walks that.items: a sorted two-way merge
+                while (i < items.length || j < that.items.length) // run until BOTH inputs are drained so the tail of the longer input is kept
+                {
+                    TxnNamedRead r1 = i < items.length ? this.items[i] : null, r2 = j < that.items.length ? that.items[j] : null;
+                    int c = r1 == null ? 1 : r2 == null ? -1 : compareKey(r1, r2); // an exhausted side always yields to the other
+                    if (c <= 0)
+                    {
+                        reads.add(r1);
+                        ++i;
+                        if (c == 0) // entries compare equal: emit only this side's read
+                            ++j;
+                    }
+                    else
+                    {
+                        reads.add(r2);
+                        ++j;
+                    }
+                }
+                break;
+            }
+            case Range:
+            {
+                int i = 0, j = 0; // i walks this.items, j walks that.items
+                TxnNamedRead pending = null; // most recent candidate, held back so touching successors can be merged into it
+                while (i < items.length || j < that.items.length) // drain both inputs, as in the Key case
+                {
+                    TxnNamedRead r1 = i < items.length ? this.items[i] : null, r2 = j < that.items.length ? that.items[j] : null;
+                    int c = r1 == null ? 1 : r2 == null ? -1 : compareRange(r1, r2); // an exhausted side always yields to the other
+                    TxnNamedRead add;
+                    if (c == 0)
+                    {
+                        add = r1.merge(r2);
+                        ++i;
+                        ++j;
+                    }
+                    else if (c < 0)
+                    {
+                        add = r1;
+                        ++i;
+                    }
+                    else
+                    {
+                        add = r2;
+                        ++j;
+                    }
+
+                    if (pending == null) pending = add;
+                    else
+                    {
+                        c = compareRange(pending, add);
+                        if (c < 0)
+                        {
+                            reads.add(pending);
+                            pending = add;
+                        }
+                        else
+                        {
+                            Invariants.require(c == 0); // inputs are sorted, so add should never precede pending
+                            pending = pending.merge(add);
+                        }
+                    }
+                }
+                if (pending != null)
+                    reads.add(pending);
+                break;
+            }
+        }
+        return createTxnRead(reads, cassandraConsistencyLevel, that.domain);
     }
 
     public void unmemoize()
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index bc8d5d2bb3..8b1611dd7a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.service.accord.AccordObjectSizes;
 import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.BooleanSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -381,8 +380,6 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
     @Override
     public AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, 
TxnId txnId, Timestamp executeAt, DataStore store, PartialTxn txn)
     {
-        ClusterMetadata cm = ClusterMetadata.current();
-        checkState(cm.epoch.getEpoch() >= executeAt.epoch(), "TCM epoch %d is 
< executeAt epoch %d", cm.epoch.getEpoch(), executeAt.epoch());
         // UnrecoverableRepairUpdate will deserialize as null at other nodes
         // Accord should skip the Update for a read transaction, but handle it 
here anyways
         TxnUpdate txnUpdate = ((TxnUpdate)txn.update());
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationWriteRaceTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationWriteRaceTestBase.java
index 593b1296ce..6984fa92f7 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationWriteRaceTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationWriteRaceTestBase.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 import accord.api.RoutingKey;
 import accord.coordinate.Outcome;
 import accord.messages.PreAccept;
-import accord.primitives.PartialKeyRoute;
+import accord.primitives.KeyRoute;
 import accord.primitives.Ranges;
 import accord.primitives.Routable.Domain;
 import accord.primitives.Route;
@@ -476,7 +476,7 @@ public abstract class AccordMigrationWriteRaceTestBase 
extends AccordTestBase
                                      PreAccept preAccept = 
(PreAccept)Instance.deserializeMessage(message).payload;
                                      Route<?> route = preAccept.scope;
                                      if (route.domain() == Domain.Key)
-                                         for (RoutingKey key : 
(PartialKeyRoute)route)
+                                         for (RoutingKey key : (KeyRoute)route)
                                          {
                                              AccordRoutingKey routingKey = 
(AccordRoutingKey)key;
                                              ColumnFamilyStore cfs = 
ColumnFamilyStore.getIfExists(routingKey.table());
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index a63c79fae6..9a3e81d74c 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -48,7 +48,7 @@ import accord.api.RoutingKey;
 import accord.coordinate.Invalidated;
 import accord.impl.progresslog.DefaultProgressLogs;
 import accord.messages.PreAccept;
-import accord.primitives.PartialKeyRoute;
+import accord.primitives.KeyRoute;
 import accord.primitives.Routable.Domain;
 import accord.primitives.Route;
 import accord.primitives.TxnId;
@@ -656,7 +656,7 @@ public abstract class AccordTestBase extends TestBaseImpl
                     PreAccept preAccept = 
(PreAccept)Instance.deserializeMessage(message).payload;
                     Route<?> route = preAccept.scope;
                     if (route.domain() == Domain.Key)
-                        for (RoutingKey key : (PartialKeyRoute)route)
+                        for (RoutingKey key : (KeyRoute)route)
                         {
                             AccordRoutingKey routingKey = 
(AccordRoutingKey)key;
                             ColumnFamilyStore cfs = 
ColumnFamilyStore.getIfExists(routingKey.table());
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
index 96a393eaf3..299e702a49 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.coordinate.CoordinationFailed;
-import accord.coordinate.Invalidated;
 import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.distributed.Cluster;
diff --git a/test/unit/org/apache/cassandra/Util.java 
b/test/unit/org/apache/cassandra/Util.java
index 458f6e52e4..04689037c1 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -58,6 +58,7 @@ import org.junit.Assume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.AbstractReadCommandBuilder;
@@ -377,7 +378,7 @@ public class Util
         catch (Throwable e)
         {
             // Use name because in-jvm dtests will have different instances of 
the class
-            assert e.getClass().getName().equals(exception.getName()) : 
e.getClass().getName() + " is not " + exception.getName();
+            
Invariants.require(e.getClass().getName().equals(exception.getName()), 
e.getClass().getName() + " is not " + exception.getName());
             thrown = true;
         }
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 9baf5f28c9..af94147517 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -144,7 +144,7 @@ public class AccordCommandTest
             builder.add(key.toUnseekable(), txnId2);
             deps = builder.build();
         }
-        Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, 
SLOW, Ballot.ZERO, executeAt, deps);
+        Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, 
SLOW, Ballot.ZERO, executeAt, deps, false);
 
         getUninterruptibly(commandStore.execute(accept, safeStore -> {
             Command before = safeStore.ifInitialised(txnId).current();
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
index 4e376e4939..c4db93f6cc 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.journal.TestParams;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -77,7 +77,7 @@ public class AccordJournalOrderTest
             
ServerTestUtils.cleanupDirectory(DatabaseDescriptor.getAccordJournalDirectory());
         AccordJournal accordJournal = new AccordJournal(TestParams.INSTANCE, 
new AccordAgent());
         accordJournal.start(null);
-        RandomSource randomSource = RandomSource.wrap(new Random());
+        RandomSource randomSource = RandomSource.wrap(new Random(0));
         TxnId id1 = AccordGens.txnIds().next(randomSource);
         TxnId id2 = AccordGens.txnIds().next(randomSource);
 
@@ -87,7 +87,7 @@ public class AccordJournalOrderTest
             TxnId txnId = randomSource.nextBoolean() ? id1 : id2;
             JournalKey key = new JournalKey(txnId, 
JournalKey.Type.COMMAND_DIFF, randomSource.nextInt(5));
             res.compute(key, (k, prev) -> prev == null ? 1 : prev + 1);
-            Participants<?> participants = RoutingKeys.of(new 
AccordRoutingKey.TokenKey(TableId.generate(), new 
Murmur3Partitioner.LongToken(1)));
+            Participants<?> participants = RoutingKeys.of(new 
AccordRoutingKey.TokenKey(TableId.generate(), new 
ByteOrderedPartitioner.BytesToken(new byte[1])));
             Command command = Command.NotDefined.notDefined(txnId, 
SaveStatus.NotDefined, Status.Durability.NotDurable, 
StoreParticipants.create(null, participants, null, participants, participants), 
Ballot.ZERO);
             accordJournal.saveCommand(key.commandStoreId,
                                       new Journal.CommandUpdate(null, command),
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
index 4bfbeb8a6f..62196013a9 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
@@ -28,12 +28,12 @@ import accord.messages.ReadData;
 import accord.messages.ReadData.CommitOrReadNack;
 import accord.topology.TopologyUtils;
 import 
org.apache.cassandra.service.accord.AccordFetchCoordinator.AccordFetchRequest;
+import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.accord.api.AccordTimeService;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import accord.Utils;
-import accord.api.Agent;
 import accord.impl.AbstractFetchCoordinator;
 import accord.impl.IntKey;
 import accord.local.Node;
@@ -72,7 +72,7 @@ public class AccordMessageSinkTest
         DatabaseDescriptor.clientInitialization();
         DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
         ClusterMetadataService.initializeForClients();
-        sink = new AccordMessageSink(Mockito.mock(Agent.class), messaging, 
mapping, new RequestCallbacks(new AccordTimeService()));
+        sink = new AccordMessageSink(Mockito.mock(AccordAgent.class), 
messaging, mapping, new RequestCallbacks(new AccordTimeService()));
     }
 
     @Test
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index e0c1d126bf..168fb74b68 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -304,7 +304,7 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
                 }
                 else if (RoutableKey.class.isAssignableFrom(keyType))
                 {
-                    RoutableKey key = (RoutableKey) state.key();
+                    RoutingKey key = (RoutingKey) state.key();
                     if ((keys.contains(key) || ranges.intersects(key))
                         && shouldEvict.getAsBoolean())
                         cache.tryEvict(state);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 65b01668e1..04d2bd6390 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -654,7 +654,7 @@ public class CommandsForKeySerializerTest
         @Override public <T> AsyncChain<T> build(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply) { throw new 
UnsupportedOperationException(); }
         @Override public void shutdown() { }
         @Override protected void registerTransitive(SafeCommandStore 
safeStore, RangeDeps deps){ }
-        @Override public <T> AsyncChain<T> submit(Callable<T> task) { throw 
new UnsupportedOperationException(); }
+        @Override public <T> AsyncChain<T> build(Callable<T> task) { throw new 
UnsupportedOperationException(); } // stub: this test is not expected to schedule tasks through this path
         @Override public void onRecover(Node node, Result success, Throwable 
fail) { throw new UnsupportedOperationException(); }
         @Override public void onInconsistentTimestamp(Command command, 
Timestamp prev, Timestamp next) { throw new UnsupportedOperationException(); }
         @Override public void onFailedBootstrap(String phase, Ranges ranges, 
Runnable retry, Throwable failure) { throw new UnsupportedOperationException(); 
}
@@ -670,6 +670,7 @@ public class CommandsForKeySerializerTest
         @Override public long attemptCoordinationDelay(Node node, 
SafeCommandStore safeStore, TxnId txnId, TimeUnit units, int retryCount) { 
return 0; }
         @Override public long seekProgressDelay(Node node, SafeCommandStore 
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil blockedUntil, 
TimeUnit units) { return 0; }
         @Override public long retryAwaitTimeout(Node node, SafeCommandStore 
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil retrying, 
TimeUnit units) { return 0; }
+        @Override public long localSlowAt(TxnId txnId, Status.Phase phase, 
TimeUnit unit) { return 0; } // stub, like the neighbouring timing hooks: always 0
         @Override public long localExpiresAt(TxnId txnId, Status.Phase phase, 
TimeUnit unit) { return 0; }
         @Override public long expiresAt(ReplyContext replyContext, TimeUnit 
unit) { return 0; }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to