This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 76f0fb5e98 Introduce Range transactions
76f0fb5e98 is described below

commit 76f0fb5e98cd5f9bc82e7c7f0373b374cb1fa608
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Jan 9 15:40:43 2023 +0000

    Introduce Range transactions
    
    patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18174
---
 .build/include-accord.sh                           |   2 +-
 .../service/accord/AccordCommandStore.java         | 172 ++++++++++-----------
 .../service/accord/AccordCommandsForKey.java       | 153 ++++++++----------
 .../cassandra/service/accord/AccordKeyspace.java   |  22 ++-
 .../service/accord/AccordObjectSizes.java          |   4 +-
 .../service/accord/AccordPartialCommand.java       |  30 ++--
 .../cassandra/service/accord/TokenRange.java       |   7 +-
 .../service/accord/api/AccordRoutingKey.java       |  19 ++-
 .../service/accord/async/AsyncOperation.java       |   4 +-
 .../accord/serializers/AcceptSerializers.java      |   7 +-
 .../accord/serializers/CommandSerializers.java     |   6 +-
 .../service/accord/serializers/KeySerializers.java |   4 +-
 .../accord/serializers/PreacceptSerializers.java   |   4 +-
 .../accord/serializers/TxnRequestSerializer.java   |   4 +-
 .../cassandra/service/accord/txn/TxnNamedRead.java |  11 +-
 .../cassandra/service/accord/txn/TxnRead.java      |   6 +-
 .../cassandra/service/accord/txn/TxnUpdate.java    |   7 +-
 .../cassandra/service/accord/txn/TxnWrite.java     |  10 +-
 .../service/accord/AccordCommandTest.java          |  24 ++-
 .../service/accord/async/AsyncOperationTest.java   |   7 +-
 .../service/accord/async/AsyncWriterTest.java      |  23 ++-
 21 files changed, 256 insertions(+), 270 deletions(-)

diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index e7cedab46e..2f55c1d18c 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -25,7 +25,7 @@ set -o nounset
 bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
 
 accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='686326eedc8d2553c98a30abc81a925be3942b8c'
+accord_branch='1230eceb077c928123e6bf848a103964fe90c9f7'
 accord_src="$bin/cassandra-accord"
 
 checkout() {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 96c2a26c35..87c9c752d9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -23,27 +23,31 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+import javax.annotation.Nullable;
+
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.ProgressLog;
+import accord.impl.CommandsForKey;
 import accord.local.Command;
 import accord.local.CommandListener;
 import accord.local.CommandStore;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.CommandStores.RangesForEpochHolder;
-import accord.local.CommandsForKey;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
+import accord.local.Status;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
-import accord.primitives.Routable;
 import accord.primitives.Routables;
+import accord.primitives.Seekable;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.AbstractKeys;
@@ -104,43 +108,14 @@ public class AccordCommandStore extends CommandStore
             return null;
         }
 
-        @Override
-        public CommandsForKey commandsForKey(Key key)
-        {
-            AccordCommandsForKey commandsForKey = 
getCommandsForKeyInternal(key);
-            if (commandsForKey.isEmpty())
-                commandsForKey.initialize();
-            return commandsForKey;
-        }
-
-        @Override
-        public CommandsForKey maybeCommandsForKey(Key key)
-        {
-            AccordCommandsForKey commandsForKey = 
getCommandsForKeyInternal(key);
-            return !commandsForKey.isEmpty() ? commandsForKey : null;
-        }
-
-        @Override
-        public void addAndInvokeListener(TxnId txnId, CommandListener listener)
-        {
-            AccordCommand.WriteOnly command = (AccordCommand.WriteOnly) 
getContext().commands.getOrCreateWriteOnly(txnId, (ignore, id) -> new 
AccordCommand.WriteOnly(id), commandStore());
-            command.addListener(listener);
-            execute(listener.listenerPreLoadContext(txnId), store -> {
-                listener.onChange(store, store.command(txnId));
-            });
-        }
-
-        @Override
-        public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+        public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
         {
-            switch (keysOrRanges.kindOfContents()) {
+            switch (keysOrRanges.domain()) {
                 default:
                     throw new AssertionError();
                 case Key:
-                    // TODO: efficiency
                     AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
                     return keys.stream()
-                               .filter(slice::contains)
                                .map(this::commandsForKey)
                                .map(map)
                                .reduce(initialValue, reduce);
@@ -150,85 +125,102 @@ public class AccordCommandStore extends CommandStore
             }
         }
 
-        public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+        private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges 
slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
         {
-            switch (keysOrRanges.kindOfContents()) {
+            switch (keysOrRanges.domain()) {
                 default:
                     throw new AssertionError();
                 case Key:
+                    // TODO: efficiency
                     AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
-                    return keys.stream()
-                               .map(this::commandsForKey)
-                               .map(map)
-                               .reduce(initialValue, reduce);
+                    for (Key key : keys)
+                    {
+                        if (!slice.contains(key)) continue;
+                        CommandsForKey forKey = commandsForKey(key);
+                        accumulate = map.apply(forKey, accumulate);
+                        if (accumulate.equals(terminalValue))
+                            return accumulate;
+                    }
+                    break;
                 case Range:
-                    // TODO: implement
+                    // TODO (required): implement
                     throw new UnsupportedOperationException();
             }
+            return accumulate;
         }
 
-        public void forEach(Routables<?, ?> keysOrRanges, 
Consumer<CommandsForKey> forEach)
+        @Override
+        public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, 
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep 
testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status 
maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
         {
-            switch (keysOrRanges.kindOfContents()) {
-                default:
-                    throw new AssertionError();
-                case Key:
-                    AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
-                    keys.forEach(key -> forEach.accept(commandsForKey(key)));
-                    break;
-                case Range:
-                    // TODO: implement
-                    throw new UnsupportedOperationException();
-            }
+            accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) 
-> {
+                CommandsForKey.CommandTimeseries timeseries;
+                switch (testTimestamp)
+                {
+                    default: throw new AssertionError();
+                    case STARTED_AFTER:
+                    case STARTED_BEFORE:
+                        timeseries = forKey.byId();
+                        break;
+                    case EXECUTES_AFTER:
+                    case MAY_EXECUTE_BEFORE:
+                        timeseries = forKey.byExecuteAt();
+                }
+                CommandsForKey.CommandTimeseries.TestTimestamp 
remapTestTimestamp;
+                switch (testTimestamp)
+                {
+                    default: throw new AssertionError();
+                    case STARTED_AFTER:
+                    case EXECUTES_AFTER:
+                        remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
+                        break;
+                    case STARTED_BEFORE:
+                    case MAY_EXECUTE_BEFORE:
+                        remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
+                }
+                return timeseries.mapReduce(testKind, remapTestTimestamp, 
timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
+            }, accumulate, terminalValue);
+
+            return accumulate;
         }
 
-        public void forEach(Routable keyOrRange, Consumer<CommandsForKey> 
forEach)
+        @Override
+        public void register(Seekables<?, ?> keysOrRanges, Ranges slice, 
Command command)
         {
-            switch (keyOrRange.domain())
-            {
-                default: throw new AssertionError();
-                case Key:
-                    forEach.accept(commandsForKey((Key) keyOrRange));
-                    break;
-                case Range:
-                    // TODO: implement
-                    throw new UnsupportedOperationException();
-            }
+            // TODO (required): support ranges
+            Routables.foldl((Keys)keysOrRanges, slice, (k, v, i) -> { 
commandsForKey(k).register(command); return v; }, null);
         }
 
         @Override
-        public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, 
Consumer<CommandsForKey> forEach)
+        public void register(Seekable keyOrRange, Ranges slice, Command 
command)
         {
-            switch (keysOrRanges.kindOfContents()) {
-                default:
-                    throw new AssertionError();
-                case Key:
-                    AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
-                    keys.forEach(slice, key -> {
-                        forEach.accept(commandsForKey(key));
-                    });
-                    break;
-                case Range:
-                    // TODO: implement
-                    throw new UnsupportedOperationException();
-            }
+            // TODO (required): support ranges
+            Key key = (Key) keyOrRange;
+            if (slice.contains(key))
+                commandsForKey(key).register(command);
+        }
+
+        public AccordCommandsForKey commandsForKey(Key key)
+        {
+            AccordCommandsForKey commandsForKey = 
getCommandsForKeyInternal(key);
+            if (commandsForKey.isEmpty())
+                commandsForKey.initialize();
+            return commandsForKey;
+        }
+
+        public AccordCommandsForKey maybeCommandsForKey(Key key)
+        {
+            AccordCommandsForKey commandsForKey = 
getCommandsForKeyInternal(key);
+            return !commandsForKey.isEmpty() ? commandsForKey : null;
         }
 
         @Override
