This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 4827516764 CEP-15: (C*) Add notion of CommandsForRanges and make this 
durable in C*
4827516764 is described below

commit 4827516764152441677032e6ef0b025b7912e111
Author: David Capwell <[email protected]>
AuthorDate: Thu May 25 13:20:39 2023 -0700

    CEP-15: (C*) Add notion of CommandsForRanges and make this durable in C*
    
    patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-18519
---
 modules/accord                                     |   2 +-
 src/java/org/apache/cassandra/dht/Token.java       |  11 +
 .../service/accord/AccordCommandStore.java         | 134 +++++-
 .../service/accord/AccordCommandStores.java        |  22 +
 .../service/accord/AccordConfigurationService.java |  13 +-
 ...AccordVerbHandler.java => AccordDataStore.java} |  35 +-
 .../cassandra/service/accord/AccordKeyspace.java   | 337 +++++++++++++-
 .../service/accord/AccordObjectSizes.java          |   3 +
 .../service/accord/AccordSafeCommandStore.java     | 152 ++++--
 .../cassandra/service/accord/AccordService.java    |   8 +-
 .../cassandra/service/accord/AccordStateCache.java |   8 +
 .../service/accord/AccordVerbHandler.java          |  15 +-
 .../service/accord/CommandsForRanges.java          | 508 +++++++++++++++++++++
 .../service/accord/api/AccordRoutingKey.java       |  17 +-
 .../service/accord/async/AsyncLoader.java          | 113 ++++-
 .../service/accord/async/AsyncOperation.java       |  20 +-
 .../serializers/CommandsForKeySerializer.java      |   2 +-
 .../accord/serializers/ListenerSerializers.java    |  35 +-
 .../org/apache/cassandra/utils/IntervalTree.java   |  89 +++-
 .../distributed/impl/AbstractCluster.java          |   7 +-
 .../distributed/test/accord/AccordCQLTest.java     |  18 +-
 .../distributed/test/accord/AccordTestBase.java    |  39 +-
 .../distributed/test/accord/NewSchemaTest.java     |  85 ++++
 .../service/accord/async/AsyncLoaderTest.java      |  12 +-
 .../serializers/CommandsForKeySerializerTest.java  |   4 +-
 25 files changed, 1542 insertions(+), 147 deletions(-)