-        public void forEach(Routable keyOrRange, Ranges slice, 
Consumer<CommandsForKey> forEach)
+        public void addAndInvokeListener(TxnId txnId, CommandListener listener)
         {
-            switch (keyOrRange.domain())
-            {
-                default: throw new AssertionError();
-                case Key:
-                    Key key = (Key) keyOrRange;
-                    if (slice.contains(key))
-                        forEach.accept(commandsForKey(key));
-                    break;
-                case Range:
-                    // TODO: implement
-                    throw new UnsupportedOperationException();
-            }
+            AccordCommand.WriteOnly command = (AccordCommand.WriteOnly) 
getContext().commands.getOrCreateWriteOnly(txnId, (ignore, id) -> new 
AccordCommand.WriteOnly(id), commandStore());
+            command.addListener(listener);
+            execute(listener.listenerPreLoadContext(txnId), store -> {
+                listener.onChange(store, store.command(txnId));
+            });
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
index ada3ee0742..b93dc27763 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
@@ -24,7 +24,6 @@ import java.util.Objects;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.stream.Stream;
 
 import javax.annotation.Nullable;
@@ -36,7 +35,10 @@ import org.slf4j.LoggerFactory;
 
 import accord.local.Command;
 import accord.local.CommandStore;
-import accord.local.CommandsForKey;
+import accord.impl.CommandsForKey;
+import accord.local.SafeCommandStore;
+import accord.local.SafeCommandStore.TestDep;
+import accord.local.SafeCommandStore.TestKind;
 import accord.local.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -50,9 +52,10 @@ import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.assertj.core.util.VisibleForTesting;
 
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.ANY_DEPS;
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.WITHOUT;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
+import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
+import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.local.SafeCommandStore.TestKind.Ws;
+import static accord.local.Status.KnownDeps.DepsUnknown;
 import static 
org.apache.cassandra.service.accord.AccordState.WriteOnly.applyMapChanges;
 import static 
org.apache.cassandra.service.accord.AccordState.WriteOnly.applySetChanges;
 
@@ -97,37 +100,25 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
         public void applyChanges(AccordCommandsForKey instance)
         {
             applySetChanges(this, instance, cfk -> cfk.blindWitnessed);
-            applyMapChanges(this, instance, cfk -> cfk.uncommitted.map);
-            applyMapChanges(this, instance, cfk -> cfk.committedById.map);
-            applyMapChanges(this, instance, cfk -> 
cfk.committedByExecuteAt.map);
+            applyMapChanges(this, instance, cfk -> cfk.byId.map);
+            applyMapChanges(this, instance, cfk -> cfk.byExecuteAt.map);
         }
     }
 
     public enum SeriesKind
     {
-        UNCOMMITTED(Command::txnId),
-        COMMITTED_BY_ID(Command::txnId),
-        COMMITTED_BY_EXECUTE_AT(Command::executeAt);
-
-        private final Function<Command, Timestamp> getTimestamp;
-
-        SeriesKind(Function<Command, Timestamp> timestampFunction)
-        {
-            this.getTimestamp = timestampFunction;
-        }
+        BY_ID, BY_EXECUTE_AT;
     }
 
-    public class Series<T> implements CommandTimeseries<T>
+    public class Series implements CommandTimeseries
     {
         public final SeriesKind kind;
         public final StoredNavigableMap<Timestamp, ByteBuffer> map;
-        private final Function<AccordPartialCommand, T> translate;
 
-        public Series(ReadWrite readWrite, SeriesKind kind, 
Function<AccordPartialCommand, T> translate)
+        public Series(ReadWrite readWrite, SeriesKind kind)
         {
             this.kind = kind;
             map = new StoredNavigableMap<>(readWrite);
-            this.translate = translate;
         }
 
         @Override
@@ -153,24 +144,28 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
             return map.getView().isEmpty();
         }
 
-        @Override
-        public Stream<T> before(Timestamp timestamp, TestKind testKind, 
TestDep testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status 
status)
+        public <T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp, 
Timestamp timestamp,
+                        TestDep testDep, @Nullable TxnId depId,
+                        @Nullable Status minStatus, @Nullable Status maxStatus,
+                        SafeCommandStore.CommandFunction<T, T> map, T 
initialValue, T terminalValue)
         {
-            return idsToCommands(map.getView().headMap(timestamp, 
false).values())
-                   .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
-                   .filter(cmd -> testDep == ANY_DEPS || (cmd.hasDep(depId) ^ 
(testDep == WITHOUT)))
-                   .filter(cmd -> TestStatus.test(cmd.status(), testStatus, 
status))
-                   .map(translate);
-        }
 
-        @Override
-        public Stream<T> after(Timestamp timestamp, TestKind testKind, TestDep 
testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status status)
-        {
-            return idsToCommands(map.getView().tailMap(timestamp, 
false).values())
-                   .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
-                   .filter(cmd -> testDep == ANY_DEPS || (cmd.hasDep(depId) ^ 
(testDep == WITHOUT)))
-                   .filter(cmd -> TestStatus.test(cmd.status(), testStatus, 
status))
-                   .map(translate);
+            for (ByteBuffer buffer : (testTimestamp == TestTimestamp.BEFORE ? 
this.map.getView().headMap(timestamp, false) : 
this.map.getView().tailMap(timestamp, false)).values())
+            {
+                AccordPartialCommand cmd = 
AccordPartialCommand.serializer.deserialize(AccordCommandsForKey.this, 
commandStore, buffer);
+                if (testKind == Ws && cmd.txnId().isRead()) continue;
+                // If we don't have any dependencies, we treat a dependency 
filter as a mismatch
+                if (testDep != ANY_DEPS && (cmd.known().deps == DepsUnknown || 
(cmd.deps().contains(depId) != (testDep == WITH))))
+                    continue;
+                if (minStatus != null && minStatus.compareTo(cmd.status()) > 0)
+                    continue;
+                if (maxStatus != null && maxStatus.compareTo(cmd.status()) < 0)
+                    continue;
+                initialValue = map.apply(key, cmd.txnId(), cmd.executeAt(), 
initialValue);
+                if (initialValue.equals(terminalValue))
+                    break;
+            }
+            return initialValue;
         }
 
         @VisibleForTesting
@@ -195,9 +190,8 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
     public final StoredLong lastExecutedMicros;
     public final StoredValue<Timestamp> lastWriteTimestamp;
     public final StoredSet.Navigable<Timestamp> blindWitnessed;
-    public final Series<TxnIdWithExecuteAt> uncommitted;
-    public final Series<TxnId> committedById;
-    public final Series<TxnId> committedByExecuteAt;
+    public final Series byId;
+    public final Series byExecuteAt;
 
     public AccordCommandsForKey(AccordCommandStore commandStore, PartitionKey 
key)
     {
@@ -208,9 +202,8 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
         lastExecutedMicros = new StoredLong(rw());
         lastWriteTimestamp = new StoredValue<>(rw());
         blindWitnessed = new StoredSet.Navigable<>(rw());
-        uncommitted = new Series<>(rw(), SeriesKind.UNCOMMITTED, x -> x);
-        committedById = new Series<>(rw(), SeriesKind.COMMITTED_BY_ID, 
AccordPartialCommand::txnId);
-        committedByExecuteAt = new Series<>(rw(), 
SeriesKind.COMMITTED_BY_EXECUTE_AT, AccordPartialCommand::txnId);
+        byId = new Series(rw(), SeriesKind.BY_ID);
+        byExecuteAt = new Series(rw(), SeriesKind.BY_EXECUTE_AT);
     }
 
     @Override
@@ -221,9 +214,8 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
                && lastExecutedMicros.isEmpty()
                && lastWriteTimestamp.isEmpty()
                && blindWitnessed.isEmpty()
-               && uncommitted.map.isEmpty()
-               && committedById.map.isEmpty()
-               && committedByExecuteAt.map.isEmpty();
+               && byId.map.isEmpty()
+               && byExecuteAt.map.isEmpty();
     }
 
     public void setEmpty()
@@ -233,9 +225,8 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
         lastExecutedMicros.setEmpty();
         lastWriteTimestamp.setEmpty();
         blindWitnessed.setEmpty();
-        uncommitted.map.setEmpty();
-        committedById.map.setEmpty();
-        committedByExecuteAt.map.setEmpty();
+        byId.map.setEmpty();
+        byExecuteAt.map.setEmpty();
     }
 
     public AccordCommandsForKey initialize()
@@ -245,9 +236,8 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
         lastExecutedMicros.load(Defaults.lastExecutedMicros);
         lastWriteTimestamp.load(Defaults.lastWriteTimestamp);
         blindWitnessed.load(new TreeSet<>());
-        uncommitted.map.load(new TreeMap<>());
-        committedById.map.load(new TreeMap<>());
-        committedByExecuteAt.map.load(new TreeMap<>());
+        byId.map.load(new TreeMap<>());
+        byExecuteAt.map.load(new TreeMap<>());
         return this;
     }
 
@@ -259,9 +249,8 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
                || lastExecutedMicros.hasModifications()
                || lastWriteTimestamp.hasModifications()
                || blindWitnessed.hasModifications()
-               || uncommitted.map.hasModifications()
-               || committedById.map.hasModifications()
-               || committedByExecuteAt.map.hasModifications();
+               || byId.map.hasModifications()
+               || byExecuteAt.map.hasModifications();
     }
 
     @Override
@@ -272,9 +261,8 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
         lastExecutedMicros.clearModifiedFlag();
         lastWriteTimestamp.clearModifiedFlag();
         blindWitnessed.clearModifiedFlag();
-        uncommitted.map.clearModifiedFlag();
-        committedById.map.clearModifiedFlag();
-        committedByExecuteAt.map.clearModifiedFlag();
+        byId.map.clearModifiedFlag();
+        byExecuteAt.map.clearModifiedFlag();
     }
 
     @Override
@@ -285,9 +273,8 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
                && lastExecutedMicros.isLoaded()
                && lastWriteTimestamp.isLoaded()
                && blindWitnessed.isLoaded()
-               && uncommitted.map.isLoaded()
-               && committedById.map.isLoaded()
-               && committedByExecuteAt.map.isLoaded();
+               && byId.map.isLoaded()
+               && byExecuteAt.map.isLoaded();
     }
 
     public CommandStore commandStore()
@@ -310,28 +297,21 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
         size += lastExecutedMicros.estimatedSizeOnHeap();
         size += 
lastWriteTimestamp.estimatedSizeOnHeap(AccordObjectSizes::timestamp);
         size += 