diff --git a/modules/accord b/modules/accord
index b99c4671fa..3d0ff07cd5 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit b99c4671fa0b22bed7f5a37fc5acaa2d2579e5b2
+Subproject commit 3d0ff07cd5c7db43390b85afa593e6f76471d886
diff --git a/src/java/org/apache/cassandra/dht/Token.java 
b/src/java/org/apache/cassandra/dht/Token.java
index bd327f96b9..00bbe66406 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
 
 public abstract class Token implements RingPosition<Token>, Serializable
 {
@@ -41,6 +42,16 @@ public abstract class Token implements RingPosition<Token>, 
Serializable
         public abstract ByteBuffer toByteArray(Token token);
         public abstract Token fromByteArray(ByteBuffer bytes);
 
+        public byte[] toOrderedByteArray(Token token, ByteComparable.Version 
version)
+        {
+            return ByteSourceInverse.readBytes(asComparableBytes(token, 
version));
+        }
+
+        public Token fromOrderedByteArray(byte[] bytes, ByteComparable.Version 
version)
+        {
+            return 
fromComparableBytes(ByteSource.peekable(ByteSource.fixedLength(bytes)), 
version);
+        }
+
         /**
          * Produce a byte-comparable representation of the token.
          * See {@link Token#asComparableBytes}
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index b3db3f3822..d2f627fb9b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -18,19 +18,29 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.Key;
 import accord.api.ProgressLog;
+import accord.impl.CommandTimeseriesHolder;
 import accord.impl.CommandsForKey;
 import accord.local.Command;
 import accord.local.CommandStore;
@@ -39,20 +49,35 @@ import accord.local.CommandStores.RangesForEpochHolder;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
-import accord.primitives.Deps;
+import accord.local.SaveStatus;
+import accord.primitives.AbstractKeys;
+import accord.primitives.AbstractRanges;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
 import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
+import accord.utils.async.Observable;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.service.accord.async.AsyncOperation;
 import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 
 public class AccordCommandStore extends CommandStore
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordCommandStore.class);
+
     private static long getThreadId(ExecutorService executor)
     {
         try
@@ -78,6 +103,7 @@ public class AccordCommandStore extends CommandStore
     private AsyncOperation<?> currentOperation = null;
     private AccordSafeCommandStore current = null;
     private long lastSystemTimestampMicros = Long.MIN_VALUE;
+    private CommandsForRanges commandsForRanges = new CommandsForRanges();
 
     public AccordCommandStore(int id,
                               NodeTimeService time,
@@ -94,17 +120,67 @@ public class AccordCommandStore extends CommandStore
         this.commandCache = stateCache.instance(TxnId.class, 
accord.local.Command.class, AccordSafeCommand::new, AccordObjectSizes::command);
         this.commandsForKeyCache = stateCache.instance(RoutableKey.class, 
CommandsForKey.class, AccordSafeCommandsForKey::new, 
AccordObjectSizes::commandsForKey);
         executor.execute(() -> CommandStore.register(this));
+        executor.execute(this::loadRangesToCommands);
     }
 
-    @Override
-    public boolean inStore()
+    private void loadRangesToCommands()
     {
-        return Thread.currentThread().getId() == threadId;
+        AsyncPromise<CommandsForRanges> future = new AsyncPromise<>();
+        AccordKeyspace.findAllCommandsByDomain(id, Routable.Domain.Range, 
ImmutableSet.of("txn_id", "status", "txn", "execute_at", "dependencies"), new 
Observable<UntypedResultSet.Row>()
+        {
+            private CommandsForRanges.Builder builder = new 
CommandsForRanges.Builder();
+            @Override
+            public void onNext(UntypedResultSet.Row row) throws Exception
+            {
+                TxnId txnId = AccordKeyspace.deserializeTxnId(row);
+                SaveStatus status = AccordKeyspace.deserializeStatus(row);
+                Timestamp executeAt = AccordKeyspace.deserializeExecuteAt(row);
+
+                PartialTxn txn = AccordKeyspace.deserializeTxn(row);
+                Seekables<?, ?> keys = txn.keys();
+                if (keys.domain() != Routable.Domain.Range)
+                    throw new AssertionError(String.format("Txn keys are not 
range", txn));
+                Ranges ranges = (Ranges) keys;
+
+                PartialDeps deps = AccordKeyspace.deserializeDependencies(row);
+                List<TxnId> dependsOn = deps == null ? Collections.emptyList() 
: deps.txnIds();
+                builder.put(txnId, ranges, status, executeAt, dependsOn);
+            }
+
+            @Override
+            public void onError(Throwable t)
+            {
+                builder = null;
+                future.tryFailure(t);
+            }
+
+            @Override
+            public void onCompleted()
+            {
+                CommandsForRanges result = this.builder.build();
+                builder = null;
+                future.trySuccess(result);
+            }
+        });
+        try
+        {
+            commandsForRanges = future.get();
+            logger.debug("Loaded {} intervals", commandsForRanges.size());
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e.getCause());
+        }
     }
 
     @Override
-    protected void registerHistoricalTransactions(Deps deps)
+    public boolean inStore()
     {
+        return Thread.currentThread().getId() == threadId;
     }
 
     public void setCacheSize(long bytes)
@@ -234,7 +310,7 @@ public class AccordCommandStore extends CommandStore
 
     public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext,
                                                  Map<TxnId, AccordSafeCommand> 
commands,
-                                                 Map<RoutableKey, 
AccordSafeCommandsForKey> commandsForKeys)
+                                                 NavigableMap<RoutableKey, 
AccordSafeCommandsForKey> commandsForKeys)
     {
         Invariants.checkState(current == null);
         commands.values().forEach(AccordSafeState::preExecute);
@@ -252,6 +328,52 @@ public class AccordCommandStore extends CommandStore
         current = null;
     }
 
+    <O> O mapReduceForRange(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
+    {
+        keysOrRanges = keysOrRanges.slice(slice, Routables.Slice.Minimal);
+        switch (keysOrRanges.domain())
+        {
+            case Key:
+            {
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                for (CommandTimeseriesHolder summary : 
commandsForRanges.search(keys))
+                {
+                    accumulate = map.apply(summary, accumulate);
+                    if (accumulate.equals(terminalValue))
+                        return accumulate;
+                }
+            }
+            break;
+            case Range:
+            {
+                AbstractRanges<?> ranges = (AbstractRanges<?>) keysOrRanges;
+                for (Range range : ranges)
+                {
+                    CommandTimeseriesHolder summary = 
commandsForRanges.search(range);
+                    if (summary == null)
+                        continue;
+                    accumulate = map.apply(summary, accumulate);
+                    if (accumulate.equals(terminalValue))
+                        return accumulate;
+                }
+            }
+            break;
+            default:
+                throw new AssertionError("Unknown domain: " + 
keysOrRanges.domain());
+        }
+        return accumulate;
+    }
+
+    CommandsForRanges commandsForRanges()
+    {
+        return commandsForRanges;
+    }
+
+    CommandsForRanges.Updater updateRanges()
+    {
+        return commandsForRanges.update();
+    }
+
     public void abortCurrentOperation()
     {
         current = null;
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 3bad135715..38040b466b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -29,12 +29,14 @@ import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.local.ShardDistributor;
+import accord.primitives.Range;
 import accord.primitives.Routables;
 import accord.topology.Topology;
 import accord.utils.MapReduceConsume;
 import accord.utils.RandomSource;
 import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.journal.AsyncWriteCallback;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 
 public class AccordCommandStores extends CommandStores
 {
@@ -96,6 +98,26 @@ public class AccordCommandStores extends CommandStores
         });
     }
 
+    @Override
+    protected boolean shouldBootstrap(Node node, Topology previous, Topology 
updated, Range range)
+    {
+        if (!super.shouldBootstrap(node, previous, updated, range))
+            return false;
+        // we see new ranges when a new keyspace is added, so avoid bootstrap 
in these cases
+        return contains(previous, ((AccordRoutingKey)  
range.start()).keyspace());
+    }
+
+    private static boolean contains(Topology previous, String searchKeyspace)
+    {
+        for (Range range : previous.ranges())
+        {
+            String keyspace = ((AccordRoutingKey)  range.start()).keyspace();
+            if (keyspace.equals(searchKeyspace))
+                return true;
+        }
+        return false;
+    }
+
     private long cacheSize;
 
     synchronized void setCacheSize(long bytes)
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index e8f1033664..3ab64c9bd0 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -20,8 +20,9 @@ package org.apache.cassandra.service.accord;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
-import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
 
 import accord.api.ConfigurationService;
 import accord.local.Node;
@@ -33,7 +34,8 @@ import accord.topology.Topology;
 public class AccordConfigurationService implements ConfigurationService
 {
     private final Node.Id localId;
-    private final List<Listener> listeners = new ArrayList<>();
+    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
+    @GuardedBy("this")
     private final List<Topology> epochs = new ArrayList<>();
 
     public AccordConfigurationService(Node.Id localId)
@@ -43,7 +45,7 @@ public class AccordConfigurationService implements 
ConfigurationService
     }
 
     @Override
-    public synchronized void registerListener(Listener listener)
+    public void registerListener(Listener listener)
     {
         listeners.add(listener);
     }
@@ -55,7 +57,7 @@ public class AccordConfigurationService implements 
ConfigurationService
     }
 
     @Override
-    public Topology getTopologyForEpoch(long epoch)
+    public synchronized Topology getTopologyForEpoch(long epoch)
     {
         return epochs.get((int) epoch);
     }
@@ -64,7 +66,8 @@ public class AccordConfigurationService implements 
ConfigurationService
     public synchronized void fetchTopologyForEpoch(long epoch)
     {
         Topology current = currentTopology();
-        Preconditions.checkArgument(epoch > current.epoch(), "Requested to 
fetch epoch %d which is <= %d (current epoch)", epoch, current.epoch());
+        if (epoch < current.epoch())
+            return;
         while (current.epoch() < epoch)
         {
             current = AccordTopologyUtils.createTopology(epochs.size());
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java 
b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
similarity index 53%
copy from src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
copy to src/java/org/apache/cassandra/service/accord/AccordDataStore.java
index 216d64df42..b1f191a396 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
@@ -18,31 +18,30 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import accord.api.DataStore;
 import accord.local.Node;
-import accord.messages.Request;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import accord.local.SafeCommandStore;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.utils.async.AsyncResults;
 
-public class AccordVerbHandler<T extends Request> implements IVerbHandler<T>
+public enum AccordDataStore implements DataStore
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(AccordVerbHandler.class);
-
-    private final Node node;
+    INSTANCE;
 
-    public AccordVerbHandler(Node node)
+    @Override
+    public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges 
ranges, SyncPoint syncPoint, FetchRanges callback)
     {
-        this.node = node;
+        //TODO (implement): do real work
+        callback.starting(ranges).started(Timestamp.NONE);
+        callback.fetched(ranges);
+        return new ImmediateFetchFuture(ranges);
     }
 
-    @Override
-    public void doVerb(Message<T> message) throws IOException
+    private static class ImmediateFetchFuture extends 
AsyncResults.SettableResult<Ranges> implements FetchResult
     {
-        logger.debug("Receiving {} from {}", message.payload, message.from());
-        message.payload.process(node, EndpointMapping.getId(message.from()), 
message);
+        ImmediateFetchFuture(Ranges ranges) { setSuccess(ranges); }
+        @Override public void abort(Ranges abort) { }
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index cdf257f61f..7cab857ff6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumMap;
@@ -29,10 +30,12 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Lists;
@@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import accord.api.Result;
 import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKey.CommandTimeseries;
+import accord.impl.CommandTimeseries;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommonAttributes;
@@ -53,11 +56,16 @@ import accord.local.Status;
 import accord.primitives.Ballot;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
+import accord.primitives.Routable;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
+import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
+import accord.utils.async.Observable;
+import org.apache.cassandra.concurrent.DebuggableTask;
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
@@ -76,11 +84,14 @@ import 
org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.BTreeRow;
@@ -89,6 +100,10 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.LocalVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -96,6 +111,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
@@ -112,6 +128,7 @@ import 
org.apache.cassandra.service.accord.serializers.KeySerializers;
 import org.apache.cassandra.service.accord.serializers.ListenerSerializers;
 import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 
 import static java.lang.String.format;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
@@ -128,9 +145,10 @@ public class AccordKeyspace
     public static final String COMMANDS = "commands";
     public static final String COMMANDS_FOR_KEY = "commands_for_key";
 
-    private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint, int>";
     private static final TupleType TIMESTAMP_TYPE = new 
TupleType(Lists.newArrayList(LongType.instance, LongType.instance, 
Int32Type.instance));
-    private static final String KEY_TUPLE = "tuple<uuid, blob>";
+    private static final String TIMESTAMP_TUPLE = 
TIMESTAMP_TYPE.asCQL3Type().toString();
+    private static final TupleType KEY_TYPE = new 
TupleType(Arrays.asList(UUIDType.instance, BytesType.instance));
+    private static final String KEY_TUPLE = KEY_TYPE.asCQL3Type().toString();
 
     private static final ClusteringIndexFilter FULL_PARTITION = new 
ClusteringIndexSliceFilter(Slices.ALL, false);
 
@@ -156,12 +174,36 @@ public class AccordKeyspace
         }
     }
 
+    private enum TokenType
+    {
+        Murmur3((byte) 1),
+        ByteOrdered((byte) 2),
+        ;
+
+        private final byte value;
+
+        TokenType(byte b)
+        {
+            this.value = b;
+        }
+
+        static TokenType valueOf(Token token)
+        {
+            if (token instanceof Murmur3Partitioner.LongToken)
+                return Murmur3;
+            if (token instanceof ByteOrderedPartitioner.BytesToken)
+                return ByteOrdered;
+            throw new IllegalArgumentException("Unexpected token type: " + 
token.getClass());
+        }
+    }
+
     // TODO: store timestamps as blobs (confirm there are no negative numbers, 
or offset)
     private static final TableMetadata Commands =
         parse(COMMANDS,
               "accord commands",
               "CREATE TABLE %s ("
               + "store_id int,"
+              + "domain int," // this is stored as part of txn_id, used 
currently for cheaper scans of the table
               + format("txn_id %s,", TIMESTAMP_TUPLE)
               + "status int,"
               + "home_key blob,"
@@ -178,8 +220,10 @@ public class AccordKeyspace
               + format("waiting_on_commit set<%s>,", TIMESTAMP_TUPLE)
               + format("waiting_on_apply map<%s, blob>,", TIMESTAMP_TUPLE)
               + "listeners set<blob>, "
-              + "PRIMARY KEY((store_id, txn_id))"
-              + ')');
+              + "PRIMARY KEY((store_id, domain, txn_id))"
+              + ')')
+        .partitioner(new 
LocalPartitioner(CompositeType.getInstance(Int32Type.instance, 
Int32Type.instance, TIMESTAMP_TYPE)))
+        .build();
 
     // TODO: naming is not very clearly distinct from the base serializers
     private static class CommandsSerializers
@@ -231,6 +275,7 @@ public class AccordKeyspace
               "accord commands per key",
               "CREATE TABLE %s ("
               + "store_id int, "
+              + "key_token blob, " // can't use "token" as this is a restricted 
word in CQL
               + format("key %s, ", KEY_TUPLE)
               + format("max_timestamp %s static, ", TIMESTAMP_TUPLE)
               + format("last_executed_timestamp %s static, ", TIMESTAMP_TUPLE)
@@ -240,8 +285,10 @@ public class AccordKeyspace
               + "series int, "
               + format("timestamp %s, ", TIMESTAMP_TUPLE)
               + "data blob, "
-              + "PRIMARY KEY((store_id, key), series, timestamp)"
-              + ')');
+              + "PRIMARY KEY((store_id, key_token, key), series, timestamp)"
+              + ')')
+        .partitioner(new 
LocalPartitioner(CompositeType.getInstance(Int32Type.instance, 
BytesType.instance, KEY_TYPE)))
+        .build();
 
     private static class CommandsForKeyColumns
     {
@@ -292,13 +339,12 @@ public class AccordKeyspace
         }
     }
 
-    private static TableMetadata parse(String name, String description, String 
cql)
+    private static TableMetadata.Builder parse(String name, String 
description, String cql)
     {
         return CreateTableStatement.parse(format(cql, name), 
ACCORD_KEYSPACE_NAME)
                                    
.id(TableId.forSystemTable(ACCORD_KEYSPACE_NAME, name))
                                    .comment(description)
-                                   .gcGraceSeconds((int) 
TimeUnit.DAYS.toSeconds(90))
-                                   .build();
+                                   .gcGraceSeconds((int) 
TimeUnit.DAYS.toSeconds(90));
     }
 
     public static KeyspaceMetadata metadata()
@@ -338,7 +384,7 @@ public class AccordKeyspace
 
     private static <T> T deserializeOrNull(ByteBuffer bytes, 
LocalVersionedSerializer<T> serializer) throws IOException
     {
-        return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? 
deserialize(bytes, serializer) : null;
+        return bytes != null && !ByteBufferAccessor.instance.isEmpty(bytes) ? 
deserialize(bytes, serializer) : null;
     }
 
     private static ImmutableSortedMap<Timestamp, TxnId> 
deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
@@ -524,6 +570,7 @@ public class AccordKeyspace
             }
 
             ByteBuffer key = 
CommandsColumns.keyComparator.make(commandStore.id(),
+                                                                
command.txnId().domain().ordinal(),
                                                                 
serializeTimestamp(command.txnId())).serializeAsPartitionKey();
             Row row = builder.build();
             if (row.isEmpty())
@@ -537,6 +584,20 @@ public class AccordKeyspace
         }
     }
 
+    public static ByteBuffer serializeToken(Token token)
+    {
+        return serializeToken(token, ByteBufferAccessor.instance);
+    }
+
+    private static <V> V serializeToken(Token token, ValueAccessor<V> accessor)
+    {
+        TokenType type = TokenType.valueOf(token);
+        byte[] ordered = 
token.getPartitioner().getTokenFactory().toOrderedByteArray(token, 
ByteComparable.Version.OSS42);
+        V value = accessor.allocate(ordered.length + 1);
+        accessor.putByte(value, 0, type.value);
+        ByteArrayAccessor.instance.copyTo(ordered, 0, value, accessor, 1, 
ordered.length);
+        return value;
+    }
 
     private static ByteBuffer serializeKey(PartitionKey key)
     {
@@ -595,13 +656,218 @@ public class AccordKeyspace
     {
         String cql = "SELECT * FROM %s.%s " +
                      "WHERE store_id = ? " +
+                     "AND domain = ? " +
                      "AND txn_id=(?, ?, ?)";
 
         return executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, 
COMMANDS),
                                commandStore.id(),
+                               txnId.domain().ordinal(),
                                txnId.msb, txnId.lsb, txnId.node.id);
     }
 
+    public static void findAllCommandsByDomain(int commandStore, 
Routable.Domain domain, Set<String> columns, Observable<UntypedResultSet.Row> 
callback)
+    {
+        WalkCommandsForDomain work = new WalkCommandsForDomain(commandStore, 
domain, columns, Stage.READ.executor(), callback);
+        work.schedule();
+    }
+
+    private static abstract class TableWalk implements Runnable, DebuggableTask
+    {
+        private final long creationTimeNanos = Clock.Global.nanoTime();
+        private final Executor executor;
+        private final Observable<UntypedResultSet.Row> callback;
+        private long startTimeNanos = -1;
+        private int numQueries = 0;
+        private UntypedResultSet.Row lastSeen = null;
+
+        private TableWalk(Executor executor, Observable<UntypedResultSet.Row> 
callback)
+        {
+            this.executor = executor;
+            this.callback = callback;
+        }
+
+        protected abstract UntypedResultSet query(UntypedResultSet.Row 
lastSeen);
+
+        public final void schedule()
+        {
+            executor.execute(this);
+        }
+
+        @Override
+        public final void run()
+        {
+            try
+            {
+                if (startTimeNanos == -1)
+                    startTimeNanos = Clock.Global.nanoTime();
+                numQueries++;
+                UntypedResultSet result = query(lastSeen);
+                if (result.isEmpty())
+                {
+                    callback.onCompleted();
+                    return;
+                }
+                UntypedResultSet.Row lastRow = null;
+                for (UntypedResultSet.Row row : result)
+                {
+                    callback.onNext(row);
+                    lastRow = row;
+                }
+                lastSeen = lastRow;
+                schedule();
+            }
+            catch (Throwable t)
+            {
+                callback.onError(t);
+            }
+        }
+
+        @Override
+        public long creationTimeNanos()
+        {
+            return creationTimeNanos;
+        }
+
+        @Override
+        public long startTimeNanos()
+        {
+            return startTimeNanos;
+        }
+
+        @Override
+        public String description()
+        {
+            return String.format("Table Walker for %s; queries = %d", 
getClass().getSimpleName(), numQueries);
+        }
+    }
+
+    private static String selection(TableMetadata metadata, Set<String> 
requiredColumns, Set<String> forIteration)
+    {
+        StringBuilder selection = new StringBuilder();
+        if (requiredColumns.isEmpty())
+            selection.append("*");
+        else
+        {
+            Sets.SetView<String> other = Sets.difference(requiredColumns, 
forIteration);
+            for (String name : other)
+            {
+                ColumnMetadata meta = metadata.getColumn(new 
ColumnIdentifier(name, true));
+                if (meta == null)
+                    throw new IllegalArgumentException("Unknown column: " + 
name);
+            }
+            List<String> names = new ArrayList<>(forIteration.size() + 
other.size());
+            names.addAll(forIteration);
+            names.addAll(other);
+            // this sort is to make sure the CQL is deterministic
+            Collections.sort(names);
+            for (int i = 0; i < names.size(); i++)
+            {
+                if (i > 0)
+                    selection.append(", ");
+                selection.append(names.get(i));
+            }
+        }
+        return selection.toString();
+    }
+
+    private static class WalkCommandsForDomain extends TableWalk
+    {
+        private static final Set<String> COLUMNS_FOR_ITERATION = 
ImmutableSet.of("txn_id", "store_id", "domain");
+        private final String cql;
+        private final int storeId, domain;
+
+        private WalkCommandsForDomain(int commandStore, Routable.Domain 
domain, Set<String> requiredColumns, Executor executor, 
Observable<UntypedResultSet.Row> callback)
+        {
+            super(executor, callback);
+            this.storeId = commandStore;
+            this.domain = domain.ordinal();
+            cql = String.format("SELECT %s " +
+                                "FROM %s " +
+                                "WHERE store_id = ? " +
+                                "      AND domain = ? " +
+                                "      AND token(store_id, domain, txn_id) > 
token(?, ?, (?, ?, ?)) " +
+                                "ALLOW FILTERING", selection(Commands, 
requiredColumns, COLUMNS_FOR_ITERATION), Commands);
+        }
+
+        @Override
+        protected UntypedResultSet query(UntypedResultSet.Row lastSeen)
+        {
+            TxnId lastTxnId = lastSeen == null ?
+                              new TxnId(0, 0, Txn.Kind.Read, 
Routable.Domain.Key, Node.Id.NONE)
+                              : deserializeTxnId(lastSeen);
+            return executeInternal(cql, storeId, domain, storeId, domain, 
lastTxnId.msb, lastTxnId.lsb, lastTxnId.node.id);
+        }
+    }
+
+    public static void findAllKeysBetween(int commandStore,
+                                          Token start, boolean startInclusive,
+                                          Token end, boolean endInclusive,
+                                          Observable<PartitionKey> callback)
+    {
+        //TODO (optimize) : CQL doesn't look smart enough to only walk 
Index.db, and ends up walking the Data.db file for each row in the partitions 
found (for frequent keys, this cost adds up)
+        // it would be possible to find all SSTables that "could" intersect 
this range, then have a merge iterator over the Index.db (filtered to the 
range; index stores partition liveness)...
+        KeysBetween work = new KeysBetween(commandStore,
+                                           
AccordKeyspace.serializeToken(start), startInclusive,
+                                           AccordKeyspace.serializeToken(end), 
endInclusive,
+                                           ImmutableSet.of("key"),
+                                           Stage.READ.executor(), 
Observable.distinct(callback).map(value -> 
AccordKeyspace.deserializeKey(value)));
+        work.schedule();
+    }
+
+    private static class KeysBetween extends TableWalk
+    {
+        private static final Set<String> COLUMNS_FOR_ITERATION = 
ImmutableSet.of("store_id", "key_token");
+
+        private final int storeId;
+        private final ByteBuffer start, end;
+        private final String cqlFirst;
+        private final String cqlContinue;
+
+        private KeysBetween(int storeId,
+                            ByteBuffer start, boolean startInclusive,
+                            ByteBuffer end, boolean endInclusive,
+                            Set<String> requiredColumns,
+                            Executor executor, 
Observable<UntypedResultSet.Row> callback)
+        {
+            super(executor, callback);
+            this.storeId = storeId;
+            this.start = start;
+            this.end = end;
+
+            String selection = selection(CommandsForKeys, requiredColumns, 
COLUMNS_FOR_ITERATION);
+            this.cqlFirst = String.format("SELECT DISTINCT %s\n" +
+                                          "FROM %s\n" +
+                                          "WHERE store_id = ?\n" +
+                                          (startInclusive ? "  AND key_token 
>= ?\n" : "  AND key_token > ?\n") +
+                                          (endInclusive ? "  AND key_token <= 
?\n" : "  AND key_token < ?\n") +
+                                          "ALLOW FILTERING",
+                                          selection, CommandsForKeys);
+            this.cqlContinue = String.format("SELECT DISTINCT %s\n" +
+                                             "FROM %s\n" +
+                                             "WHERE store_id = ?\n" +
+                                             "  AND key_token > ?\n" +
+                                             "  AND key > ?\n" +
+                                             (endInclusive ? "  AND key_token 
<= ?\n" : "  AND key_token < ?\n") +
+                                             "ALLOW FILTERING",
+                                             selection, CommandsForKeys);
+        }
+
+        @Override
+        protected UntypedResultSet query(UntypedResultSet.Row lastSeen)
+        {
+            if (lastSeen == null)
+            {
+                return executeInternal(cqlFirst, storeId, start, end);
+            }
+            else
+            {
+                ByteBuffer previousToken = lastSeen.getBytes("key_token");
+                ByteBuffer previousKey = lastSeen.getBytes("key");
+                return executeInternal(cqlContinue, storeId, previousToken, 
previousKey, end);
+            }
+        }
+    }
+
     public static Command loadCommand(AccordCommandStore commandStore, TxnId 
txnId)
     {
         commandStore.checkNotInStoreThread();
@@ -616,19 +882,19 @@ public class AccordKeyspace
         try
         {
             UntypedResultSet.Row row = rows.one();
-            Invariants.checkState(deserializeTimestampOrNull(row, "txn_id", 
TxnId::fromBits).equals(txnId));
-            SaveStatus status = SaveStatus.values()[row.getInt("status")];
+            Invariants.checkState(deserializeTxnId(row).equals(txnId));
+            SaveStatus status = deserializeStatus(row);
             CommonAttributes.Mutable attributes = new 
CommonAttributes.Mutable(txnId);
             // TODO: something less brittle than ordinal, more efficient than 
values()
             
attributes.durability(Status.Durability.values()[row.getInt("durability", 0)]);
             attributes.homeKey(deserializeOrNull(row.getBlob("home_key"), 
CommandsSerializers.routingKey));
             
attributes.progressKey(deserializeOrNull(row.getBlob("progress_key"), 
CommandsSerializers.routingKey));
             attributes.route(deserializeOrNull(row.getBlob("route"), 
CommandsSerializers.route));
-            attributes.partialTxn(deserializeOrNull(row.getBlob("txn"), 
CommandsSerializers.partialTxn));
-            
attributes.partialDeps(deserializeOrNull(row.getBlob("dependencies"), 
CommandsSerializers.partialDeps));
+            attributes.partialTxn(deserializeTxn(row));
+            attributes.partialDeps(deserializeDependencies(row));
             attributes.setListeners(deserializeListeners(row, "listeners"));
 
-            Timestamp executeAt = deserializeTimestampOrNull(row, 
"execute_at", Timestamp::fromBits);
+            Timestamp executeAt = deserializeExecuteAt(row);
             Ballot promised = deserializeTimestampOrNull(row, 
"promised_ballot", Ballot::fromBits);
             Ballot accepted = deserializeTimestampOrNull(row, 
"accepted_ballot", Ballot::fromBits);
             ImmutableSortedSet<TxnId> waitingOnCommit = 
deserializeTxnIdNavigableSet(row, "waiting_on_commit");
@@ -669,6 +935,43 @@ public class AccordKeyspace
         }
     }
 
+    public static PartialDeps deserializeDependencies(UntypedResultSet.Row 
row) throws IOException
+    {
+        return deserializeOrNull(row.getBlob("dependencies"), 
CommandsSerializers.partialDeps);
+    }
+
+    public static Timestamp deserializeExecuteAt(UntypedResultSet.Row row)
+    {
+        return deserializeTimestampOrNull(row, "execute_at", 
Timestamp::fromBits);
+    }
+
+    public static SaveStatus deserializeStatus(UntypedResultSet.Row row)
+    {
+        return SaveStatus.values()[row.getInt("status")];
+    }
+
+    public static TxnId deserializeTxnId(UntypedResultSet.Row row)
+    {
+        return deserializeTimestampOrNull(row, "txn_id", TxnId::fromBits);
+    }
+
+    public static PartialTxn deserializeTxn(UntypedResultSet.Row row) throws 
IOException
+    {
+        return deserializeOrNull(row.getBlob("txn"), 
CommandsSerializers.partialTxn);
+    }
+
+    public static PartitionKey deserializeKey(UntypedResultSet.Row row)
+    {
+        ByteBuffer[] split = KEY_TYPE.split(ByteBufferAccessor.instance, 
row.getBytes("key"));
+        TableId tableId = 
TableId.fromUUID(UUIDSerializer.instance.deserialize(split[0]));
+        ByteBuffer key = split[1];
+
+        TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
+        if (metadata == null)
+            throw new IllegalStateException("Table with id " + tableId + " 
could not be found; was it deleted?");
+        return new PartitionKey(metadata.keyspace, tableId, 
metadata.partitioner.decorateKey(key));
+    }
+
     private static void addSeriesMutations(ImmutableSortedMap<Timestamp, 
ByteBuffer> prev,
                                            ImmutableSortedMap<Timestamp, 
ByteBuffer> value,
                                            SeriesKind kind,
@@ -713,7 +1016,9 @@ public class AccordKeyspace
 
     private static DecoratedKey makeKey(CommandStore commandStore, 
PartitionKey key)
     {
+        Token token = key.token();
         ByteBuffer pk = 
CommandsForKeyColumns.keyComparator.make(commandStore.id(),
+                                                                  
serializeToken(token),
                                                                   
serializeKey(key)).serializeAsPartitionKey();
         return CommandsForKeys.partitioner.decorateKey(pk);
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java 
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index c79d1ba5b9..eac47bfe4e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -245,12 +245,15 @@ public class AccordObjectSizes
 
     private static final long EMPTY_COMMAND_LISTENER = measure(new 
Command.ProxyListener(null));
     private static final long EMPTY_CFK_LISTENER = measure(new 
CommandsForKey.Listener((Key) null));
+    private static final long EMPTY_CFR_LISTENER = measure(new 
CommandsForRanges.Listener(null));
     public static long listener(Command.DurableAndIdempotentListener listener)
     {
         if (listener instanceof Command.ProxyListener)
             return EMPTY_COMMAND_LISTENER + timestamp(((Command.ProxyListener) 
listener).txnId());
         if (listener instanceof CommandsForKey.Listener)
             return EMPTY_CFK_LISTENER + key(((CommandsForKey.Listener) 
listener).key());
+        if (listener instanceof CommandsForRanges.Listener)
+            return EMPTY_CFR_LISTENER + 
timestamp(((CommandsForRanges.Listener) listener).txnId);
         throw new IllegalArgumentException("Unhandled listener type: " + 
listener.getClass());
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index e1bf80895b..5c6ac654f6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -18,9 +18,8 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.Comparator;
 import java.util.Map;
-import java.util.Objects;
+import java.util.NavigableMap;
 import java.util.function.BiFunction;
 
 import javax.annotation.Nullable;
@@ -30,15 +29,20 @@ import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.ProgressLog;
 import accord.impl.AbstractSafeCommandStore;
+import accord.impl.CommandTimeseries;
+import accord.impl.CommandTimeseries.CommandLoader;
+import accord.impl.CommandTimeseriesHolder;
 import accord.impl.CommandsForKey;
 import accord.impl.SafeCommandsForKey;
+import accord.local.CommandStores;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.CommonAttributes;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
 import accord.local.Status;
 import accord.primitives.AbstractKeys;
-import accord.primitives.Keys;
+import accord.primitives.Deps;
+import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
 import accord.primitives.Routables;
@@ -51,12 +55,13 @@ import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
 public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeCommandsForKey>
 {
     private final Map<TxnId, AccordSafeCommand> commands;
-    private final Map<RoutableKey, AccordSafeCommandsForKey> commandsForKeys;
+    private final NavigableMap<RoutableKey, AccordSafeCommandsForKey> 
commandsForKeys;
     private final AccordCommandStore commandStore;
+    CommandsForRanges.Updater rangeUpdates = null;
 
     public AccordSafeCommandStore(PreLoadContext context,
                                   Map<TxnId, AccordSafeCommand> commands,
-                                  Map<RoutableKey, AccordSafeCommandsForKey> 
commandsForKey,
+                                  NavigableMap<RoutableKey, 
AccordSafeCommandsForKey> commandsForKey,
                                   AccordCommandStore commandStore)
     {
         super(context);
@@ -146,24 +151,59 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     @Override
     public Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
     {
-        // TODO: Seekables
-        // TODO: efficiency
-        return ((Keys)keysOrRanges).stream()
-                           .map(this::maybeCommandsForKey)
-                           .filter(Objects::nonNull)
-                           .map(SafeCommandsForKey::current)
-                           .filter(Objects::nonNull)
-                           .map(CommandsForKey::max)
-                           .max(Comparator.naturalOrder())
-                           .orElse(Timestamp.NONE);
+        return mapReduce(keysOrRanges, slice, (ts, accum) -> 
Timestamp.max(ts.max(), accum), Timestamp.NONE, null);
     }
 
-    private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+    @Override
+    public void registerHistoricalTransactions(Deps deps)
+    {
+        // used in places such as accord.local.CommandStore.fetchMajorityDeps
+        // We find a set of dependencies for a range then update 
CommandsForKey and CommandsForRanges to know about them
+        CommandStores.RangesForEpochHolder rangesForEpochHolder = 
commandStore.rangesForEpochHolder();
+        Ranges allRanges = rangesForEpochHolder.get().all();
+        deps.keyDeps.keys().forEach(allRanges, key -> {
+            SafeCommandsForKey cfk = commandsForKey(key);
+            deps.keyDeps.forEach(key, txnId -> {
+                // TODO (desired, efficiency): this can be made more efficient 
by batching by epoch
+                if 
(rangesForEpochHolder.get().coordinates(txnId).contains(key))
+                    return; // already coordinates, no need to replicate
+                if 
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key))
+                    return;
+
+                cfk.registerNotWitnessed(txnId);
+            });
+        });
+        CommandsForRanges commandsForRanges = commandStore.commandsForRanges();
+        deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> {
+            if (commandsForRanges.containsLocally(txnId))
+                return;
+
+            Ranges ranges = deps.rangeDeps.ranges(txnId);
+            if 
(rangesForEpochHolder.get().coordinates(txnId).intersects(ranges))
+                return; // already coordinates, no need to replicate
+            if 
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).intersects(ranges))
+                return;
+
+            updateRanges().mergeRemote(txnId, ranges.slice(allRanges), 
Ranges::with);
+        });
+    }
+
+    private <O> O mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
+    {
+        accumulate = commandStore.mapReduceForRange(keysOrRanges, slice, map, 
accumulate, terminalValue);
+        if (accumulate.equals(terminalValue))
+            return accumulate;
+        return mapReduceForKey(keysOrRanges, slice, map, accumulate, 
terminalValue);
+    }
+
+    private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, 
BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
     {
-        switch (keysOrRanges.domain()) {
+        switch (keysOrRanges.domain())
+        {
             default:
-                throw new AssertionError();
+                throw new AssertionError("Unknown domain: " + 
keysOrRanges.domain());
             case Key:
+            {
                 // TODO: efficiency
                 AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
                 for (Key key : keys)
@@ -174,10 +214,26 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
                     if (accumulate.equals(terminalValue))
                         return accumulate;
                 }
-                break;
+            }
+            break;
             case Range:
-                // TODO (required): implement
-                throw new UnsupportedOperationException();
+            {
+                // Assuming the range provided is in the PreLoadContext, then 
AsyncLoader has populated commandsForKeys with keys that
+                // are contained within the ranges... so walk all keys found 
in commandsForKeys
+                Routables<?, ?> sliced = keysOrRanges.slice(slice, 
Routables.Slice.Minimal);
+                if (!context.keys().slice(slice, 
Routables.Slice.Minimal).containsAll(sliced))
+                    throw new AssertionError("Range(s) detected not present in 
the PreLoadContext: expected " + context.keys() + " but given " + keysOrRanges);
+                for (RoutableKey key : commandsForKeys.keySet())
+                {
+                    //TODO (duplicate code): this is a repeat of Key... only 
change is checking contains in range
+                    if (!sliced.contains(key)) continue;
+                    SafeCommandsForKey forKey = commandsForKey(key);
+                    accumulate = map.apply(forKey.current(), accumulate);
+                    if (accumulate.equals(terminalValue))
+                        return accumulate;
+                }
+            }
+            break;
         }
         return accumulate;
     }
@@ -185,8 +241,8 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     @Override
     public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, 
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep 
testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status 
maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
     {
-        accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> {
-            CommandsForKey.CommandTimeseries<?> timeseries;
+        accumulate = mapReduce(keysOrRanges, slice, (forKey, prev) -> {
+            CommandTimeseries<?> timeseries;
             switch (testTimestamp)
             {
                 default: throw new AssertionError();
@@ -198,17 +254,17 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
                 case MAY_EXECUTE_BEFORE:
                     timeseries = forKey.byExecuteAt();
             }
-            CommandsForKey.CommandTimeseries.TestTimestamp remapTestTimestamp;
+            CommandTimeseries.TestTimestamp remapTestTimestamp;
             switch (testTimestamp)
             {
                 default: throw new AssertionError();
                 case STARTED_AFTER:
                 case EXECUTES_AFTER:
-                    remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
+                    remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER;
                     break;
                 case STARTED_BEFORE:
                 case MAY_EXECUTE_BEFORE:
-                    remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
+                    remapTestTimestamp = 
CommandTimeseries.TestTimestamp.BEFORE;
             }
             return timeseries.mapReduce(testKind, remapTestTimestamp, 
timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
         }, accumulate, terminalValue);
@@ -227,16 +283,46 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     @Override
     public CommonAttributes completeRegistration(Seekable seekable, Ranges 
ranges, AccordSafeCommand liveCommand, CommonAttributes attrs)
     {
-        Key key = (Key) seekable;
-        if (ranges.contains(key))
+        switch (seekable.domain())
         {
-            AccordSafeCommandsForKey cfk = commandsForKey(key);
-            cfk.register(liveCommand.current());
-            attrs = attrs.mutable().addListener(new 
CommandsForKey.Listener(key));
+            case Key:
+            {
+                Key key = seekable.asKey();
+                if (ranges.contains(key))
+                {
+                    AccordSafeCommandsForKey cfk = commandsForKey(key);
+                    cfk.register(liveCommand.current());
+                    attrs = attrs.mutable().addListener(new 
CommandsForKey.Listener(key));
+                }
+            }
+            break;
+            case Range:
+                Range range = seekable.asRange();
+                if (!ranges.intersects(range))
+                    return attrs;
+                // TODO (api) : cleaner way to deal with this?  This is 
tracked at the Ranges level and not Range level
+                // but we register at the Range level...
+                if (!attrs.durableListeners().stream().anyMatch(l -> l 
instanceof CommandsForRanges.Listener))
+                {
+                    CommandsForRanges.Listener listener = new 
CommandsForRanges.Listener(liveCommand.txnId());
+                    attrs = attrs.mutable().addListener(listener);
+                    // trigger to allow it to run right away
+                    listener.onChange(this, liveCommand);
+                }
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown domain: " + 
seekable.domain());
         }
         return attrs;
     }
 
+    protected CommandsForRanges.Updater updateRanges()
+    {
+        if (rangeUpdates == null)
+            rangeUpdates = commandStore.updateRanges();
+        return rangeUpdates;
+    }
+
     @Override
     protected void invalidateSafeState()
     {
@@ -245,7 +331,7 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     }
 
     @Override
-    public CommandsForKey.CommandLoader<?> cfkLoader()
+    public CommandLoader<?> cfkLoader()
     {
         return CommandsForKeySerializer.loader;
     }
@@ -256,5 +342,7 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
         postExecute();
         commands.values().forEach(AccordSafeState::postExecute);
         commandsForKeys.values().forEach(AccordSafeState::postExecute);
+        if (rangeUpdates != null)
+            rangeUpdates.apply();
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 4de78375bd..3a979c1332 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -141,7 +141,7 @@ public class AccordService implements IAccordService, 
Shutdownable
                              messageSink,
                              configService,
                              AccordService::uniqueNow,
-                             () -> null,
+                             () -> AccordDataStore.INSTANCE,
                              new KeyspaceSplitter(new 
EvenSplit<>(DatabaseDescriptor.getAccordShardCount(), 
getPartitioner().accordSplitter())),
                              agent,
                              new DefaultRandom(),
@@ -302,6 +302,12 @@ public class AccordService implements IAccordService, 
Shutdownable
         ExecutorUtils.shutdownAndWait(timeout, unit, this);
     }
 
+    @VisibleForTesting
+    public Node node()
+    {
+        return node;
+    }
+
     private static Shutdownable toShutdownable(Node node)
     {
         return new Shutdownable() {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java 
b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index 2bb852eb54..1cb4fb5a45 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
+import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -352,6 +353,13 @@ public class AccordStateCache
             this.heapEstimator = heapEstimator;
         }
 
+        public Stream<Node<K, V>> stream()
+        {
+            return cache.entrySet().stream()
+                        .filter(e -> 
keyClass.isAssignableFrom(e.getKey().getClass()))
+                        .map(e -> (Node<K, V>) e.getValue());
+        }
+
         @Override
         public boolean equals(Object o)
         {
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java 
b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
index 216d64df42..a35737040d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
@@ -43,6 +43,19 @@ public class AccordVerbHandler<T extends Request> implements 
IVerbHandler<T>
     public void doVerb(Message<T> message) throws IOException
     {
         logger.debug("Receiving {} from {}", message.payload, message.from());
-        message.payload.process(node, EndpointMapping.getId(message.from()), 
message);
+        T request = message.payload;
+        Node.Id from = EndpointMapping.getId(message.from());
+        long knownEpoch = request.knownEpoch();
+        if (!node.topology().hasEpoch(knownEpoch))
+        {
+            node.configService().fetchTopologyForEpoch(knownEpoch);
+            long waitForEpoch = request.waitForEpoch();
+            if (!node.topology().hasEpoch(waitForEpoch))
+            {
+                node.withEpoch(waitForEpoch, () -> request.process(node, from, 
message));
+                return;
+            }
+        }
+        request.process(node, from, message);
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java 
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
new file mode 100644
index 0000000000..24f595c5da
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+
+import accord.api.Key;
+import accord.api.RoutingKey;
+import accord.impl.CommandTimeseries;
+import accord.impl.CommandTimeseriesHolder;
+import accord.local.Command;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SaveStatus;
+import accord.primitives.AbstractKeys;
+import accord.primitives.PartialDeps;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.RoutableKey;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+public class CommandsForRanges
+{
+    /**
+     * How a txn is known to a builder / index: not tracked at all ({@link #UNKNOWN}), tracked and present in the
+     * local txn set ({@link #LOCAL}), or tracked but absent from the local txn set ({@link #REMOTE}).
+     */
+    public enum TxnType
+    {
+        UNKNOWN, LOCAL, REMOTE;
+
+        /**
+         * Two types can be combined when they are identical, or when either side is {@link #UNKNOWN}.
+         */
+        private boolean isSafeToMix(TxnType other)
+        {
+            return this == other || this == UNKNOWN || other == UNKNOWN;
+        }
+    }
+
+    /**
+     * Immutable summary of a range txn: its id, the ranges it covers, its save status, its (optional) executeAt
+     * and the ids of the txns it depends on.
+     *
+     * {@link #equals(Object)} and {@link #hashCode()} consider only {@link #txnId}; use
+     * {@link #equalsDeep(RangeCommandSummary)} to compare every field.
+     */
+    public static final class RangeCommandSummary
+    {
+        public final TxnId txnId;
+        public final Ranges ranges;
+        public final SaveStatus status;
+        public final @Nullable Timestamp executeAt;
+        public final List<TxnId> deps;
+
+        RangeCommandSummary(TxnId txnId, Ranges ranges, SaveStatus status, @Nullable Timestamp executeAt, List<TxnId> deps)
+        {
+            this.txnId = txnId;
+            this.ranges = ranges;
+            this.status = status;
+            this.executeAt = executeAt;
+            this.deps = deps;
+        }
+
+        /**
+         * Field-by-field comparison, unlike {@link #equals(Object)} which only looks at the txn id.
+         */
+        public boolean equalsDeep(RangeCommandSummary other)
+        {
+            if (!Objects.equals(txnId, other.txnId))
+                return false;
+            if (!Objects.equals(ranges, other.ranges))
+                return false;
+            if (!Objects.equals(status, other.status))
+                return false;
+            if (!Objects.equals(executeAt, other.executeAt))
+                return false;
+            return Objects.equals(deps, other.deps);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            // identity of a summary is its txn id only
+            return txnId.equals(((RangeCommandSummary) o).txnId);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(txnId);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RangeCommandSummary{" +
+                   "txnId=" + txnId +
+                   ", status=" + status +
+                   ", ranges=" + ranges +
+                   '}';
+        }
+
+        /**
+         * Returns a copy of this summary whose ranges are {@code remappingFunction.apply(this.ranges, ranges)};
+         * every other field is carried over unchanged.
+         */
+        public RangeCommandSummary withRanges(Ranges ranges, BiFunction<? super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
+        {
+            Ranges remapped = remappingFunction.apply(this.ranges, ranges);
+            return new RangeCommandSummary(txnId, remapped, status, executeAt, deps);
+        }
+    }
+
+    /**
+     * Read-only {@code CommandLoader} over {@link RangeCommandSummary}: each accessor returns the matching field
+     * of the summary, and {@link #saveForCFK(Command)} is not supported.
+     */
+    private enum RangeCommandSummaryLoader implements 
CommandTimeseries.CommandLoader<RangeCommandSummary>
+    {
+        INSTANCE;
+
+        @Override
+        public RangeCommandSummary saveForCFK(Command command)
+        {
+            // summaries are never produced through this loader; it is only used to read them back
+            //TODO split write from read?
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public TxnId txnId(RangeCommandSummary data)
+        {
+            return data.txnId;
+        }
+
+        @Override
+        public Timestamp executeAt(RangeCommandSummary data)
+        {
+            // may be null: RangeCommandSummary.executeAt is declared @Nullable
+            return data.executeAt;
+        }
+
+        @Override
+        public SaveStatus saveStatus(RangeCommandSummary data)
+        {
+            return data.status;
+        }
+
+        @Override
+        public List<TxnId> depsIds(RangeCommandSummary data)
+        {
+            return data.deps;
+        }
+    }
+
+    public static abstract class AbstractBuilder<T extends AbstractBuilder<T>>
+    {
+        protected final Set<TxnId> localTxns = new HashSet<>();
+        protected final TreeMap<TxnId, RangeCommandSummary> txnToRange = new 
TreeMap<>();
+        protected final IntervalTree.Builder<RoutableKey, RangeCommandSummary, 
Interval<RoutableKey, RangeCommandSummary>> rangeToTxn = new 
IntervalTree.Builder<>();
+
+        /**
+         * Classifies {@code txnId} from this builder's point of view.
+         *
+         * @return {@link TxnType#UNKNOWN} when no summary is held for the txn, {@link TxnType#LOCAL} when the txn
+         *         is also in the local txn set, otherwise {@link TxnType#REMOTE}
+         */
+        public TxnType type(TxnId txnId)
+        {
+            if (!txnToRange.containsKey(txnId))
+                return TxnType.UNKNOWN;
+            if (localTxns.contains(txnId))
+                return TxnType.LOCAL;
+            return TxnType.REMOTE;
+        }
+
+        /**
+         * Registers (or replaces) a local txn: any state previously held for {@code txnId} is removed, then the new
+         * summary is recorded as local and its ranges are added to the range index.
+         *
+         * @return this builder
+         */
+        public T put(TxnId txnId, Ranges ranges, SaveStatus status, Timestamp execteAt, List<TxnId> dependsOn)
+        {
+            // start from a clean slate so the range index never keeps intervals of an older summary
+            remove(txnId);
+
+            RangeCommandSummary replacement = new RangeCommandSummary(txnId, ranges, status, execteAt, dependsOn);
+            txnToRange.put(txnId, replacement);
+            localTxns.add(txnId);
+            addRanges(replacement);
+            return (T) this;
+        }
+
+        /**
+         * Adds one interval per range of {@code summary} to the range index.  Exclusive bounds are first adjusted
+         * by {@code normalize} (start moved up, end moved down); inclusive bounds are used as-is.
+         */
+        private void addRanges(RangeCommandSummary summary)
+        {
+            for (Range range : summary.ranges)
+            {
+                RoutableKey min = normalize(range.start(), range.startInclusive(), true);
+                RoutableKey max = normalize(range.end(), range.endInclusive(), false);
+                rangeToTxn.add(Interval.create(min, max, summary));
+            }
+        }
+
+        /**
+         * Copies every txn held by {@code other} into this builder, replacing whatever this builder already holds
+         * for the same txn ids.
+         *
+         * Before anything is copied, each txn of {@code other} is checked against this builder: the two sides must
+         * agree on whether the txn is local or remote (or at least one side must not know the txn), otherwise the
+         * {@link Invariants#checkArgument} check fails.
+         *
+         * @param other source of the local txn ids, summaries and range intervals to copy
+         * @return this builder
+         */
+        public T putAll(CommandsForRanges other)
+        {
+            // Validate every txn known to "other", not only its local ones: a txn that is REMOTE in "other" but
+            // LOCAL here would otherwise keep its id in "localTxns" while "txnToRange" gets overwritten with the
+            // remote summary, leaving the two collections inconsistent.
+            for (TxnId id : other.commandsToRanges.keySet())
+            {
+                TxnType thisType = type(id);
+                TxnType otherType = other.type(id);
+                Invariants.checkArgument(thisType.isSafeToMix(otherType), "Attempted to add %s; expected %s but was %s", id, thisType, otherType);
+            }
+            localTxns.addAll(other.localCommands);
+            txnToRange.putAll(other.commandsToRanges);
+            // If "put" was called before for a txn present in "other", to respect the "put" semantics that update
+            // must be removed from "rangeToTxn" (as it got replaced in "txnToRange").
+            // The expected common case is that this method is called on an empty builder, so the removeIf is off
+            // an empty list (aka no-op)
+            rangeToTxn.removeIf(data -> other.commandsToRanges.containsKey(data.txnId));
+            rangeToTxn.addAll(other.rangesToCommands);
+            return (T) this;
+        }
+
+        public T mergeRemote(TxnId txnId, Ranges ranges, BiFunction<? super 
Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
+        {
+            // TODO (durability) : remote ranges are not made durable for now. 
 If this command is stored in commands table,
+            // then we have a NotWitnessed command with Ranges, which is not 
expected in accord.local.Command.NotWitnessed.
+            // To properly handle this, the long term storage looks like it 
will need to store these as well.
+            Invariants.checkArgument(!localTxns.contains(txnId), "Attempted to 
merge remote txn %s, but this is a local txn", txnId);
+            // accord.impl.CommandTimeseries.mapReduce does the check on 
status and deps type, and NotWitnessed should match the semantics hard coded in 
InMemorySafeStore...
+            // in that store, the remote history is only ever included when 
minStauts == null and deps == ANY... but mapReduce sees 
accord.local.Status.KnownDeps.hasProposedOrDecidedDeps == false
+            // as a mis-match, so will be excluded... since NotWitnessed will 
return false it will only be included IFF deps = ANY.
+            // When it comes to the minStatus check, the current usage is 
"null", "Committed", "Accepted"... so NotWitnessed will only be included in the 
null case;
+            // the only subtle difference is if minStatus = NotWitnessed, this 
API will include these but InMemoryStore won't
+            RangeCommandSummary oldValue = txnToRange.get(txnId);
+            RangeCommandSummary newValue = oldValue == null ?
+                                           new RangeCommandSummary(txnId, 
ranges, SaveStatus.NotWitnessed, null, Collections.emptyList())
+                                           : oldValue.withRanges(ranges, 
remappingFunction);
+            if (newValue == null)
+            {
+                remove(txnId);
+            }
+            else if (!oldValue.equalsDeep(newValue))
+            {
+                // changes detected... have to update range index
+                rangeToTxn.removeIf(data -> data.txnId.equals(txnId));
+                addRanges(newValue);
+            }
+            return (T) this;
+        }
+
+        /**
+         * Forgets {@code txnId}: drops its summary, its local marker and all of its intervals in the range index.
+         * Does nothing when no summary is held for the txn.
+         *
+         * @return this builder
+         */
+        public T remove(TxnId txnId)
+        {
+            if (!txnToRange.containsKey(txnId))
+                return (T) this;
+
+            localTxns.remove(txnId);
+            txnToRange.remove(txnId);
+            rangeToTxn.removeIf(summary -> summary.txnId.equals(txnId));
+            return (T) this;
+        }
+    }
+
+    /**
+     * Stand-alone builder that produces a fresh {@link CommandsForRanges} from the state accumulated so far.
+     */
+    public static class Builder extends AbstractBuilder<Builder>
+    {
+        /**
+         * Creates a new, empty {@link CommandsForRanges} and populates it from this builder.
+         */
+        public CommandsForRanges build()
+        {
+            CommandsForRanges built = new CommandsForRanges();
+            built.set(this);
+            return built;
+        }
+    }
+
+    /**
+     * Builder bound to the enclosing {@link CommandsForRanges} instance: it starts out as a copy of that
+     * instance's current state and writes its accumulated state back when {@link #apply()} is called.
+     */
+    public class Updater extends AbstractBuilder<Updater>
+    {
+        private Updater()
+        {
+            // seed the builder with everything the enclosing instance currently holds
+            putAll(CommandsForRanges.this);
+        }
+
+        /**
+         * Replaces the enclosing instance's state with the state accumulated in this updater.
+         */
+        public void apply()
+        {
+            CommandsForRanges.this.set(this);
+        }
+    }
+
+    /**
+     * Command listener for a single range txn.  Whenever the command's save status differs from the last one this
+     * listener observed, and the command has partial deps, the txn's ranges, status, executeAt and dep ids are
+     * forwarded to {@code AccordSafeCommandStore#updateRanges()} via {@code put}.
+     */
+    public static class Listener implements Command.DurableAndIdempotentListener
+    {
+        public final TxnId txnId;
+        // last save status seen by onChange; used to ignore notifications that do not change the status
+        private transient SaveStatus saveStatus;
+
+        public Listener(TxnId txnId)
+        {
+            this.txnId = txnId;
+        }
+
+        @Override
+        public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand)
+        {
+            Command current = safeCommand.current();
+            SaveStatus observed = current.saveStatus();
+            if (observed == saveStatus)
+                return;
+            saveStatus = observed;
+
+            PartialDeps deps = current.partialDeps();
+            if (deps == null)
+                return;
+
+            Seekables<?, ?> keysOrRanges = current.partialTxn().keys();
+            Invariants.checkArgument(keysOrRanges.domain() == Routable.Domain.Range, "Expected txn %s to be a Range txn, but was a %s", txnId, keysOrRanges.domain());
+
+            List<TxnId> dependsOn = deps.txnIds();
+            AccordSafeCommandStore accordStore = (AccordSafeCommandStore) safeStore;
+            accordStore.updateRanges().put(txnId, (Ranges) keysOrRanges, current.saveStatus(), current.executeAt(), dependsOn);
+        }
+
+        @Override
+        public PreLoadContext listenerPreLoadContext(TxnId caller)
+        {
+            // when notified by another txn, that txn is requested in addition to this listener's own txn
+            if (caller.equals(txnId))
+                return PreLoadContext.contextFor(txnId);
+            return PreLoadContext.contextFor(txnId, Collections.singletonList(caller));
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Listener{" +
+                   "txnId=" + txnId +
+                   ", saveStatus=" + saveStatus +
+                   '}';
+        }
+    }
+
+    private ImmutableSet<TxnId> localCommands;
+    private ImmutableSortedMap<TxnId, RangeCommandSummary> commandsToRanges;
+    private IntervalTree<RoutableKey, RangeCommandSummary, 
Interval<RoutableKey, RangeCommandSummary>> rangesToCommands;
+
+    /**
+     * Creates an empty instance: no local txns, no summaries and an empty interval tree.
+     */
+    public CommandsForRanges()
+    {
+        localCommands = ImmutableSet.of();
+        commandsToRanges = ImmutableSortedMap.of();
+        rangesToCommands = IntervalTree.emptyTree();
+    }
+
+    /**
+     * Replaces this instance's state with immutable copies of the builder's local txn set and summary map, and
+     * with the interval tree built from the builder's range index.
+     */
+    private void set(AbstractBuilder<?> builder)
+    {
+        this.localCommands = ImmutableSet.copyOf(builder.localTxns);
+        this.commandsToRanges = ImmutableSortedMap.copyOf(builder.txnToRange);
+        this.rangesToCommands = builder.rangeToTxn.build();
+    }
+
+    /**
+     * Classifies {@code txnId} from this instance's point of view.
+     *
+     * @return {@link TxnType#UNKNOWN} when no summary is held for the txn, {@link TxnType#LOCAL} when the txn is
+     *         also in the local command set, otherwise {@link TxnType#REMOTE}
+     */
+    public TxnType type(TxnId txnId)
+    {
+        if (!commandsToRanges.containsKey(txnId))
+            return TxnType.UNKNOWN;
+        if (localCommands.contains(txnId))
+            return TxnType.LOCAL;
+        return TxnType.REMOTE;
+    }
+
+    /**
+     * @return true when {@code txnId} is in this instance's local command set
+     */
+    public boolean containsLocally(TxnId txnId)
+    {
+        return localCommands.contains(txnId);
+    }
+
+    /**
+     * Looks up the indexed range intervals that match each of the given keys and groups the matching summaries by
+     * interval.  Keys are handled one keyspace at a time, keyspaces in sorted order, and the holders of a keyspace
+     * are only computed once iteration reaches that keyspace.
+     *
+     * @param keys keys to look up; each one is cast to {@link PartitionKey}
+     * @return an iterable yielding one holder per matched range
+     */
+    public Iterable<CommandTimeseriesHolder> search(AbstractKeys<Key, ?> keys)
+    {
+        // group by keyspace: the index is built from TokenKey, which carries a keyspace
+        Map<String, List<Key>> keysByKeyspace = new TreeMap<>();
+        for (Key key : keys)
+        {
+            String keyspace = ((PartitionKey) key).keyspace();
+            keysByKeyspace.computeIfAbsent(keyspace, ignore -> new ArrayList<>()).add(key);
+        }
+        return () -> new AbstractIterator<CommandTimeseriesHolder>()
+        {
+            // key groups still to be processed, in keyspace order
+            Iterator<List<Key>> pendingKeyspaces = keysByKeyspace.values().iterator();
+            // matches of the keyspace currently being drained; null when none is in progress
+            Iterator<Map.Entry<Range, Set<RangeCommandSummary>>> currentRanges;
+
+            @Override
+            protected CommandTimeseriesHolder computeNext()
+            {
+                while (currentRanges == null || !currentRanges.hasNext())
+                {
+                    currentRanges = null;
+                    if (!pendingKeyspaces.hasNext())
+                    {
+                        pendingKeyspaces = null;
+                        return endOfData();
+                    }
+                    Map<Range, Set<RangeCommandSummary>> summariesByRange = new TreeMap<>(Range::compare);
+                    for (Key key : pendingKeyspaces.next())
+                    {
+                        for (Interval<RoutableKey, RangeCommandSummary> interval : rangesToCommands.matches(key))
+                            summariesByRange.computeIfAbsent(toRange(interval), ignore -> new HashSet<>()).add(interval.data);
+                    }
+                    currentRanges = summariesByRange.entrySet().iterator();
+                }
+                Map.Entry<Range, Set<RangeCommandSummary>> next = currentRanges.next();
+                return result(next.getKey(), next.getValue());
+            }
+        };
+    }
+
+    /**
+     * Converts an index interval back into a {@code TokenRange}: the interval's max is used as the end, and the
+     * start is the interval's min with its token decreased slightly.
+     */
+    private static Range toRange(Interval<RoutableKey, RangeCommandSummary> interval)
+    {
+        TokenKey start = (TokenKey) interval.min;
+        TokenKey end = (TokenKey) interval.max;
+        // TODO (correctness) : accord doesn't support wrap around, so decreaseSlightly may fail in some cases
+        // TODO (correctness) : this logic is mostly used for testing, so is it actually safe for all partitioners?
+        TokenKey adjustedStart = start.withToken(start.token().decreaseSlightly());
+        return new TokenRange(adjustedStart, end);
+    }
+
+    /**
+     * Finds the summaries whose indexed intervals are returned by the interval tree for {@code range}.  Exclusive
+     * bounds of {@code range} are adjusted by {@code normalize} before the lookup.
+     *
+     * @return a holder over the matches, or null when nothing matched
+     */
+    @Nullable
+    public CommandTimeseriesHolder search(Range range)
+    {
+        RoutableKey min = normalize(range.start(), range.startInclusive(), true);
+        RoutableKey max = normalize(range.end(), range.endInclusive(), false);
+        List<RangeCommandSummary> matches = rangesToCommands.search(Interval.create(min, max));
+        return result(range, matches);
+    }
+
+    /**
+     * Wraps {@code matches} in a {@link Holder} for {@code seekable}; returns null when there are no matches.
+     */
+    private CommandTimeseriesHolder result(Seekable seekable, Collection<RangeCommandSummary> matches)
+    {
+        return matches.isEmpty() ? null : new Holder(seekable, matches);
+    }
+
+    /**
+     * @return the number of intervals in the range index (as reported by the interval tree)
+     */
+    public int size()
+    {
+        return rangesToCommands.intervalCount();
+    }
+
+    /**
+     * @return a new {@link Updater} bound to this instance
+     */
+    public Updater update()
+    {
+        return new Updater();
+    }
+
+    /**
+     * String form of the range index, rendered through the interval tree's builder.
+     */
+    @Override
+    public String toString()
+    {
+        return rangesToCommands.unbuild().toString();
+    }
+
+    /**
+     * Turns a range bound into the key stored in / searched against the interval index.
+     *
+     * @param key       the bound
+     * @param inclusive when true the bound is returned untouched
+     * @param upOrDown  for an exclusive bound: true increases the token slightly, false decreases it slightly
+     * @return the bound itself when inclusive, otherwise a token key with the adjusted token
+     * @throws IllegalArgumentException when the routing key kind is neither TOKEN nor SENTINEL
+     */
+    private static RoutingKey normalize(RoutingKey key, boolean inclusive, boolean upOrDown)
+    {
+        if (inclusive)
+            return key;
+
+        AccordRoutingKey routingKey = (AccordRoutingKey) key;
+        switch (routingKey.kindOfRoutingKey())
+        {
+            case TOKEN:
+            {
+                TokenKey tokenKey = routingKey.asTokenKey();
+                if (upOrDown)
+                    return tokenKey.withToken(tokenKey.token().increaseSlightly());
+                return tokenKey.withToken(tokenKey.token().decreaseSlightly());
+            }
+            case SENTINEL:
+                // sentinels are first converted to a token key, then adjusted like any other token key
+                return normalize(routingKey.asSentinelKey().toTokenKey(), inclusive, upOrDown);
+            default:
+                throw new IllegalArgumentException("Unknown kind: " + routingKey.kindOfRoutingKey());
+        }
+    }
+
+    /**
+     * {@link CommandTimeseriesHolder} over a fixed collection of matched summaries.  Each view is rebuilt on every
+     * call, and summaries whose status is {@link SaveStatus#Invalidated} are left out.
+     */
+    private static class Holder implements CommandTimeseriesHolder
+    {
+        private final Seekable keyOrRange;
+        private final Collection<RangeCommandSummary> matches;
+
+        private Holder(Seekable keyOrRange, Collection<RangeCommandSummary> matches)
+        {
+            this.keyOrRange = keyOrRange;
+            this.matches = matches;
+        }
+
+        @Override
+        public CommandTimeseries<?> byId()
+        {
+            return build(summary -> summary.txnId);
+        }
+
+        @Override
+        public CommandTimeseries<?> byExecuteAt()
+        {
+            // summaries without an executeAt are ordered by their txn id instead
+            return build(summary -> {
+                if (summary.executeAt != null)
+                    return summary.executeAt;
+                return summary.txnId;
+            });
+        }
+
+        @Override
+        public Timestamp max()
+        {
+            return byExecuteAt().maxTimestamp();
+        }
+
+        /**
+         * Builds a time series of the non-invalidated matches, each added under the timestamp chosen by {@code fn}.
+         */
+        private CommandTimeseries<?> build(Function<RangeCommandSummary, Timestamp> fn)
+        {
+            CommandTimeseries.Update<RangeCommandSummary> update = new CommandTimeseries.Update<>(keyOrRange, RangeCommandSummaryLoader.INSTANCE);
+            for (RangeCommandSummary summary : matches)
+            {
+                if (summary.status != SaveStatus.Invalidated)
+                    update.add(fn.apply(summary), summary);
+            }
+            return update.build();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Holder{" +
+                   "keyOrRange=" + keyOrRange +
+                   ", matches=" + matches +
+                   '}';
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index a0c361a1db..51b8796755 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -45,7 +45,7 @@ import static 
org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
 
 public abstract class AccordRoutingKey extends AccordRoutableKey implements 
RoutingKey
 {
-    enum RoutingKeyKind
+    public enum RoutingKeyKind
     {
         TOKEN, SENTINEL
     }
@@ -58,6 +58,16 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
     public abstract RoutingKeyKind kindOfRoutingKey();
     public abstract long estimatedSizeOnHeap();
 
+    /**
+     * Unchecked downcast of this key to {@link SentinelKey}; fails with a ClassCastException for any other kind.
+     */
+    public SentinelKey asSentinelKey()
+    {
+        return (SentinelKey) this;
+    }
+
+    /**
+     * Unchecked downcast of this key to {@link TokenKey}; fails with a ClassCastException for any other kind.
+     */
+    public TokenKey asTokenKey()
+    {
+        return (TokenKey) this;
+    }
+
     public static AccordRoutingKey of(Key key)
     {
         return (AccordRoutingKey) key;
@@ -191,6 +201,11 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             this.token = token;
         }
 
+        /**
+         * @return a new {@link TokenKey} with this key's keyspace and the given token
+         */
+        public TokenKey withToken(Token token)
+        {
+            return new TokenKey(keyspace, token);
+        }
+
         @Override
         public Token token()
         {
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 0a486ee402..9d66e75e6f 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -23,31 +23,41 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.RoutingKey;
 import accord.impl.CommandsForKey;
 import accord.local.Command;
 import accord.local.PreLoadContext;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
+import accord.primitives.Seekables;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
+import accord.utils.async.Observable;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordLoadingState;
 import org.apache.cassandra.service.accord.AccordSafeState;
 import org.apache.cassandra.service.accord.AccordStateCache;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 
 public class AsyncLoader
@@ -65,15 +75,21 @@ public class AsyncLoader
     private final AccordCommandStore commandStore;
 
     private final Iterable<TxnId> txnIds;
-    private final Iterable<RoutableKey> keys;
+    private final Seekables<?, ?> keysOrRanges;
 
     protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keys)
+    @Deprecated
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Iterable<RoutableKey> keysOrRanges)
+    {
+        this(commandStore, txnIds, (Seekables<?, ?>) keysOrRanges);
+    }
+
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> 
txnIds, Seekables<?, ?> keysOrRanges)
     {
         this.commandStore = commandStore;
         this.txnIds = txnIds;
-        this.keys = keys;
+        this.keysOrRanges = keysOrRanges;
     }
 
     protected static Iterable<TxnId> txnIds(PreLoadContext context)
@@ -85,7 +101,7 @@ public class AsyncLoader
         return Iterables.concat(Collections.singleton(primaryid), 
additionalIds);
     }
 
-    private <K, V, S extends AccordSafeState<K, V>> void 
referenceAndAssembleReads(Iterable<K> keys,
+    private <K, V, S extends AccordSafeState<K, V>> void 
referenceAndAssembleReads(Iterable<? extends K> keys,
                                                                                
    Map<K, S> context,
                                                                                
    AccordStateCache.Instance<K, V, S> cache,
                                                                                
    Function<K, V> loadFunction,
@@ -140,13 +156,24 @@ public class AsyncLoader
                                   loadCommandFunction(),
                                   readRunnables,
                                   chains);
-
-        referenceAndAssembleReads(keys,
-                                  context.commandsForKeys,
-                                  commandStore.commandsForKeyCache(),
-                                  loadCommandsPerKeyFunction(),
-                                  readRunnables,
-                                  chains);
+        switch (keysOrRanges.domain())
+        {
+            case Key:
+                // cast to Keys fails...
+                Iterable<RoutableKey> keys = (Iterable<RoutableKey>) 
keysOrRanges;
+                referenceAndAssembleReads(keys,
+                                          context.commandsForKeys,
+                                          commandStore.commandsForKeyCache(),
+                                          loadCommandsPerKeyFunction(),
+                                          readRunnables,
+                                          chains);
+            break;
+            case Range:
+                chains.add(referenceAndDispatchReadsForRange(context));
+            break;
+            default:
+                throw new UnsupportedOperationException("Unable to process 
keys of " + keysOrRanges.domain());
+        }
 
         if (chains.isEmpty())
         {
@@ -161,6 +188,66 @@ public class AsyncLoader
         return !chains.isEmpty() ? AsyncChains.reduce(chains, (a, b) -> 
null).beginAsResult() : null;
     }
 
+    /**
+     * Range counterpart of the key based loading: first resolves the keys overlapping the requested ranges, then
+     * references those keys in the commands-for-key cache and dispatches whatever reads are still needed.
+     *
+     * @return a chain that completes once the overlapping keys are resolved and all of their loads have finished
+     */
+    private AsyncChain<?> referenceAndDispatchReadsForRange(AsyncOperation.Context context)
+    {
+        AsyncChain<Set<? extends RoutableKey>> overlappingKeys = findOverlappingKeys((Ranges) keysOrRanges);
+        return overlappingKeys.flatMap(keys -> {
+            if (keys.isEmpty())
+                return AsyncChains.success(null);
+            // TODO (duplicate code): repeat of referenceAndDispatchReads
+            List<Runnable> pendingReads = new ArrayList<>();
+            List<AsyncChain<?>> pendingLoads = new ArrayList<>();
+            referenceAndAssembleReads(keys,
+                                      context.commandsForKeys,
+                                      commandStore.commandsForKeyCache(),
+                                      loadCommandsPerKeyFunction(),
+                                      pendingReads,
+                                      pendingLoads);
+            // all keys are already loaded
+            if (pendingLoads.isEmpty())
+                return AsyncChains.success(null);
+            // runnable results are already contained in the chains collection
+            if (!pendingReads.isEmpty())
+                AsyncChains.ofRunnables(Stage.READ.executor(), pendingReads).begin(commandStore.agent());
+            return AsyncChains.reduce(pendingLoads, (a, b) -> null);
+        }, commandStore);
+    }
+
+    /**
+     * Resolves the keys overlapping any of the given ranges: one lookup is started per range and the per-range
+     * results are combined into a single immutable set.
+     *
+     * @param ranges ranges to resolve; asserted to be non-empty
+     */
+    private AsyncChain<Set<? extends RoutableKey>> findOverlappingKeys(Ranges ranges)
+    {
+        assert !ranges.isEmpty();
+
+        List<AsyncChain<Set<PartitionKey>>> perRange = new ArrayList<>(ranges.size());
+        for (Range range : ranges)
+        {
+            AsyncChain<Set<PartitionKey>> lookup = findOverlappingKeys(range);
+            perRange.add(lookup);
+        }
+        return AsyncChains.reduce(perRange, (a, b) -> ImmutableSet.<RoutableKey>builder().addAll(a).addAll(b).build());
+    }
+
+    /**
+     * Resolves the keys overlapping a single range.  The result is the union of the keys currently in the
+     * commands-for-key cache that the range contains, and the keys reported by
+     * {@code AccordKeyspace.findAllKeysBetween} for the range's token bounds.
+     */
+    private AsyncChain<Set<PartitionKey>> findOverlappingKeys(Range range)
+    {
+        // snapshot of the matching cached keys, taken before the lookup is started
+        Set<PartitionKey> cachedKeys = commandStore.commandsForKeyCache().stream()
+                                                   .map(n -> (PartitionKey) n.key())
+                                                   .filter(range::contains)
+                                                   .collect(Collectors.toSet());
+        // save to a variable as java gets confused when `.map` is called on the result of asChain
+        AsyncChain<Set<PartitionKey>> storedKeys = Observable.asChain(callback ->
+                                                                      AccordKeyspace.findAllKeysBetween(commandStore.id(),
+                                                                                                        toTokenKey(range.start()).token(), range.startInclusive(),
+                                                                                                        toTokenKey(range.end()).token(), range.endInclusive(),
+                                                                                                        callback),
+                                                                      Collectors.toSet());
+        return storedKeys.map(s -> ImmutableSet.<PartitionKey>builder().addAll(s).addAll(cachedKeys).build());
+    }
+
+    /**
+     * Converts a routing key to a {@link TokenKey}: token keys are returned as-is and sentinel keys are converted
+     * through {@code toTokenKey()}.
+     *
+     * @throws IllegalArgumentException for any other kind of routing key
+     */
+    private static TokenKey toTokenKey(RoutingKey start)
+    {
+        if (start instanceof TokenKey)
+        {
+            return (TokenKey) start;
+        }
+        if (start instanceof AccordRoutingKey.SentinelKey)
+        {
+            AccordRoutingKey.SentinelKey sentinel = (AccordRoutingKey.SentinelKey) start;
+            return sentinel.toTokenKey();
+        }
+        String message = String.format("Unable to convert RoutingKey %s (type %s) to TokenKey", start, start.getClass());
+        throw new IllegalArgumentException(message);
+    }
+
     @VisibleForTesting
     void state(State state)
     {
@@ -169,7 +256,7 @@ public class AsyncLoader
 
     public boolean load(AsyncOperation.Context context, BiConsumer<Object, 
Throwable> callback)
     {
-        logger.trace("Running load for {} with state {}: {} {}", callback, 
state, txnIds, keys);
+        logger.trace("Running load for {} with state {}: {} {}", callback, 
state, txnIds, keysOrRanges);
         commandStore.checkInStoreThread();
         switch (state)
         {
@@ -200,7 +287,7 @@ public class AsyncLoader
                 throw new IllegalStateException("Unexpected state: " + state);
         }
 
-        logger.trace("Exiting load for {} with state {}: {} {}", callback, 
state, txnIds, keys);
+        logger.trace("Exiting load for {} with state {}: {} {}", callback, 
state, txnIds, keysOrRanges);
         return state == State.FINISHED;
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index bf86f6777b..04bd716c1b 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.accord.async;
 
 import java.util.HashMap;
+import java.util.TreeMap;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -32,7 +33,6 @@ import accord.local.CommandStore;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.primitives.RoutableKey;
-import accord.primitives.Seekables;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChains;
@@ -57,7 +57,7 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
     static class Context
     {
         final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>();
-        final HashMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys = 
new HashMap<>();
+        final TreeMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys = 
new TreeMap<>();
 
         void releaseResources(AccordCommandStore commandStore)
         {
@@ -145,7 +145,7 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
 
     AsyncLoader createAsyncLoader(AccordCommandStore commandStore, 
PreLoadContext preLoadContext)
     {
-        return new AsyncLoader(commandStore, txnIds(preLoadContext), 
toRoutableKeys(preLoadContext.keys()));
+        return new AsyncLoader(commandStore, txnIds(preLoadContext), 
preLoadContext.keys());
     }
 
     @VisibleForTesting
@@ -268,7 +268,6 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
         }
     }
 
-
     @Override
     public void run()
     {
@@ -307,19 +306,6 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
         commandStore.executor().execute(this);
     }
 
-    private static Iterable<RoutableKey> toRoutableKeys(Seekables<?, ?> keys)
-    {
-        switch (keys.domain())
-        {
-            default: throw new AssertionError("Unexpected domain: " + 
keys.domain());
-            case Key:
-                return (Iterable<RoutableKey>) keys;
-            case Range:
-                // TODO (required): implement
-                throw new UnsupportedOperationException();
-        }
-    }
-
     static class ForFunction<R> extends AsyncOperation<R>
     {
         private final Function<? super SafeCommandStore, R> function;
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
index f65b1fb437..48ec6fae6c 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import accord.impl.CommandsForKey.CommandLoader;
+import accord.impl.CommandTimeseries.CommandLoader;
 import accord.local.Command;
 import accord.local.SaveStatus;
 import accord.primitives.PartialDeps;
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/ListenerSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/ListenerSerializers.java
index 4581ab8cad..f649a3b325 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/ListenerSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/ListenerSerializers.java
@@ -26,13 +26,14 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.CommandsForRanges;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 
 public class ListenerSerializers
 {
     public enum Kind
     {
-        COMMAND, COMMANDS_FOR_KEY;
+        COMMAND, COMMANDS_FOR_KEY, COMMANDS_FOR_RANGE;
 
         private static Kind of(Command.DurableAndIdempotentListener listener)
         {
@@ -42,6 +43,9 @@ public class ListenerSerializers
             if (listener instanceof CommandsForKey.Listener)
                 return COMMANDS_FOR_KEY;
 
+            if (listener instanceof CommandsForRanges.Listener)
+                return COMMANDS_FOR_RANGE;
+
             throw new IllegalArgumentException("Unsupported listener type: " + 
listener.getClass().getName());
         }
     }
@@ -68,6 +72,27 @@ public class ListenerSerializers
         }
     };
 
+    private static final IVersionedSerializer<CommandsForRanges.Listener> 
cfrListener = new IVersionedSerializer<CommandsForRanges.Listener>()
+    {
+        @Override
+        public void serialize(CommandsForRanges.Listener listener, 
DataOutputPlus out, int version) throws IOException
+        {
+            CommandSerializers.txnId.serialize(listener.txnId, out, version);
+        }
+
+        @Override
+        public CommandsForRanges.Listener deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            return new 
CommandsForRanges.Listener(CommandSerializers.txnId.deserialize(in, version));
+        }
+
+        @Override
+        public long serializedSize(CommandsForRanges.Listener listener, int 
version)
+        {
+            return CommandSerializers.txnId.serializedSize(listener.txnId, 
version);
+        }
+    };
+
     private static final IVersionedSerializer<CommandsForKey.Listener> 
cfkListener = new IVersionedSerializer<CommandsForKey.Listener>()
     {
         @Override
@@ -104,6 +129,9 @@ public class ListenerSerializers
                 case COMMANDS_FOR_KEY:
                     cfkListener.serialize((CommandsForKey.Listener) listener, 
out, version);
                     break;
+                case COMMANDS_FOR_RANGE:
+                    cfrListener.serialize((CommandsForRanges.Listener) 
listener, out, version);
+                    break;
                 default:
                     throw new IllegalArgumentException();
             }
@@ -119,6 +147,8 @@ public class ListenerSerializers
                     return commandListener.deserialize(in, version);
                 case COMMANDS_FOR_KEY:
                     return cfkListener.deserialize(in, version);
+                case COMMANDS_FOR_RANGE:
+                    return cfrListener.deserialize(in, version);
                 default:
                     throw new IllegalArgumentException();
             }
@@ -137,6 +167,9 @@ public class ListenerSerializers
                 case COMMANDS_FOR_KEY:
                     size += 
cfkListener.serializedSize((CommandsForKey.Listener) listener, version);
                     break;
+                case COMMANDS_FOR_RANGE:
+                    size += 
cfrListener.serializedSize((CommandsForRanges.Listener) listener, version);
+                    break;
                 default:
                     throw new IllegalArgumentException();
             }
diff --git a/src/java/org/apache/cassandra/utils/IntervalTree.java 
b/src/java/org/apache/cassandra/utils/IntervalTree.java
index 35ec614a81..97044d165b 100644
--- a/src/java/org/apache/cassandra/utils/IntervalTree.java
+++ b/src/java/org/apache/cassandra/utils/IntervalTree.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.*;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterators;
@@ -69,6 +72,16 @@ public class IntervalTree<C extends Comparable<? super C>, 
D, I extends Interval
         return EMPTY_TREE;
     }
 
+    public static <C extends Comparable<? super C>, D, I extends Interval<C, 
D>> Builder<C, D, I> builder()
+    {
+        return new Builder<>();
+    }
+
+    public Builder<C, D, I> unbuild()
+    {
+        return new Builder<C, D, I>().addAll(this);
+    }
+
     public int intervalCount()
     {
         return count;
@@ -95,13 +108,28 @@ public class IntervalTree<C extends Comparable<? super C>, 
D, I extends Interval
         return head.low;
     }
 
+    public List<Interval<C, D>> matches(Interval<C, D> searchInterval)
+    {
+        if (head == null)
+            return Collections.emptyList();
+
+        List<Interval<C, D>> results = new ArrayList<>();
+        head.searchInternal(searchInterval, i -> results.add(i));
+        return results;
+    }
+
+    public List<Interval<C, D>> matches(C point)
+    {
+        return matches(Interval.<C, D>create(point, point, null));
+    }
+
     public List<D> search(Interval<C, D> searchInterval)
     {
         if (head == null)
             return Collections.<D>emptyList();
 
         List<D> results = new ArrayList<D>();
-        head.searchInternal(searchInterval, results);
+        head.searchInternal(searchInterval, i -> results.add(i.data));
         return results;
     }
 
@@ -217,7 +245,7 @@ public class IntervalTree<C extends Comparable<? super C>, 
D, I extends Interval
             }
         }
 
-        void searchInternal(Interval<C, D> searchInterval, List<D> results)
+        void searchInternal(Interval<C, D> searchInterval, 
Consumer<Interval<C, D>> results)
         {
             if (center.compareTo(searchInterval.min) < 0)
             {
@@ -226,7 +254,7 @@ public class IntervalTree<C extends Comparable<? super C>, 
D, I extends Interval
                     return;
 
                 while (i < intersectsRight.size())
-                    results.add(intersectsRight.get(i++).data);
+                    results.accept(intersectsRight.get(i++));
 
                 if (right != null)
                     right.searchInternal(searchInterval, results);
@@ -238,7 +266,7 @@ public class IntervalTree<C extends Comparable<? super C>, 
D, I extends Interval
                     return;
 
                 for (int i = 0 ; i < j ; i++)
-                    results.add(intersectsLeft.get(i).data);
+                    results.accept(intersectsLeft.get(i));
 
                 if (left != null)
                     left.searchInternal(searchInterval, results);
@@ -248,7 +276,7 @@ public class IntervalTree<C extends Comparable<? super C>, 
D, I extends Interval
                 // Adds every interval contained in this node to the result 
set then search left and right for further
                 // overlapping intervals
                 for (Interval<C, D> interval : intersectsLeft)
-                    results.add(interval.data);
+                    results.accept(interval);
 
                 if (left != null)
                     left.searchInternal(searchInterval, results);
@@ -367,4 +395,55 @@ public class IntervalTree<C extends Comparable<? super C>, 
D, I extends Interval
             return size;
         }
     }
+
+    public static class Builder<C extends Comparable<? super C>, D, I extends 
Interval<C, D>>
+    {
+        private final List<I> intervals = new ArrayList<>();
+
+        public Builder<C, D, I> addAll(IntervalTree<C, D, I> other)
+        {
+            other.forEach(intervals::add);
+            return this;
+        }
+
+        public Builder<C, D, I> add(I interval)
+        {
+            intervals.add(interval);
+            return this;
+        }
+
+        public interface TriPredicate<A, B, C>
+        {
+            boolean test(A a, B b, C c);
+        }
+
+        public Builder<C, D, I> removeIf(TriPredicate<C, C, D> predicate)
+        {
+            intervals.removeIf(i -> predicate.test(i.min, i.max, i.data));
+            return this;
+        }
+
+        public Builder<C, D, I> removeIf(BiPredicate<C, C> predicate)
+        {
+            intervals.removeIf(i -> predicate.test(i.min, i.max));
+            return this;
+        }
+
+        public Builder<C, D, I> removeIf(Predicate<D> predicate)
+        {
+            intervals.removeIf(i -> predicate.test(i.data));
+            return this;
+        }
+
+        public IntervalTree<C, D, I> build()
+        {
+            return IntervalTree.build(intervals);
+        }
+
+        @Override
+        public String toString()
+        {
+            return intervals.toString();
+        }
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 7d333726e5..3bdce4479c 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1061,7 +1061,12 @@ public abstract class AbstractCluster<I extends 
IInstance> implements ICluster<I
             return true;
         });
         if (!drain.isEmpty())
-            throw new ShutdownException(drain);
+        {
+            ShutdownException shutdownException = new ShutdownException(drain);
+            // also log, as Java will truncate long lists
+            logger.error("Unexpected errors", shutdownException);
+            throw shutdownException;
+        }
     }
 
     private void checkForThreadLeaks()
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index b0d61723e8..c43227cc65 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -26,15 +26,14 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
-import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.distributed.Cluster;
 import org.assertj.core.api.Assertions;
+
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -50,7 +49,6 @@ import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.QueryResults;
@@ -142,14 +140,8 @@ public class AccordCQLTest extends AccordTestBase
         String currentTable = keyspace + ".tbl";
         List<String> ddls = Arrays.asList("CREATE KEYSPACE " + keyspace + " 
WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}",
                                           "CREATE TABLE " + currentTable + " 
(k blob, c int, v int, primary key (k, c))");
-        List<String> tokens = SHARED_CLUSTER.stream()
-                                            .flatMap(i -> 
StreamSupport.stream(Splitter.on(",").split(i.config().getString("initial_token")).spliterator(),
 false))
-                                            .collect(Collectors.toList());
-
-        List<ByteBuffer> keys = tokens.stream()
-                                      .map(t -> (Murmur3Partitioner.LongToken) 
Murmur3Partitioner.instance.getTokenFactory().fromString(t))
-                                      
.map(Murmur3Partitioner.LongToken::keyForToken)
-                                      .collect(Collectors.toList());
+        List<String> tokens = tokens();
+        List<ByteBuffer> keys = tokensToKeys(tokens);
         List<String> keyStrings = keys.stream().map(bb -> "0x" + 
ByteBufferUtil.bytesToHex(bb)).collect(Collectors.toList());
         StringBuilder query = new StringBuilder("BEGIN TRANSACTION\n");
 
@@ -159,12 +151,12 @@ public class AccordCQLTest extends AccordTestBase
         query.append("  SELECT row0.v;\n")
              .append("  IF ");
 
-        for (int i = 0; i < keys.size(); i++)
+        for (int i = 0; i < keyStrings.size(); i++)
             query.append((i > 0 ? " AND row" : "row") + i + " IS NULL");
 
         query.append(" THEN\n");
 
-        for (int i = 0; i < keys.size(); i++)
+        for (int i = 0; i < keyStrings.size(); i++)
             query.append("    INSERT INTO " + currentTable + " (k, c, v) 
VALUES (" + keyStrings.get(i) + ", 0, " + i +");\n");
 
         query.append("  END IF\n");
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 58bb314cd6..7736c2ec90 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.test.accord;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -26,7 +27,9 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
+import com.google.common.base.Splitter;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -42,6 +45,7 @@ import net.bytebuddy.implementation.bind.annotation.This;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
 import org.apache.cassandra.cql3.transactions.ReferenceValue;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
@@ -56,6 +60,7 @@ import 
org.apache.cassandra.service.accord.exceptions.WritePreemptedException;
 import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.utils.AssertionUtils;
 import org.apache.cassandra.utils.FailingConsumer;
+import org.apache.cassandra.utils.Shared;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
@@ -66,6 +71,12 @@ public abstract class AccordTestBase extends TestBaseImpl
     private static final Logger logger = 
LoggerFactory.getLogger(AccordTestBase.class);
     private static final int MAX_RETRIES = 10;
 
+    @Shared
+    public static class State
+    {
+        public static AtomicInteger coordinateCounts = new AtomicInteger();
+    }
+
     protected static final AtomicInteger COUNTER = new AtomicInteger(0);
 
     protected static Cluster SHARED_CLUSTER;
@@ -128,7 +139,7 @@ public abstract class AccordTestBase extends TestBaseImpl
 
     protected int getAccordCoordinateCount()
     {
-        return SHARED_CLUSTER.get(1).callOnInstance(() -> 
BBAccordCoordinateCountHelper.count.get());
+        return State.coordinateCounts.get();
     }
 
     private static Cluster createCluster() throws IOException
@@ -137,7 +148,7 @@ public abstract class AccordTestBase extends TestBaseImpl
         // disable vnode for now, but should enable before trunk
         return init(Cluster.build(2)
                            .withoutVNodes()
-                           .withConfig(c -> 
c.with(Feature.NETWORK).set("write_request_timeout", "10s")
+                           .withConfig(c -> c.with(Feature.NETWORK, 
Feature.GOSSIP).set("write_request_timeout", "10s")
                                                                    
.set("transaction_timeout", "15s")
                                                                    
.set("legacy_paxos_strategy", "accord"))
                            
.withInstanceInitializer(EnforceUpdateDoesNotPerformRead::install)
@@ -188,7 +199,7 @@ public abstract class AccordTestBase extends TestBaseImpl
         return result;
     }
 
-    private SimpleQueryResult executeWithRetry0(int count, Cluster cluster, 
String check, Object... boundValues)
+    private static SimpleQueryResult executeWithRetry0(int count, Cluster 
cluster, String check, Object... boundValues)
     {
         try
         {
@@ -206,7 +217,7 @@ public abstract class AccordTestBase extends TestBaseImpl
         }
     }
 
-    protected SimpleQueryResult executeWithRetry(Cluster cluster, String 
check, Object... boundValues)
+    protected static SimpleQueryResult executeWithRetry(Cluster cluster, 
String check, Object... boundValues)
     {
         check = wrapInTxn(check);
 
@@ -218,7 +229,7 @@ public abstract class AccordTestBase extends TestBaseImpl
         return executeWithRetry0(0, cluster, check, boundValues);
     }
 
-    private boolean isIdempotent(Cluster cluster, String cql)
+    private static boolean isIdempotent(Cluster cluster, String cql)
     {
         return cluster.get(1).callOnInstance(() -> {
             TransactionStatement stmt = AccordTestUtils.parse(cql);
@@ -257,6 +268,21 @@ public abstract class AccordTestBase extends TestBaseImpl
         return numConstants == 0;
     }
 
+    static List<String> tokens()
+    {
+        return SHARED_CLUSTER.stream()
+                             .flatMap(i -> 
StreamSupport.stream(Splitter.on(",").split(i.config().getString("initial_token")).spliterator(),
 false))
+                             .collect(Collectors.toList());
+    }
+
+    static List<ByteBuffer> tokensToKeys(List<String> tokens)
+    {
+        return tokens.stream()
+                     .map(t -> (Murmur3Partitioner.LongToken) 
Murmur3Partitioner.instance.getTokenFactory().fromString(t))
+                     .map(Murmur3Partitioner.LongToken::keyForToken)
+                     .collect(Collectors.toList());
+    }
+
     public static class EnforceUpdateDoesNotPerformRead
     {
         public static void install(ClassLoader classLoader, Integer num)
@@ -285,7 +311,6 @@ public abstract class AccordTestBase extends TestBaseImpl
 
     public static class BBAccordCoordinateCountHelper
     {
-        static AtomicInteger count = new AtomicInteger();
         static void install(ClassLoader cl, int nodeNumber)
         {
             if (nodeNumber != 1)
@@ -299,7 +324,7 @@ public abstract class AccordTestBase extends TestBaseImpl
 
         public static TxnData coordinate(Txn txn, @SuperCall Callable<TxnData> 
actual) throws Exception
         {
-            count.incrementAndGet();
+            State.coordinateCounts.incrementAndGet();
             return actual.call();
         }
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/NewSchemaTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/NewSchemaTest.java
new file mode 100644
index 0000000000..196638eb86
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/NewSchemaTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.service.accord.AccordService;
+
+public class NewSchemaTest extends AccordTestBase
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(NewSchemaTest.class);
+
+    @Override
+    protected Logger logger()
+    {
+        return logger;
+    }
+
+    @Test
+    public void test()
+    {
+        for (int i = 0; i < 20; i++)
+        {
+            String ks = "ks" + i;
+            String table = ks + ".tbl" + i;
+            SHARED_CLUSTER.schemaChange("CREATE KEYSPACE " + ks + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+            SHARED_CLUSTER.schemaChange(String.format("CREATE TABLE %s (pk 
blob primary key)", table));
+            SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> 
AccordService.instance().createEpochFromConfigUnsafe()));
+            SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> 
AccordService.instance().setCacheSize(0)));
+
+            List<ByteBuffer> keys = tokensToKeys(tokens());
+
+            read(table, keys).exec();
+        }
+    }
+
+    private static Query read(String table, List<ByteBuffer> keys)
+    {
+        assert !keys.isEmpty();
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < keys.size(); i++)
+            sb.append("let row").append(i).append(" = (select * from 
").append(table).append(" where pk = ?);\n");
+        sb.append("SELECT row0.pk;");
+        return new Query(sb.toString(), keys.toArray());
+    }
+
+    private static class Query
+    {
+        final String cql;
+        final Object[] binds;
+
+        private Query(String cql, Object[] binds)
+        {
+            this.cql = cql;
+            this.binds = binds;
+        }
+
+        SimpleQueryResult exec()
+        {
+            return executeWithRetry(SHARED_CLUSTER, cql, binds);
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index db0dbd6af5..4ffdf8db7b 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.service.accord.async;
 
-import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,6 +33,7 @@ import org.junit.Test;
 
 import accord.impl.CommandsForKey;
 import accord.local.Command;
+import accord.primitives.Keys;
 import accord.primitives.PartialTxn;
 import accord.primitives.RoutableKey;
 import accord.primitives.TxnId;
@@ -102,7 +102,7 @@ public class AsyncLoaderTest
         testLoad(safeCfk, commandsForKey(key));
         cfkCache.release(safeCfk);
 
-        AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
singleton(key));
+        AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
Keys.of(key));
 
         // everything is cached, so the loader should return immediately
         commandStore.executeBlocking(() -> {
@@ -143,7 +143,7 @@ public class AsyncLoaderTest
         AccordKeyspace.getCommandsForKeyMutation(commandStore, cfk, 
commandStore.nextSystemTimestampMicros()).apply();
 
         // resources are on disk only, so the loader should suspend...
-        AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
singleton(key));
+        AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
Keys.of(key));
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
         Context context = new Context();
         commandStore.executeBlocking(() -> {
@@ -192,7 +192,7 @@ public class AsyncLoaderTest
         AccordKeyspace.getCommandsForKeyMutation(commandStore, safeCfk, 
commandStore.nextSystemTimestampMicros()).apply();
 
         // resources are on disk only, so the loader should suspend...
-        AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
singleton(key));
+        AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
Keys.of(key));
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
         Context context = new Context();
         commandStore.executeBlocking(() -> {
@@ -244,7 +244,7 @@ public class AsyncLoaderTest
         testLoad(safeCfk, commandsForKey(key));
         cfkCache.release(safeCfk);
 
-        AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
singleton(key));
+        AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), 
Keys.of(key));
 
         // since there's a read future associated with the txnId, we'll wait 
for it to load
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
@@ -291,7 +291,7 @@ public class AsyncLoaderTest
 
         execute(commandStore, () -> {
             AtomicInteger loadCalls = new AtomicInteger();
-            AsyncLoader loader = new AsyncLoader(commandStore, 
ImmutableList.of(txnId1, txnId2), Collections.emptyList()){
+            AsyncLoader loader = new AsyncLoader(commandStore, 
ImmutableList.of(txnId1, txnId2), Keys.EMPTY){
 
                 @Override
                 Function<TxnId, Command> loadCommandFunction()
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 547b03c10c..13485f90b3 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import accord.impl.CommandsForKey;
+import accord.impl.CommandTimeseries;
 import accord.primitives.TxnId;
 import accord.utils.AccordGens;
 import accord.utils.Gens;
@@ -75,7 +75,7 @@ public class CommandsForKeySerializerTest
     @Test
     public void serde()
     {
-        CommandsForKey.CommandLoader<ByteBuffer> loader = 
CommandsForKeySerializer.loader;
+        CommandTimeseries.CommandLoader<ByteBuffer> loader = 
CommandsForKeySerializer.loader;
         qt().forAll(AccordGenerators.commands()).check(cmd -> {
             ByteBuffer bb = loader.saveForCFK(cmd);
             int size = bb.remaining();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to