blindWitnessed.estimatedSizeOnHeap(AccordObjectSizes::timestamp);
-        size += 
uncommitted.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp, 
ByteBufferUtil::estimatedSizeOnHeap);
-        size += 
committedById.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp, 
ByteBufferUtil::estimatedSizeOnHeap);
-        size += 
committedByExecuteAt.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp, 
ByteBufferUtil::estimatedSizeOnHeap);
+        size += byId.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp, 
ByteBufferUtil::estimatedSizeOnHeap);
+        size += 
byExecuteAt.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp, 
ByteBufferUtil::estimatedSizeOnHeap);
         return size;
     }
 
     @Override
-    public Series<TxnIdWithExecuteAt> uncommitted()
+    public Series byId()
     {
-        return uncommitted;
+        return byId;
     }
 
     @Override
-    public Series<TxnId> committedById()
+    public Series byExecuteAt()
     {
-        return committedById;
-    }
-
-    @Override
-    public Series<TxnId> committedByExecuteAt()
-    {
-        return committedByExecuteAt;
+        return byExecuteAt;
     }
 
     @Override
@@ -368,19 +348,9 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
 
     public void updateSummaries(AccordCommand command)
     {
-        if (command.status().hasBeen(Status.Committed))
-        {
-            if (command.status.previous() == null || 
!command.status.previous().status.hasBeen(Status.Committed))
-                uncommitted.map.blindRemove(command.txnId());
-
-            ByteBuffer partialCommand = 
AccordPartialCommand.serializer.serialize(new AccordPartialCommand(key, 
command));
-            committedById.map.blindPut(command.txnId(), partialCommand);
-            committedByExecuteAt.map.blindPut(command.executeAt(), 
partialCommand);
-        }
-        else
-        {   // TODO: somebody is inserting large buffers into this map 
(presumably from loading from disk)
-            uncommitted.map.blindPut(command.txnId(), 
AccordPartialCommand.serializer.serialize(new AccordPartialCommand(key, 
command)));
-        }
+        ByteBuffer partialCommand = 
AccordPartialCommand.serializer.serialize(new AccordPartialCommand(key, 
command));
+        byId.map.blindPut(command.txnId(), partialCommand);
+        byExecuteAt.map.blindPut(command.executeAt(), partialCommand);
     }
 
     private static long getTimestampMicros(Timestamp timestamp)
@@ -439,15 +409,14 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState<
                && lastExecutedMicros.equals(that.lastExecutedMicros)
                && lastWriteTimestamp.equals(that.lastWriteTimestamp)
                && blindWitnessed.equals(that.blindWitnessed)
-               && uncommitted.map.equals(that.uncommitted.map)
-               && committedById.map.equals(that.committedById.map)
-               && 
committedByExecuteAt.map.equals(that.committedByExecuteAt.map);
+               && byId.map.equals(that.byId.map)
+               && byExecuteAt.map.equals(that.byExecuteAt.map);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(commandStore, key, blindWitnessed, maxTimestamp, 
lastExecutedTimestamp, lastExecutedMicros, lastWriteTimestamp, uncommitted, 
committedById, committedByExecuteAt);
+        return Objects.hash(commandStore, key, blindWitnessed, maxTimestamp, 
lastExecutedTimestamp, lastExecutedMicros, lastWriteTimestamp, byId, 
byExecuteAt);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index bee00a965b..9148bb4311 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -251,9 +251,8 @@ public class AccordKeyspace
 
         private static boolean hasRegularChanges(AccordCommandsForKey 
commandsForKey)
         {
-            return commandsForKey.uncommitted.map.hasModifications()
-                   || commandsForKey.committedById.map.hasModifications()
-                   || 
commandsForKey.committedByExecuteAt.map.hasModifications();
+            return commandsForKey.byId.map.hasModifications()
+                   || commandsForKey.byExecuteAt.map.hasModifications();
         }
 
         static RegularAndStaticColumns columnsFor(AccordCommandsForKey 
commandsForKey)
@@ -617,7 +616,7 @@ public class AccordKeyspace
         }
     }
 
-    private static void addSeriesMutations(AccordCommandsForKey.Series<?> 
series,
+    private static void addSeriesMutations(AccordCommandsForKey.Series series,
                                            PartitionUpdate.Builder 
partitionBuilder,
                                            Row.Builder rowBuilder,
                                            long timestampMicros,
@@ -661,9 +660,8 @@ public class AccordKeyspace
         int nowInSeconds = (int) 
TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
 
         int expectedRows = (CommandsForKeyColumns.hasStaticChanges(cfk) ? 1 : 
0)
-                           + cfk.uncommitted.map.totalModifications()
-                           + cfk.committedById.map.totalModifications()
-                           + cfk.committedByExecuteAt.map.totalModifications();
+                           + cfk.byId.map.totalModifications()
+                           + cfk.byExecuteAt.map.totalModifications();
 
         PartitionUpdate.Builder partitionBuilder = new 
PartitionUpdate.Builder(CommandsForKey,
                                                                                
makeKey(cfk),
@@ -700,9 +698,8 @@ public class AccordKeyspace
             partitionBuilder.add(rowBuilder.build());
         }
 
-        addSeriesMutations(cfk.uncommitted, partitionBuilder, rowBuilder, 
timestampMicros, nowInSeconds);
-        addSeriesMutations(cfk.committedById, partitionBuilder, rowBuilder, 
timestampMicros, nowInSeconds);
-        addSeriesMutations(cfk.committedByExecuteAt, partitionBuilder, 
rowBuilder, timestampMicros, nowInSeconds);
+        addSeriesMutations(cfk.byId, partitionBuilder, rowBuilder, 
timestampMicros, nowInSeconds);
+        addSeriesMutations(cfk.byExecuteAt, partitionBuilder, rowBuilder, 
timestampMicros, nowInSeconds);
 
         return new Mutation(partitionBuilder.build());
     }
@@ -804,9 +801,8 @@ public class AccordKeyspace
             }
             Preconditions.checkState(!partitions.hasNext());
 
-            cfk.uncommitted.map.load(seriesMaps.get(SeriesKind.UNCOMMITTED));
-            
cfk.committedById.map.load(seriesMaps.get(SeriesKind.COMMITTED_BY_ID));
-            
cfk.committedByExecuteAt.map.load(seriesMaps.get(SeriesKind.COMMITTED_BY_EXECUTE_AT));
+            cfk.byId.map.load(seriesMaps.get(SeriesKind.BY_ID));
+            cfk.byExecuteAt.map.load(seriesMaps.get(SeriesKind.BY_EXECUTE_AT));
         }
         catch (Throwable t)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index ca18f67c75..bef7b43ad2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -89,7 +89,7 @@ public class AccordObjectSizes
 
     public static long seekables(Seekables<?, ?> seekables)
     {
-        switch (seekables.kindOfContents())
+        switch (seekables.domain())
         {
             default: throw new AssertionError();
             case Key: return keys((Keys) seekables);
@@ -219,7 +219,7 @@ public class AccordObjectSizes
     {
         long size = EMPTY_WRITES_SIZE;
         size += timestamp(writes.executeAt);
-        size += keys(writes.keys);
+        size += seekables(writes.keys);
         if (writes.write != null)
             size += ((TxnWrite) writes.write).estimatedSizeOnHeap();
         return size;
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java 
b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index 2f708c53b4..19f71da75c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -26,8 +26,9 @@ import java.util.Objects;
 
 import accord.api.Key;
 import accord.local.Command;
-import accord.local.CommandsForKey;
+import accord.local.SaveStatus;
 import accord.local.Status;
+import accord.local.Status.Known;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -44,18 +45,22 @@ import static 
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
 import static 
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
-public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
+public class AccordPartialCommand
 {
     public static final PartialCommandSerializer serializer = new 
PartialCommandSerializer();
 
+    private final TxnId txnId;
+    private final Timestamp executeAt;
+
     // TODO (soon): this should only be a list of TxnId (the deps for the key 
we are persisted against); but should also be stored separately and not brought 
into memory
     private final List<TxnId> deps;
     // TODO (soon): we only require this for Accepted; perhaps more tightly 
couple query API for efficiency
-    private final Status status;
+    private final SaveStatus status;
 
-    AccordPartialCommand(TxnId txnId, Timestamp executeAt, List<TxnId> deps, 
Status status)
+    AccordPartialCommand(TxnId txnId, Timestamp executeAt, List<TxnId> deps, 
SaveStatus status)
     {
-        super(txnId, executeAt);
+        this.txnId = txnId;
+        this.executeAt = executeAt;
         this.deps = deps;
         this.status = status;
     }
@@ -64,7 +69,7 @@ public class AccordPartialCommand extends 
CommandsForKey.TxnIdWithExecuteAt
     {
         this(command.txnId(), command.executeAt(),
              command.partialDeps() == null ? Collections.emptyList() : 
command.partialDeps().txnIds(key),
-             command.status());
+             command.saveStatus());
     }
 
     public TxnId txnId()
@@ -89,7 +94,12 @@ public class AccordPartialCommand extends 
CommandsForKey.TxnIdWithExecuteAt
 
     public Status status()
     {
-        return status;
+        return status.status;
+    }
+
+    public Known known()
+    {
+        return status.known;
     }
 
     @Override
@@ -111,7 +121,7 @@ public class AccordPartialCommand extends 
CommandsForKey.TxnIdWithExecuteAt
             out.write(version.version);
             CommandSerializers.txnId.serialize(command.txnId(), out, 
version.msgVersion);
             serializeNullable(command.executeAt(), out, version.msgVersion, 
CommandSerializers.timestamp);
-            CommandSerializers.status.serialize(command.status(), out, 
version.msgVersion);
+            CommandSerializers.saveStatus.serialize(command.status, out, 
version.msgVersion);
             serializeCollection(command.deps, out, version.msgVersion, 
CommandSerializers.txnId);
         }
 
@@ -146,7 +156,7 @@ public class AccordPartialCommand extends 
CommandsForKey.TxnIdWithExecuteAt
                 return command;
 
             Timestamp executeAt = deserializeNullable(in, version.msgVersion, 
CommandSerializers.timestamp);
-            Status status = CommandSerializers.status.deserialize(in, 
version.msgVersion);
+            SaveStatus status = CommandSerializers.saveStatus.deserialize(in, 
version.msgVersion);
             List<TxnId> deps = deserializeList(in, version.msgVersion, 
CommandSerializers.txnId);
             AccordPartialCommand partial = new AccordPartialCommand(txnId, 
executeAt, deps, status);
             addToContext(partial, context);
@@ -170,7 +180,7 @@ public class AccordPartialCommand extends 
CommandsForKey.TxnIdWithExecuteAt
             int size = 
Math.toIntExact(AccordSerializerVersion.serializer.serializedSize(version));
             size += CommandSerializers.txnId.serializedSize();
             size += serializedNullableSize(command.executeAt(), 
version.msgVersion, CommandSerializers.timestamp);
-            size += CommandSerializers.status.serializedSize(command.status(), 
version.msgVersion);
+            size += 
CommandSerializers.saveStatus.serializedSize(command.status, 
version.msgVersion);
             size += serializedCollectionSize(command.deps, version.msgVersion, 
CommandSerializers.txnId);
             return size;
         }
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java 
b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 81cb329f59..a0b6f67ccc 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import accord.api.RoutingKey;
 import accord.primitives.Range;
+import accord.primitives.Ranges;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -41,15 +42,15 @@ public class TokenRange extends Range.EndInclusive
     }
 
     @Override
-    public TokenRange subRange(RoutingKey start, RoutingKey end)
+    public TokenRange newRange(RoutingKey start, RoutingKey end)
     {
         return new TokenRange((AccordRoutingKey) start, (AccordRoutingKey) 
end);
     }
 
     @Override
-    public RoutingKey someIntersectingRoutingKey()
+    public RoutingKey someIntersectingRoutingKey(Ranges ranges)
     {
-        RoutingKey pick = startInclusive() ? start() : end();
+        RoutingKey pick = super.someIntersectingRoutingKey(ranges);
         if (pick instanceof SentinelKey)
             pick = ((SentinelKey) pick).toTokenKey();
         return pick;
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 8a763f5e61..a0c361a1db 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 
@@ -154,6 +155,12 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
                 return TypeSizes.BOOL_SIZE + TypeSizes.sizeof(key.keyspace);
             }
         };
+
+        @Override
+        public Range asRange()
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
     // final in part because we refer to its class directly in 
AccordRoutableKey.compareToe
@@ -161,6 +168,16 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
     {
         private static final long EMPTY_SIZE;
 
+        @Override
+        public Range asRange()
+        {
+            AccordRoutingKey before = token.isMinimum()
+                                      ? new SentinelKey(keyspace, true)
+                                      : new TokenKey(keyspace, 
token.decreaseSlightly());
+
+            return new TokenRange(before, this);
+        }
+
         static
         {
             Token key = 
getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER).getToken();
@@ -308,7 +325,7 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
                 for (int i = 0; i < splits.size(); i++)
                 {
                     if (i == results.size()) results.add(Ranges.EMPTY);
-                    results.set(i, results.get(i).union(splits.get(i)));
+                    results.set(i, results.get(i).with(splits.get(i)));
                 }
             }
             return results;
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index f03eab2f0d..e302b42489 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -212,13 +212,13 @@ public abstract class AsyncOperation<R> extends 
AsyncPromise<R> implements Runna
 
     private static Iterable<PartitionKey> toPartitionKeys(Seekables<?, ?> keys)
     {
-        switch (keys.kindOfContents())
+        switch (keys.domain())
         {
             default: throw new AssertionError();
             case Key:
                 return (Iterable<PartitionKey>) keys;
             case Range:
-                // TODO: implement
+                // TODO (required): implement
                 throw new UnsupportedOperationException();
         }
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index 91bf1fed10..609e7ee080 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -45,7 +45,6 @@ public class AcceptSerializers
             CommandSerializers.timestamp.serialize(accept.executeAt, out, 
version);
             KeySerializers.seekables.serialize(accept.keys, out, version);
             DepsSerializer.partialDeps.serialize(accept.partialDeps, out, 
version);
-            CommandSerializers.kind.serialize(accept.kind, out, version);
         }
 
         @Override
@@ -55,8 +54,7 @@ public class AcceptSerializers
                           CommandSerializers.ballot.deserialize(in, version),
                           CommandSerializers.timestamp.deserialize(in, 
version),
                           KeySerializers.seekables.deserialize(in, version),
-                          DepsSerializer.partialDeps.deserialize(in, version),
-                          CommandSerializers.kind.deserialize(in, version));
+                          DepsSerializer.partialDeps.deserialize(in, version));
         }
 
         @Override
@@ -65,8 +63,7 @@ public class AcceptSerializers
             return CommandSerializers.ballot.serializedSize(accept.ballot, 
version)
                    + 
CommandSerializers.timestamp.serializedSize(accept.executeAt, version)
                    + KeySerializers.seekables.serializedSize(accept.keys, 
version)
-                   + 
DepsSerializer.partialDeps.serializedSize(accept.partialDeps, version)
-                   + CommandSerializers.kind.serializedSize(accept.kind, 
version);
+                   + 
DepsSerializer.partialDeps.serializedSize(accept.partialDeps, version);
         }
     };
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index 98c1f93eaa..0441c640c8 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -170,7 +170,7 @@ public class CommandSerializers
         public void serialize(Writes writes, DataOutputPlus out, int version) 
throws IOException
         {
             timestamp.serialize(writes.executeAt, out, version);
-            KeySerializers.keys.serialize(writes.keys, out, version);
+            KeySerializers.seekables.serialize(writes.keys, out, version);
             boolean hasWrites = writes.write != null;
             out.writeBoolean(hasWrites);
             if (hasWrites)
@@ -181,7 +181,7 @@ public class CommandSerializers
         public Writes deserialize(DataInputPlus in, int version) throws 
IOException
         {
             return new Writes(timestamp.deserialize(in, version),
-                              KeySerializers.keys.deserialize(in, version),
+                              KeySerializers.seekables.deserialize(in, 
version),
                               in.readBoolean() ? 
TxnWrite.serializer.deserialize(in, version) : null);
         }
 
@@ -189,7 +189,7 @@ public class CommandSerializers
         public long serializedSize(Writes writes, int version)
         {
             long size = timestamp.serializedSize(writes.executeAt, version);
-            size += KeySerializers.keys.serializedSize(writes.keys, version);
+            size += KeySerializers.seekables.serializedSize(writes.keys, 
version);
             boolean hasWrites = writes.write != null;
             size += TypeSizes.sizeof(hasWrites);
             if (hasWrites)
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java 
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
index 5d6453f807..078051bfe1 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
@@ -288,7 +288,7 @@ public class KeySerializers
         @Override
         public void serialize(Seekables<?, ?> t, DataOutputPlus out, int 
version) throws IOException
         {
-            switch (t.kindOfContents())
+            switch (t.domain())
             {
                 default: throw new AssertionError();
                 case Key:
@@ -317,7 +317,7 @@ public class KeySerializers
         @Override
         public long serializedSize(Seekables<?, ?> t, int version)
         {
-            switch (t.kindOfContents())
+            switch (t.domain())
             {
                 default: throw new AssertionError();
                 case Key:
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
index cce1942a07..c12e92e18b 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
@@ -49,7 +49,7 @@ public class PreacceptSerializers
         {
             CommandSerializers.partialTxn.serialize(msg.partialTxn, out, 
version);
             serializeNullable(msg.route, out, version, 
KeySerializers.fullRoute);
-            out.writeUnsignedVInt(msg.maxEpoch - msg.minEpoch);
+            out.writeUnsignedVInt(msg.maxEpoch - msg.minUnsyncedEpoch);
         }
 
         @Override
@@ -67,7 +67,7 @@ public class PreacceptSerializers
         {
             return 
CommandSerializers.partialTxn.serializedSize(msg.partialTxn, version)
                    + serializedNullableSize(msg.route, version, 
KeySerializers.fullRoute)
-                   + TypeSizes.sizeofUnsignedVInt(msg.maxEpoch - msg.minEpoch);
+                   + TypeSizes.sizeofUnsignedVInt(msg.maxEpoch - 
msg.minUnsyncedEpoch);
         }
     };
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
index 1db73e38ab..c9da12801e 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
@@ -79,7 +79,7 @@ public abstract class TxnRequestSerializer<T extends 
TxnRequest<?>> implements I
         void serializeHeader(T msg, DataOutputPlus out, int version) throws 
IOException
         {
             super.serializeHeader(msg, out, version);
-            out.writeUnsignedVInt(msg.minEpoch);
+            out.writeUnsignedVInt(msg.minUnsyncedEpoch);
             out.writeBoolean(msg.doNotComputeProgressKey);
         }
 
@@ -97,7 +97,7 @@ public abstract class TxnRequestSerializer<T extends 
TxnRequest<?>> implements I
         long serializedHeaderSize(T msg, int version)
         {
             long size = super.serializedHeaderSize(msg, version);
-            size += TypeSizes.sizeofUnsignedVInt(msg.minEpoch);
+            size += TypeSizes.sizeofUnsignedVInt(msg.minUnsyncedEpoch);
             size += TypeSizes.BOOL_SIZE;
             return size;
         }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 8eda32ad82..ea11312d72 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord.txn;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 import accord.api.Data;
 import accord.local.SafeCommandStore;
@@ -37,7 +38,6 @@ import 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.accord.AccordCommandsForKey;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -114,8 +114,13 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
     public Future<Data> read(boolean isForWriteTxn, SafeCommandStore 
safeStore, Timestamp executeAt)
     {
         SinglePartitionReadCommand command = (SinglePartitionReadCommand) 
get();
-        AccordCommandsForKey cfk = (AccordCommandsForKey) 
safeStore.commandsForKey(key);
-        int nowInSeconds = cfk.nowInSecondsFor(executeAt, isForWriteTxn);
+        // TODO (required, safety): before release, double check reasoning 
that this is safe
+//        AccordCommandsForKey cfk = 
((SafeAccordCommandStore)safeStore).commandsForKey(key);
+//        int nowInSeconds = cfk.nowInSecondsFor(executeAt, isForWriteTxn);
+        // It's fine for our nowInSeconds to lag slightly our insertion 
timestamp, as to the user
+        // this simply looks like the transaction witnessed TTL'd data and the 
data then expired
+        // immediately after the transaction executed, and this simplifies 
things a great deal
+        int nowInSeconds = (int) 
TimeUnit.MICROSECONDS.toSeconds(executeAt.hlc());
 
         return Stage.READ.submit(() ->
         {
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 1b42635021..eeb57f13f5 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -28,11 +28,11 @@ import com.google.common.collect.ImmutableList;
 
 import accord.api.Data;
 import accord.api.DataStore;
-import accord.api.Key;
 import accord.api.Read;
 import accord.local.SafeCommandStore;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
+import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
@@ -139,11 +139,11 @@ public class TxnRead extends 
AbstractKeySorted<TxnNamedRead> implements Read
             if (!reads.contains(namedRead))
                 reads.add(namedRead);
 
-        return new TxnRead(reads, txnKeys.union(read.keys()));
+        return new TxnRead(reads, txnKeys.with((Keys)read.keys()));
     }
 
     @Override
-    public Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore 
safeStore, Timestamp executeAt, DataStore store)
+    public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore 
safeStore, Timestamp executeAt, DataStore store)
     {
         List<Future<Data>> futures = new ArrayList<>();
         forEachWithKey((PartitionKey) key, read -> 
futures.add(read.read(kind.isWrite(), safeStore, executeAt)));
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index ddd57eaf72..f7b6565dbe 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 
+import static accord.utils.SortedArrays.Search.CEIL;
 import static org.apache.cassandra.service.accord.AccordSerializers.serialize;
 import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
 import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
@@ -135,7 +136,7 @@ public class TxnUpdate implements Update
         int j = 0;
         for (int i = 0 ; i < out.size() ; ++i)
         {
-            j = in.findNext(out.get(i), j);
+            j = in.findNext(j, out.get(i), CEIL);
             result[i] = from[j];
         }
         return result;
@@ -146,7 +147,7 @@ public class TxnUpdate implements Update
     {
         // TODO: special method for linear merging keyed and non-keyed lists 
simultaneously
         TxnUpdate that = (TxnUpdate) update;
-        Keys mergedKeys = this.keys.union(that.keys);
+        Keys mergedKeys = this.keys.with(that.keys);
         ByteBuffer[] mergedFragments = merge(this.keys, that.keys, 
this.fragments, that.fragments, mergedKeys.size());
         return new TxnUpdate(mergedKeys, mergedFragments, condition);
     }
@@ -226,7 +227,7 @@ public class TxnUpdate implements Update
             while (j < mi && toKey.apply(items.get(j)).equals(key))
                 ++j;
 
-            int nextki = keys.findNext(key, ki);
+            int nextki = keys.findNext(ki, key, CEIL);
             Arrays.fill(result, ki, nextki, ByteBufferUtil.EMPTY_BYTE_BUFFER);
             ki = nextki;
             result[ki++] = toSerializedValues(items, i, j, serializer, 
MessagingService.current_version);
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index 42ec0143e3..94593ffc52 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -34,6 +34,7 @@ import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.Write;
 import accord.local.SafeCommandStore;
+import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.primitives.Writes;
 import org.apache.cassandra.concurrent.Stage;
@@ -50,6 +51,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
+import 
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
 import org.apache.cassandra.service.accord.AccordCommandsForKey;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -340,10 +342,14 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
     }
 
     @Override
-    public Future<Void> apply(Key key, SafeCommandStore safeStore, Timestamp 
executeAt, DataStore store)
+    public Future<Void> apply(Seekable key, SafeCommandStore safeStore, 
Timestamp executeAt, DataStore store)
     {
-        AccordCommandsForKey cfk = (AccordCommandsForKey) 
safeStore.commandsForKey(key);
+        AccordCommandsForKey cfk = ((SafeAccordCommandStore) 
safeStore).commandsForKey((Key)key);
+        // TODO (expected, efficiency): 99.9999% of the time we can just use 
executeAt.hlc(), so can avoid bringing
+        //  cfk into memory by retaining at all times in memory key ranges 
that are dirty and must use this logic;
+        //  any that aren't can just use executeAt.hlc
         long timestamp = cfk.timestampMicrosFor(executeAt, true);
+        // TODO (low priority - do we need to compute nowInSeconds, or can we 
just use executeAt?)
         int nowInSeconds = cfk.nowInSecondsFor(executeAt, true);
 
         List<Future<?>> futures = new ArrayList<>();
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 1f6a3b2b6f..6d89246593 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
 import accord.api.Key;
 import accord.api.RoutingKey;
 import accord.local.Command;
-import accord.local.CommandsForKey;
 import accord.local.Node;
 import accord.local.PreLoadContext;
 import accord.local.Status;
@@ -113,11 +112,10 @@ public class AccordCommandTest
             Assert.assertEquals(Status.PreAccepted, command.status());
             Assert.assertTrue(command.partialDeps().isEmpty());
 
-            CommandsForKey cfk = instance.commandsForKey(key(1));
+            AccordCommandsForKey cfk = 
((SafeAccordCommandStore)instance).commandsForKey(key(1));
             Assert.assertEquals(txnId, cfk.max());
-            
Assert.assertNotNull(((AccordCommandsForKey.Series<?>)cfk.uncommitted()).get(txnId));
-            
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.committedById()).get(txnId));
-            
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.committedByExecuteAt()).get(txnId));
+            Assert.assertNotNull((cfk.byId()).get(txnId));
+            Assert.assertNotNull((cfk.byExecuteAt()).get(txnId));
         }).get();
 
         // check accept
@@ -129,7 +127,7 @@ public class AccordCommandTest
             builder.add(key, txnId2);
             deps = builder.build();
         }
-        Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, 
false, Ballot.ZERO, executeAt, partialTxn.keys(), deps, partialTxn.kind());
+        Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, 
false, Ballot.ZERO, executeAt, partialTxn.keys(), deps);
 
         commandStore.execute(accept, instance -> {
             Accept.AcceptReply reply = accept.apply(instance);
@@ -143,11 +141,10 @@ public class AccordCommandTest
             Assert.assertEquals(Status.Accepted, command.status());
             Assert.assertEquals(deps, command.partialDeps());
 
-            CommandsForKey cfk = instance.commandsForKey(key(1));
+            AccordCommandsForKey cfk = 
((SafeAccordCommandStore)instance).commandsForKey(key(1));
             Assert.assertEquals(executeAt, cfk.max());
-            
Assert.assertNotNull(((AccordCommandsForKey.Series<?>)cfk.uncommitted()).get(txnId));
-            
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.committedById()).get(txnId));
-            
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.committedByExecuteAt()).get(txnId));
+            Assert.assertNotNull((cfk.byId()).get(txnId));
+            Assert.assertNotNull((cfk.byExecuteAt()).get(txnId));
         }).get();
 
         // check commit
@@ -160,10 +157,9 @@ public class AccordCommandTest
             Assert.assertTrue(command.hasBeen(Status.Committed));
             Assert.assertEquals(commit.partialDeps, command.partialDeps());
 
-            CommandsForKey cfk = instance.commandsForKey(key(1));
-            
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.uncommitted()).get(txnId));
-            
Assert.assertNotNull(((AccordCommandsForKey.Series<?>)cfk.committedById()).get(txnId));
-            
Assert.assertNotNull(((AccordCommandsForKey.Series<?>)cfk.committedByExecuteAt()).get(commit.executeAt));
+            AccordCommandsForKey cfk = 
((SafeAccordCommandStore)instance).commandsForKey(key(1));
+            Assert.assertNotNull((cfk.byId()).get(txnId));
+            Assert.assertNotNull((cfk.byExecuteAt()).get(commit.executeAt));
         }).get();
     }
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index 87a00fce02..470df82832 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -30,7 +30,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import accord.local.Command;
-import accord.local.CommandsForKey;
 import accord.local.SafeCommandStore;
 import accord.local.Status;
 import accord.primitives.Keys;
@@ -47,6 +46,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.AccordCommand;
 import org.apache.cassandra.service.accord.AccordCommandStore;
+import 
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
+import org.apache.cassandra.service.accord.AccordCommandsForKey;
 import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordStateCache;
 import org.apache.cassandra.service.accord.api.PartitionKey;
@@ -109,8 +110,8 @@ public class AsyncOperationTest
         Txn txn = createTxn((int)clock.incrementAndGet());
         PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
 
-        commandStore.execute(contextFor(Collections.emptyList(), 
Keys.of(key)), instance -> {
-            CommandsForKey cfk = instance.maybeCommandsForKey(key);
+        commandStore.execute(contextFor(Collections.emptyList(), 
Keys.of(key)),instance -> {
+            AccordCommandsForKey cfk = 
((SafeAccordCommandStore)instance).maybeCommandsForKey(key);
             Assert.assertNull(cfk);
         }).get();
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 5c8d72f5ba..8526daa675 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -141,9 +141,8 @@ public class AsyncWriterTest
 
         AccordCommandsForKey cfk = new AccordCommandsForKey(commandStore, 
key).initialize();
         AccordKeyspace.getCommandsForKeyMutation(commandStore, cfk, 
commandStore.nextSystemTimestampMicros()).apply();
-        Assert.assertTrue(cfk.uncommitted.isEmpty());
-        Assert.assertTrue(cfk.committedByExecuteAt.isEmpty());
-        Assert.assertTrue(cfk.committedById.isEmpty());
+        Assert.assertTrue(cfk.byExecuteAt.isEmpty());
+        Assert.assertTrue(cfk.byId.isEmpty());
 
         AccordCommand command = new AccordCommand(txnId).initialize();
         command.setPartialTxn(txn.slice(ranges, true));
@@ -157,13 +156,11 @@ public class AsyncWriterTest
         execute(commandStore, () -> {
             AsyncContext ctx = new AsyncContext();
             commandStore.setContext(ctx);
-            AccordPartialCommand summary = 
getOnlyElement(cfkUncommitted.uncommitted().all().collect(Collectors.toList()));
-            
Assert.assertTrue(cfkUncommitted.uncommitted.map.getView().containsKey(txnId));
+            AccordPartialCommand summary = 
getOnlyElement(cfkUncommitted.byId().all().collect(Collectors.toList()));
+            
Assert.assertTrue(cfkUncommitted.byId.map.getView().containsKey(txnId));
+            
Assert.assertTrue(cfkUncommitted.byExecuteAt.map.getView().containsKey(executeAt));
             Assert.assertEquals(Status.Accepted, summary.status());
             Assert.assertEquals(executeAt, summary.executeAt());
-
-            Assert.assertTrue(cfkUncommitted.committedByExecuteAt.isEmpty());
-            Assert.assertTrue(cfkUncommitted.committedById.isEmpty());
             commandStore.unsetContext(ctx);
         });
 
@@ -177,17 +174,15 @@ public class AsyncWriterTest
         execute(commandStore, () -> {
             AsyncContext ctx = new AsyncContext();
             commandStore.setContext(ctx);
-            AccordPartialCommand idSummary = 
getOnlyElement(cfkCommitted.committedById().all().collect(Collectors.toList()));
-            AccordPartialCommand executeSummary = 
getOnlyElement(cfkCommitted.committedByExecuteAt().all().collect(Collectors.toList()));
+            AccordPartialCommand idSummary = 
getOnlyElement(cfkCommitted.byId().all().collect(Collectors.toList()));
+            AccordPartialCommand executeSummary = 
getOnlyElement(cfkCommitted.byExecuteAt().all().collect(Collectors.toList()));
 
-            
Assert.assertTrue(cfkCommitted.committedById.map.getView().containsKey(txnId));
-            
Assert.assertTrue(cfkCommitted.committedByExecuteAt.map.getView().containsKey(executeAt));
+            
Assert.assertTrue(cfkCommitted.byId.map.getView().containsKey(txnId));
+            
Assert.assertTrue(cfkCommitted.byExecuteAt.map.getView().containsKey(executeAt));
             Assert.assertEquals(idSummary, executeSummary);
 
             Assert.assertEquals(Status.Committed, idSummary.status());
             Assert.assertEquals(executeAt, idSummary.executeAt());
-
-            Assert.assertTrue(cfkCommitted.uncommitted.isEmpty());
             commandStore.unsetContext(ctx);
         });
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to