This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d0ff07c CEP-15: (C*) Add notion of CommandsForRanges and make this 
durable in C*
3d0ff07c is described below

commit 3d0ff07cd5c7db43390b85afa593e6f76471d886
Author: David Capwell <[email protected]>
AuthorDate: Thu May 25 13:18:56 2023 -0700

    CEP-15: (C*) Add notion of CommandsForRanges and make this durable in C*
    
    patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-18519
---
 .../java/accord/coordinate/RangeUnavailable.java   |  38 ++++
 .../java/accord/coordinate/ReadCoordinator.java    |   6 +-
 .../src/main/java/accord/coordinate/Recover.java   |   2 +-
 .../java/accord/impl/AbstractSafeCommandStore.java |   2 +-
 .../main/java/accord/impl/CommandTimeseries.java   | 229 +++++++++++++++++++++
 .../java/accord/impl/CommandTimeseriesHolder.java  |  29 +++
 .../src/main/java/accord/impl/CommandsForKey.java  | 185 +----------------
 .../java/accord/impl/InMemoryCommandStore.java     |  91 ++++----
 .../main/java/accord/impl/SafeCommandsForKey.java  |   3 +-
 .../src/main/java/accord/local/Bootstrap.java      |   4 +-
 .../src/main/java/accord/local/CommandStore.java   |  13 +-
 .../src/main/java/accord/local/CommandStores.java  |  27 ++-
 .../main/java/accord/local/SafeCommandStore.java   |   2 +
 .../src/main/java/accord/local/SaveStatus.java     |  12 ++
 .../src/main/java/accord/primitives/Ranges.java    |  20 ++
 .../src/main/java/accord/primitives/Timestamp.java |  10 +
 .../src/main/java/accord/topology/Topologies.java  |   5 +
 .../src/main/java/accord/topology/Topology.java    |  53 +++--
 .../main/java/accord/topology/TopologyManager.java |  40 ++--
 .../main/java/accord/utils/async/AsyncChains.java  |   2 +-
 .../main/java/accord/utils/async/Observable.java   | 159 ++++++++++++++
 .../src/test/java/accord/impl/mock/EpochSync.java  |   2 +-
 .../test/java/accord/messages/PreAcceptTest.java   |   4 +-
 .../java/accord/topology/TopologyManagerTest.java  |   4 +-
 .../test/java/accord/topology/TopologyTest.java    |   2 +-
 .../src/test/java/accord/maelstrom/Runner.java     |   9 +-
 .../java/accord/maelstrom/SimpleRandomTest.java    |   1 +
 27 files changed, 683 insertions(+), 271 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java 
b/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java
new file mode 100644
index 00000000..b94e506c
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.primitives.Ranges;
+import accord.primitives.TxnId;
+
+public class RangeUnavailable extends Exhausted
+{
+    public final Ranges unavailable;
+
+    public RangeUnavailable(Ranges unavailable, TxnId txnId)
+    {
+        super(txnId, null, buildMessage(unavailable));
+        this.unavailable = unavailable;
+    }
+
+    private static String buildMessage(Ranges unavailable)
+    {
+        return "The following ranges are unavailable to read: " + unavailable;
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java 
b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
index 66fa96c2..da744f7d 100644
--- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -131,7 +131,11 @@ public abstract class ReadCoordinator<Reply extends 
accord.messages.Reply> exten
                 break;
 
             case ApprovePartial:
-                handle(recordPartialReadSuccess(from, unavailable(reply)));
+                Ranges unavailable = unavailable(reply);
+                RequestStatus result = recordPartialReadSuccess(from, 
unavailable);
+                if (result == RequestStatus.Failed && failure == null)
+                    failure = new RangeUnavailable(unavailable, txnId);
+                handle(result);
                 break;
         }
     }
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java 
b/accord-core/src/main/java/accord/coordinate/Recover.java
index 62ed788c..a0c68a0b 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -66,7 +66,7 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
 
         AwaitCommit(Node node, TxnId txnId, Unseekables<?, ?> unseekables)
         {
-            Topology topology = 
node.topology().globalForEpoch(txnId.epoch()).forSelection(unseekables);
+            Topology topology = 
node.topology().globalForEpoch(txnId.epoch()).forSelection(unseekables, 
Topology.OnUnknown.REJECT);
             this.tracker = new QuorumTracker(new 
Topologies.Single(node.topology().sorter(), topology));
             node.send(topology.nodes(), to -> new WaitOnCommit(to, topology, 
txnId, unseekables), this);
         }
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java 
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index cc7de494..36bc696b 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -26,7 +26,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 import accord.api.VisibleForImplementation;
-import accord.impl.CommandsForKey.CommandLoader;
+import accord.impl.CommandTimeseries.CommandLoader;
 import accord.local.Command;
 import accord.local.CommonAttributes;
 import accord.local.PreLoadContext;
diff --git a/accord-core/src/main/java/accord/impl/CommandTimeseries.java 
b/accord-core/src/main/java/accord/impl/CommandTimeseries.java
new file mode 100644
index 00000000..69d4dbcd
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandTimeseries.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableSortedMap;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.SafeCommandStore;
+import accord.local.SaveStatus;
+import accord.local.Status;
+import accord.primitives.Seekable;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
+import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.utils.Utils.ensureSortedImmutable;
+import static accord.utils.Utils.ensureSortedMutable;
+
+public class CommandTimeseries<D>
+{
+    public enum TestTimestamp
+    {BEFORE, AFTER}
+
+    private final Seekable keyOrRange;
+    protected final CommandLoader<D> loader;
+    public final ImmutableSortedMap<Timestamp, D> commands;
+
+    public CommandTimeseries(Update<D> builder)
+    {
+        this.keyOrRange = builder.keyOrRange;
+        this.loader = builder.loader;
+        this.commands = ensureSortedImmutable(builder.commands);
+    }
+
+    CommandTimeseries(Seekable keyOrRange, CommandLoader<D> loader, 
ImmutableSortedMap<Timestamp, D> commands)
+    {
+        this.keyOrRange = keyOrRange;
+        this.loader = loader;
+        this.commands = commands;
+    }
+
+    public CommandTimeseries(Key keyOrRange, CommandLoader<D> loader)
+    {
+        this(keyOrRange, loader, ImmutableSortedMap.of());
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CommandTimeseries<?> that = (CommandTimeseries<?>) o;
+        return keyOrRange.equals(that.keyOrRange) && 
loader.equals(that.loader) && commands.equals(that.commands);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int hash = 1;
+        hash = 31 * hash + Objects.hashCode(keyOrRange);
+        hash = 31 * hash + Objects.hashCode(loader);
+        hash = 31 * hash + Objects.hashCode(commands);
+        return hash;
+    }
+
+    public D get(Timestamp key)
+    {
+        return commands.get(key);
+    }
+
+    public boolean isEmpty()
+    {
+        return commands.isEmpty();
+    }
+
+    public Timestamp maxTimestamp()
+    {
+        return commands.isEmpty() ? Timestamp.NONE : commands.keySet().last();
+    }
+
+    /**
+     * All commands before/after (exclusive of) the given timestamp
+     * <p>
+     * Note that {@code testDep} applies only to commands that know at least 
proposed deps; if specified any
+     * commands that do not know any deps will be ignored.
+     * <p>
+     * TODO (expected, efficiency): TestDep should be asynchronous; data 
should not be kept memory-resident as only used for recovery
+     */
+    public <T> T mapReduce(SafeCommandStore.TestKind testKind, TestTimestamp 
testTimestamp, Timestamp timestamp,
+                           SafeCommandStore.TestDep testDep, @Nullable TxnId 
depId,
+                           @Nullable Status minStatus, @Nullable Status 
maxStatus,
+                           SafeCommandStore.CommandFunction<T, T> map, T 
initialValue, T terminalValue)
+    {
+
+        for (D data : (testTimestamp == TestTimestamp.BEFORE ? 
commands.headMap(timestamp, false) : commands.tailMap(timestamp, 
false)).values())
+        {
+            TxnId txnId = loader.txnId(data);
+            if (!testKind.test(txnId.rw())) continue;
+            SaveStatus status = loader.saveStatus(data);
+            if (minStatus != null && minStatus.compareTo(status.status) > 0)
+                continue;
+            if (maxStatus != null && maxStatus.compareTo(status.status) < 0)
+                continue;
+            List<TxnId> deps = loader.depsIds(data);
+            // If we don't have any dependencies, we treat a dependency filter 
as a mismatch
+            if (testDep != ANY_DEPS && 
(!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != 
(testDep == WITH))))
+                continue;
+            Timestamp executeAt = loader.executeAt(data);
+            initialValue = map.apply(keyOrRange, txnId, executeAt, 
initialValue);
+            if (initialValue.equals(terminalValue))
+                break;
+        }
+        return initialValue;
+    }
+
+    Stream<TxnId> between(Timestamp min, Timestamp max, Predicate<Status> 
statusPredicate)
+    {
+        return commands.subMap(min, true, max, true).values().stream()
+                       .filter(d -> 
statusPredicate.test(loader.status(d))).map(loader::txnId);
+    }
+
+    public Stream<D> all()
+    {
+        return commands.values().stream();
+    }
+
+    Update<D> beginUpdate()
+    {
+        return new Update<>(this);
+    }
+
+    public CommandLoader<D> loader()
+    {
+        return loader;
+    }
+
+    public interface CommandLoader<D>
+    {
+        D saveForCFK(Command command);
+
+        TxnId txnId(D data);
+        Timestamp executeAt(D data);
+        SaveStatus saveStatus(D data);
+        List<TxnId> depsIds(D data);
+
+        default Status status(D data)
+        {
+            return saveStatus(data).status;
+        }
+
+        default Status.Known known(D data)
+        {
+            return saveStatus(data).known;
+        }
+    }
+
+    public static class Update<D>
+    {
+        private final Seekable keyOrRange;
+        protected CommandLoader<D> loader;
+        protected NavigableMap<Timestamp, D> commands;
+
+        public Update(Seekable keyOrRange, CommandLoader<D> loader)
+        {
+            this.keyOrRange = keyOrRange;
+            this.loader = loader;
+            this.commands = new TreeMap<>();
+        }
+
+        public Update(CommandTimeseries<D> timeseries)
+        {
+            this.keyOrRange = timeseries.keyOrRange;
+            this.loader = timeseries.loader;
+            this.commands = timeseries.commands;
+        }
+
+        public Update<D> add(Timestamp timestamp, Command command)
+        {
+            commands = ensureSortedMutable(commands);
+            commands.put(timestamp, loader.saveForCFK(command));
+            return this;
+        }
+
+        public Update<D> add(Timestamp timestamp, D value)
+        {
+            commands = ensureSortedMutable(commands);
+            commands.put(timestamp, value);
+            return this;
+        }
+
+        public Update<D> remove(Timestamp timestamp)
+        {
+            commands = ensureSortedMutable(commands);
+            commands.remove(timestamp);
+            return this;
+        }
+
+        public CommandTimeseries<D> build()
+        {
+            return new CommandTimeseries<>(this);
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java 
b/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java
new file mode 100644
index 00000000..f0a88e75
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.primitives.Timestamp;
+
+public interface CommandTimeseriesHolder
+{
+    CommandTimeseries<?> byId();
+    CommandTimeseries<?> byExecuteAt();
+
+    Timestamp max();
+}
diff --git a/accord-core/src/main/java/accord/impl/CommandsForKey.java 
b/accord-core/src/main/java/accord/impl/CommandsForKey.java
index a85ac114..97142d3a 100644
--- a/accord-core/src/main/java/accord/impl/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/CommandsForKey.java
@@ -19,24 +19,19 @@
 package accord.impl;
 
 import accord.api.Key;
+import accord.impl.CommandTimeseries.CommandLoader;
 import accord.local.*;
 import accord.primitives.*;
 import com.google.common.collect.ImmutableSortedMap;
 
-import javax.annotation.Nullable;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
 
-import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
-import static accord.local.SafeCommandStore.TestDep.WITH;
 import static accord.local.Status.PreAccepted;
 import static accord.local.Status.PreCommitted;
-import static accord.utils.Utils.*;
 
-public class CommandsForKey
+public class CommandsForKey implements CommandTimeseriesHolder
 {
     public static class SerializerSupport
     {
@@ -55,179 +50,6 @@ public class CommandsForKey
         }
     }
 
-    public interface CommandLoader<D>
-    {
-        D saveForCFK(Command command);
-
-        TxnId txnId(D data);
-        Timestamp executeAt(D data);
-        SaveStatus saveStatus(D data);
-        List<TxnId> depsIds(D data);
-
-        default Status status(D data)
-        {
-            return saveStatus(data).status;
-        }
-
-        default Status.Known known(D data)
-        {
-            return saveStatus(data).known;
-        }
-    }
-
-    public static class CommandTimeseries<D>
-    {
-        public enum TestTimestamp {BEFORE, AFTER}
-
-        private final Key key;
-        protected final CommandLoader<D> loader;
-        public final ImmutableSortedMap<Timestamp, D> commands;
-
-        public CommandTimeseries(Update<D> builder)
-        {
-            this.key = builder.key;
-            this.loader = builder.loader;
-            this.commands = ensureSortedImmutable(builder.commands);
-        }
-
-        CommandTimeseries(Key key, CommandLoader<D> loader, 
ImmutableSortedMap<Timestamp, D> commands)
-        {
-            this.key = key;
-            this.loader = loader;
-            this.commands = commands;
-        }
-
-        public CommandTimeseries(Key key, CommandLoader<D> loader)
-        {
-            this(key, loader, ImmutableSortedMap.of());
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            CommandTimeseries<?> that = (CommandTimeseries<?>) o;
-            return key.equals(that.key) && loader.equals(that.loader) && 
commands.equals(that.commands);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            int hash = 1;
-            hash = 31 * hash + Objects.hashCode(key);
-            hash = 31 * hash + Objects.hashCode(loader);
-            hash = 31 * hash + Objects.hashCode(commands);
-            return hash;
-        }
-
-        public D get(Timestamp key)
-        {
-            return commands.get(key);
-        }
-
-        public boolean isEmpty()
-        {
-            return commands.isEmpty();
-        }
-
-        /**
-         * All commands before/after (exclusive of) the given timestamp
-         * <p>
-         * Note that {@code testDep} applies only to commands that know at 
least proposed deps; if specified any
-         * commands that do not know any deps will be ignored.
-         * <p>
-         * TODO (expected, efficiency): TestDep should be asynchronous; data 
should not be kept memory-resident as only used for recovery
-         */
-        public <T> T mapReduce(SafeCommandStore.TestKind testKind, 
TestTimestamp testTimestamp, Timestamp timestamp,
-                               SafeCommandStore.TestDep testDep, @Nullable 
TxnId depId,
-                               @Nullable Status minStatus, @Nullable Status 
maxStatus,
-                               SafeCommandStore.CommandFunction<T, T> map, T 
initialValue, T terminalValue)
-        {
-
-            for (D data : (testTimestamp == TestTimestamp.BEFORE ? 
commands.headMap(timestamp, false) : commands.tailMap(timestamp, 
false)).values())
-            {
-                TxnId txnId = loader.txnId(data);
-                Timestamp executeAt = loader.executeAt(data);
-                SaveStatus status = loader.saveStatus(data);
-                List<TxnId> deps = loader.depsIds(data);
-                if (!testKind.test(txnId.rw())) continue;
-                // If we don't have any dependencies, we treat a dependency 
filter as a mismatch
-                if (testDep != ANY_DEPS && 
(!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != 
(testDep == WITH))))
-                    continue;
-                if (minStatus != null && minStatus.compareTo(status.status) > 
0)
-                    continue;
-                if (maxStatus != null && maxStatus.compareTo(status.status) < 
0)
-                    continue;
-                initialValue = map.apply(key, txnId, executeAt, initialValue);
-                if (initialValue.equals(terminalValue))
-                    break;
-            }
-            return initialValue;
-        }
-
-        Stream<TxnId> between(Timestamp min, Timestamp max, Predicate<Status> 
statusPredicate)
-        {
-            return commands.subMap(min, true, max, true).values().stream()
-                    .filter(d -> 
statusPredicate.test(loader.status(d))).map(loader::txnId);
-        }
-
-        public Stream<D> all()
-        {
-            return commands.values().stream();
-        }
-
-        Update<D> beginUpdate()
-        {
-            return new Update<>(this);
-        }
-
-        public CommandLoader<D> loader()
-        {
-            return loader;
-        }
-
-        public static class Update<D>
-        {
-            private final Key key;
-            protected CommandLoader<D> loader;
-            protected NavigableMap<Timestamp, D> commands;
-
-            public Update(Key key, CommandLoader<D> loader)
-            {
-                this.key = key;
-                this.loader = loader;
-                this.commands = new TreeMap<>();
-            }
-
-            public Update(CommandTimeseries<D> timeseries)
-            {
-                this.key = timeseries.key;
-                this.loader = timeseries.loader;
-                this.commands = timeseries.commands;
-            }
-
-            public CommandsForKey.CommandTimeseries.Update<D> add(Timestamp 
timestamp, Command command)
-            {
-                commands = ensureSortedMutable(commands);
-                commands.put(timestamp, loader.saveForCFK(command));
-                return this;
-            }
-
-            public CommandsForKey.CommandTimeseries.Update<D> remove(Timestamp 
timestamp)
-            {
-                commands = ensureSortedMutable(commands);
-                commands.remove(timestamp);
-                return this;
-            }
-
-            CommandTimeseries<D> build()
-            {
-                return new CommandTimeseries<>(this);
-            }
-        }
-    }
-
     public static class Listener implements 
Command.DurableAndIdempotentListener
     {
         protected final Key listenerKey;
@@ -363,6 +185,7 @@ public class CommandsForKey
         return key;
     }
 
+    @Override
     public Timestamp max()
     {
         return max;
@@ -383,11 +206,13 @@ public class CommandsForKey
         return lastWriteTimestamp;
     }
 
+    @Override
     public CommandTimeseries<?> byId()
     {
         return byId;
     }
 
+    @Override
     public CommandTimeseries<?> byExecuteAt()
     {
         return byExecuteAt;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index aefdf4eb..a380bdec 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -43,6 +43,7 @@ import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.ProgressLog;
+import accord.impl.CommandTimeseries.CommandLoader;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores.RangesForEpoch;
@@ -109,6 +110,16 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         return agent;
     }
 
+    TreeMap<TxnId, Ranges> historicalRangeCommands()
+    {
+        return historicalRangeCommands;
+    }
+
+    TreeMap<TxnId, RangeCommand> rangeCommands()
+    {
+        return rangeCommands;
+    }
+
     public GlobalCommand ifPresent(TxnId txnId)
     {
         return commands.get(txnId);
@@ -333,38 +344,6 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         }
     }
 
-    @Override
-    protected void registerHistoricalTransactions(Deps deps)
-    {
-        Ranges allRanges = rangesForEpochHolder.get().all();
-        deps.keyDeps.keys().forEach(allRanges, key -> {
-            SafeCommandsForKey cfk = commandsForKey(key).createSafeReference();
-            deps.keyDeps.forEach(key, txnId -> {
-                // TODO (desired, efficiency): this can be made more efficient 
by batching by epoch
-                if 
(rangesForEpochHolder.get().coordinates(txnId).contains(key))
-                    return; // already coordinates, no need to replicate
-                if 
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key))
-                    return;
-
-                cfk.registerNotWitnessed(txnId);
-            });
-
-        });
-        deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> {
-
-            if (rangeCommands.containsKey(txnId))
-                return;
-
-            Ranges ranges = deps.rangeDeps.ranges(txnId);
-            if 
(rangesForEpochHolder.get().coordinates(txnId).intersects(ranges))
-                return; // already coordinates, no need to replicate
-            if 
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).intersects(ranges))
-                return;
-
-            historicalRangeCommands.merge(txnId, ranges.slice(allRanges), 
Ranges::with);
-        });
-    }
-
     protected InMemorySafeStore createSafeStore(PreLoadContext context, 
RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands, 
Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys)
     {
         return new InMemorySafeStore(this, cfkLoader, ranges, context, 
commands, commandsForKeys);
@@ -482,7 +461,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         }
     }
 
-    class CFKLoader implements CommandsForKey.CommandLoader<TxnId>
+    class CFKLoader implements CommandLoader<TxnId>
     {
         private Command loadForCFK(TxnId data)
         {
@@ -702,6 +681,41 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             return timestamp;
         }
 
+        @Override
+        public void registerHistoricalTransactions(Deps deps)
+        {
+            RangesForEpochHolder rangesForEpochHolder = 
commandStore.rangesForEpochHolder();
+            Ranges allRanges = rangesForEpochHolder.get().all();
+            deps.keyDeps.keys().forEach(allRanges, key -> {
+                SafeCommandsForKey cfk = commandsForKey(key);
+                deps.keyDeps.forEach(key, txnId -> {
+                    // TODO (desired, efficiency): this can be made more 
efficient by batching by epoch
+                    if 
(rangesForEpochHolder.get().coordinates(txnId).contains(key))
+                        return; // already coordinates, no need to replicate
+                    if 
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key))
+                        return;
+
+                    cfk.registerNotWitnessed(txnId);
+                });
+
+            });
+            TreeMap<TxnId, RangeCommand> rangeCommands = 
commandStore.rangeCommands();
+            TreeMap<TxnId, Ranges> historicalRangeCommands = 
commandStore.historicalRangeCommands();
+            deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> {
+
+                if (rangeCommands.containsKey(txnId))
+                    return;
+
+                Ranges ranges = deps.rangeDeps.ranges(txnId);
+                if 
(rangesForEpochHolder.get().coordinates(txnId).intersects(ranges))
+                    return; // already coordinates, no need to replicate
+                if 
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).intersects(ranges))
+                    return;
+
+                historicalRangeCommands.merge(txnId, ranges.slice(allRanges), 
Ranges::with);
+            });
+        }
+
         public Timestamp maxApplied(Seekables<?, ?> keysOrRanges, Ranges slice)
         {
             Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
@@ -725,7 +739,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, 
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep 
testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status 
maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
         {
             accumulate = commandStore.mapReduceForKey(this, keysOrRanges, 
slice, (forKey, prev) -> {
-                CommandsForKey.CommandTimeseries<?> timeseries;
+                CommandTimeseries<?> timeseries;
                 switch (testTimestamp)
                 {
                     default: throw new AssertionError();
@@ -737,17 +751,17 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                     case MAY_EXECUTE_BEFORE:
                         timeseries = forKey.byExecuteAt();
                 }
-                CommandsForKey.CommandTimeseries.TestTimestamp 
remapTestTimestamp;
+                CommandTimeseries.TestTimestamp remapTestTimestamp;
                 switch (testTimestamp)
                 {
                     default: throw new AssertionError();
                     case STARTED_AFTER:
                     case EXECUTES_AFTER:
-                        remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
+                        remapTestTimestamp = 
CommandTimeseries.TestTimestamp.AFTER;
                         break;
                     case STARTED_BEFORE:
                     case MAY_EXECUTE_BEFORE:
-                        remapTestTimestamp = 
CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
+                        remapTestTimestamp = 
CommandTimeseries.TestTimestamp.BEFORE;
                 }
                 return timeseries.mapReduce(testKind, remapTestTimestamp, 
timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
             }, accumulate, terminalValue);
@@ -866,7 +880,8 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             return commandStore.register(this, keyOrRange, slice, command, 
attrs);
         }
 
-        public CommandsForKey.CommandLoader<?> cfkLoader()
+        @Override
+        public CommandLoader<?> cfkLoader()
         {
             return cfkLoader;
         }
diff --git a/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java 
b/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java
index 527c12d1..6ba8d900 100644
--- a/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java
@@ -20,8 +20,7 @@ package accord.impl;
 
 import accord.api.Key;
 import accord.api.VisibleForImplementation;
-import accord.impl.CommandsForKey.CommandLoader;
-import accord.impl.CommandsForKey.CommandTimeseries;
+import accord.impl.CommandTimeseries.CommandLoader;
 import accord.local.Command;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java 
b/accord-core/src/main/java/accord/local/Bootstrap.java
index 9fdd2044..b66a0d9a 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -134,6 +134,8 @@ class Bootstrap
             Ranges commitRanges = valid;
             store.markBootstrapping(safeStore0, globalSyncId, valid);
             CoordinateSyncPoint.coordinate(node, globalSyncId, commitRanges)
+               // TODO (correcness) : PreLoadContext only works with 
Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys 
AND Ranges!
+               // ATM all known implementations store ranges in-memory, but 
this will not be true soon, so this will need to be addressed
                .flatMap(syncPoint -> node.withEpoch(epoch, () -> 
store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys()), 
safeStore1 -> {
                    if (valid.isEmpty()) // we've lost ownership of the range
                        return AsyncResults.success(Ranges.EMPTY);
@@ -141,7 +143,7 @@ class Bootstrap
                    Commands.commitRecipientLocalSyncPoint(safeStore1, 
localSyncId, syncPoint, valid);
                    // TODO (now): this should use a dedicated local id, 
distinct from the one we use to coordinate globally, as this may also be 
committed and applied locally
                    // TODO (now): should we even be putting any partialDeps 
here? Doesn't seem like it, as they're handled on source nodes.
-                   
safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor);
+                   
safeStore1.registerHistoricalTransactions(syncPoint.waitFor);
                    return fetch = safeStore1.dataStore().fetch(node, 
safeStore1, valid, syncPoint, this);
                })))
                .flatMap(i -> i)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index c9b54432..37046e42 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -115,6 +115,11 @@ public abstract class CommandStore implements AgentExecutor
         return agent;
     }
 
+    public RangesForEpochHolder rangesForEpochHolder()
+    {
+        return rangesForEpochHolder;
+    }
+
     public abstract boolean inStore();
 
     public abstract AsyncChain<Void> execute(PreLoadContext context, 
Consumer<? super SafeCommandStore> consumer);
@@ -141,8 +146,6 @@ public abstract class CommandStore implements AgentExecutor
         this.bootstrapBeganAt = newBootstrapBeganAt;
     }
 
-    protected abstract void registerHistoricalTransactions(Deps deps);
-
     /**
      * This method may be invoked on a non-CommandStore thread
      */
@@ -304,8 +307,10 @@ public abstract class CommandStore implements AgentExecutor
             }
             else
             {
-                execute(contextFor(null, deps.txnIds()), safeStore -> {
-                    registerHistoricalTransactions(deps);
+                // TODO (correcness) : PreLoadContext only works with 
Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys 
AND Ranges!
+                // ATM all known implementations store ranges in-memory, but 
this will not be true soon, so this will need to be addressed
+                execute(contextFor(null, deps.txnIds(), deps.keyDeps.keys()), 
safeStore -> {
+                    safeStore.registerHistoricalTransactions(deps);
                 }).begin((success, fail2) -> {
                     if (fail2 != null) fetchMajorityDeps(coordination, node, 
epoch, ranges);
                     else coordination.setSuccess(null);
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index b8203a1f..c5dca548 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -37,6 +37,7 @@ import accord.utils.async.AsyncChains;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -325,6 +326,11 @@ public abstract class CommandStores
         }
     }
 
+    protected boolean shouldBootstrap(Node node, Topology local, Topology 
newLocalTopology, Range add)
+    {
+        return newLocalTopology.epoch() != 1;
+    }
+
     private synchronized TopologyUpdate updateTopology(Node node, Snapshot 
prev, Topology newTopology)
     {
         checkArgument(!newTopology.isSubset(), "Use full topology for 
CommandStores.updateTopology");
@@ -371,9 +377,12 @@ public abstract class CommandStores
                 RangesForEpochHolder rangesHolder = new RangesForEpochHolder();
                 ShardHolder shardHolder = new 
ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder);
                 rangesHolder.current = new RangesForEpoch(epoch, add, 
shardHolder.store);
-                // the first epoch we assume is either empty, or correctly 
initialised by whatever system is migrating
-                if (epoch == 1) bootstrapUpdates.add(() -> 
shardHolder.store.initialise(epoch, add));
-                else bootstrapUpdates.add(shardHolder.store.bootstrapper(node, 
add, newLocalTopology.epoch()));
+
+                Map<Boolean, Ranges> partitioned = add.partitioningBy(range -> 
shouldBootstrap(node, prev.local, newLocalTopology, range));
+                if (partitioned.containsKey(true))
+                    bootstrapUpdates.add(shardHolder.store.bootstrapper(node, 
partitioned.get(true), newLocalTopology.epoch()));
+                if (partitioned.containsKey(false))
+                    bootstrapUpdates.add(() -> 
shardHolder.store.initialise(epoch, partitioned.get(false)));
                 result.add(shardHolder);
             }
         }
@@ -554,6 +563,18 @@ public abstract class CommandStores
         return snapshot.byId.get(id);
     }
 
+    public int[] ids()
+    {
+        Snapshot snapshot = current;
+        Int2ObjectHashMap<CommandStore>.KeySet set = snapshot.byId.keySet();
+        int[] ids = new int[set.size()];
+        int idx = 0;
+        for (int a : set)
+            ids[idx++] = a;
+        Arrays.sort(ids);
+        return ids;
+    }
+
     public int count()
     {
         return current.shards.length;
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index dd2f7995..93532e85 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -23,6 +23,7 @@ import java.util.function.Predicate;
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.ProgressLog;
+import accord.primitives.Deps;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
 import accord.primitives.Seekable;
@@ -155,6 +156,7 @@ public interface SafeCommandStore
     NodeTimeService time();
     CommandStores.RangesForEpoch ranges();
     Timestamp maxConflict(Seekables<?, ?> keys, Ranges slice);
+    void registerHistoricalTransactions(Deps deps);
 
     default long latestEpoch()
     {
diff --git a/accord-core/src/main/java/accord/local/SaveStatus.java 
b/accord-core/src/main/java/accord/local/SaveStatus.java
index 1b2f5a31..ec0b4eb9 100644
--- a/accord-core/src/main/java/accord/local/SaveStatus.java
+++ b/accord-core/src/main/java/accord/local/SaveStatus.java
@@ -74,6 +74,18 @@ public enum SaveStatus
         return this.status.compareTo(status) >= 0;
     }
 
+    public boolean isComplete()
+    {
+        switch (this)
+        {
+            case Applied:
+            case Invalidated:
+                return true;
+            default:
+                return false;
+        }
+    }
+
     // TODO (expected, testing): exhaustive testing, particularly around 
PreCommitted
     public static SaveStatus get(Status status, Known known)
     {
diff --git a/accord-core/src/main/java/accord/primitives/Ranges.java 
b/accord-core/src/main/java/accord/primitives/Ranges.java
index 9a560311..4badc911 100644
--- a/accord-core/src/main/java/accord/primitives/Ranges.java
+++ b/accord-core/src/main/java/accord/primitives/Ranges.java
@@ -22,8 +22,15 @@ import accord.api.RoutingKey;
 import accord.utils.ArrayBuffers.ObjectBuffers;
 
 import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableMap;
+
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
 import static accord.primitives.Routables.Slice.Overlapping;
 import static accord.utils.ArrayBuffers.cachedRanges;
@@ -227,4 +234,17 @@ public class Ranges extends AbstractRanges<Ranges> 
implements Iterable<Range>, S
         return construct(cachedRanges.completeAndDiscard(result, count));
     }
 
+    public Map<Boolean, Ranges> partitioningBy(Predicate<? super Range> test)
+    {
+        if (isEmpty())
+            return Collections.emptyMap();
+        List<Range> trues = new ArrayList<>();
+        List<Range> falses = new ArrayList<>();
+        for (Range range : this)
+            (test.test(range) ? trues : falses).add(range);
+        if (trues.isEmpty()) return ImmutableMap.of(Boolean.FALSE, this);
+        if (falses.isEmpty()) return ImmutableMap.of(Boolean.TRUE, this);
+        return ImmutableMap.of(Boolean.TRUE, 
Ranges.ofSortedAndDeoverlapped(trues.toArray(new Range[0])),
+                               Boolean.FALSE, 
Ranges.ofSortedAndDeoverlapped(falses.toArray(new Range[0])));
+    }
 }
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java 
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 13bc2a55..1a5a44e4 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -345,4 +345,14 @@ public class Timestamp implements Comparable<Timestamp>
     {
         return "[" + epoch() + ',' + hlc() + ',' + flags() + ',' + node + ']';
     }
+
+    public static Timestamp fromString(String string)
+    {
+        String[] split = string.replaceFirst("\\[", "").replaceFirst("\\]", 
"").split(",");
+        assert split.length == 4;
+        return Timestamp.fromValues(Long.parseLong(split[0]),
+                                    Long.parseLong(split[1]),
+                                    Integer.parseInt(split[2]),
+                                    new Id(Integer.parseInt(split[3])));
+    }
 }
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java 
b/accord-core/src/main/java/accord/topology/Topologies.java
index fef92829..5832cce0 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -51,6 +51,11 @@ public interface Topologies extends TopologySorter
 
     int size();
 
+    default boolean isEmpty()
+    {
+        return size() == 0;
+    }
+
     int totalShards();
 
     boolean contains(Id to);
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index f0c5e79b..728553c1 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -193,24 +193,24 @@ public class Topology
         return Arrays.binarySearch(supersetIndexes, i);
     }
 
-    public Topology forSelection(Unseekables<?, ?> select)
+    public Topology forSelection(Unseekables<?, ?> select, OnUnknown onUnknown)
     {
-        return forSelection(select, (ignore, index) -> true, null);
+        return forSelection(select, onUnknown, (ignore, index) -> true, null);
     }
 
-    public <P1> Topology forSelection(Unseekables<?, ?> select, 
IndexedPredicate<P1> predicate, P1 param)
+    public <P1> Topology forSelection(Unseekables<?, ?> select, OnUnknown 
onUnknown, IndexedPredicate<P1> predicate, P1 param)
     {
-        return forSubset(subsetFor(select, predicate, param));
+        return forSubset(subsetFor(select, predicate, param, onUnknown));
     }
 
-    public Topology forSelection(Unseekables<?, ?> select, Collection<Id> 
nodes)
+    public Topology forSelection(Unseekables<?, ?> select, OnUnknown 
onUnknown, Collection<Id> nodes)
     {
-        return forSelection(select, nodes, (ignore, index) -> true, null);
+        return forSelection(select, onUnknown, nodes, (ignore, index) -> true, 
null);
     }
 
-    public <P1> Topology forSelection(Unseekables<?, ?> select, Collection<Id> 
nodes, IndexedPredicate<P1> predicate, P1 param)
+    public <P1> Topology forSelection(Unseekables<?, ?> select, OnUnknown 
onUnknown, Collection<Id> nodes, IndexedPredicate<P1> predicate, P1 param)
     {
-        return forSubset(subsetFor(select, predicate, param), nodes);
+        return forSubset(subsetFor(select, predicate, param, onUnknown), 
nodes);
     }
 
     private Topology forSubset(int[] newSubset)
@@ -236,7 +236,9 @@ public class Topology
         return new Topology(epoch, shards, ranges, nodeLookup, rangeSubset, 
newSubset);
     }
 
-    private <P1> int[] subsetFor(Unseekables<?, ?> select, 
IndexedPredicate<P1> predicate, P1 param)
+    public enum OnUnknown { REJECT, IGNORE }
+
+    private <P1> int[] subsetFor(Unseekables<?, ?> select, 
IndexedPredicate<P1> predicate, P1 param, OnUnknown onUnknown)
     {
         int count = 0;
         IntBuffers cachedInts = ArrayBuffers.cachedInts();
@@ -258,16 +260,35 @@ public class Topology
                     if (abi < 0)
                     {
                         if (ailim < as.size())
-                            throw new IllegalArgumentException("Range not 
found for " + as.get(ailim));
+                        {
+                            switch (onUnknown)
+                            {
+                                case REJECT: throw new 
IllegalArgumentException("Range not found for " + as.get(ailim));
+                                case IGNORE:
+                                    break;
+                                default:
+                                    throw new 
IllegalArgumentException("Unknown option: " + onUnknown);
+                            }
+                        }
                         break;
                     }
 
                     ai = (int)abi;
+                    boolean skip = false;
                     if (ailim < ai)
-                        throw new IllegalArgumentException("Range not found 
for " + as.get(ailim));
+                    {
+                        switch (onUnknown)
+                        {
+                            default:
+                                throw new IllegalArgumentException("Unknown 
option: " + onUnknown);
+                            case REJECT: throw new 
IllegalArgumentException("Range not found for " + as.get(ailim));
+                            case IGNORE:
+                                skip = true;
+                        }
+                    }
 
                     bi = (int)(abi >>> 32);
-                    if (predicate.test(param, bi))
+                    if (!skip && predicate.test(param, bi))
                     {
                         if (count == newSubset.length)
                             newSubset = cachedInts.resize(newSubset, count, 
count * 2);
@@ -305,14 +326,14 @@ public class Topology
         return cachedInts.completeAndDiscard(newSubset, count);
     }
 
-    public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, 
Consumer<Id> nodes)
+    public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, 
OnUnknown onUnknown, Consumer<Id> nodes)
     {
-        visitNodeForKeysOnceOrMore(select, (i1, i2) -> true, null, nodes);
+        visitNodeForKeysOnceOrMore(select, onUnknown, (i1, i2) -> true, null, 
nodes);
     }
 
-    public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, 
IndexedPredicate<P1> predicate, P1 param, Consumer<Id> nodes)
+    public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select, 
OnUnknown onUnknown, IndexedPredicate<P1> predicate, P1 param, Consumer<Id> 
nodes)
     {
-        for (int shardIndex : subsetFor(select, predicate, param))
+        for (int shardIndex : subsetFor(select, predicate, param, onUnknown))
         {
             Shard shard = shards[shardIndex];
             for (Id id : shard.nodes)
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 0fcdc8fd..aea8c535 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -230,16 +230,20 @@ public class TopologyManager
     {
         Epochs current = epochs;
 
-        checkArgument(topology.epoch == current.nextEpoch());
+        checkArgument(topology.epoch == current.nextEpoch(), "Expected 
topology update %d to be %d", topology.epoch, current.nextEpoch());
         EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
         List<Set<Id>> pendingSync = new 
ArrayList<>(current.pendingSyncComplete);
         Set<Id> alreadySyncd = Collections.emptySet();
         if (!pendingSync.isEmpty())
         {
-            EpochState currentEpoch = current.epochs[0];
-            if (current.epochs[0].syncComplete())
-                currentEpoch.markPrevSynced();
-            alreadySyncd = pendingSync.remove(0);
+            // if empty, then notified about an epoch from a peer before first 
epoch seen
+            if (current.epochs.length != 0)
+            {
+                EpochState currentEpoch = current.epochs[0];
+                if (currentEpoch.syncComplete())
+                    currentEpoch.markPrevSynced();
+                alreadySyncd = pendingSync.remove(0);
+            }
         }
         System.arraycopy(current.epochs, 0, nextEpochs, 1, 
current.epochs.length);
 
@@ -308,15 +312,15 @@ public class TopologyManager
 
     public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, long 
minEpoch, long maxEpoch)
     {
-        Invariants.checkArgument(minEpoch <= maxEpoch);
+        Invariants.checkArgument(minEpoch <= maxEpoch, "min epoch %d > max 
%d", minEpoch, maxEpoch);
         Epochs snapshot = epochs;
 
         if (maxEpoch == Long.MAX_VALUE) maxEpoch = snapshot.currentEpoch;
-        else Invariants.checkState(snapshot.currentEpoch >= maxEpoch);
+        else Invariants.checkState(snapshot.currentEpoch >= maxEpoch, "current 
epoch %d < max %d", snapshot.currentEpoch, maxEpoch);
 
         EpochState maxEpochState = nonNull(snapshot.get(maxEpoch));
         if (minEpoch == maxEpoch && maxEpochState.syncCompleteFor(select))
-            return new Single(sorter, 
maxEpochState.global.forSelection(select));
+            return new Single(sorter, 
maxEpochState.global.forSelection(select, Topology.OnUnknown.REJECT));
 
         int start = (int)(snapshot.currentEpoch - maxEpoch);
         int limit = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, 
snapshot.epochs.length));
@@ -334,20 +338,22 @@ public class TopologyManager
         {
             EpochState epochState = snapshot.epochs[i];
             if (epochState.epoch() < minEpoch)
-                epochState.global.visitNodeForKeysOnceOrMore(select, 
EpochState::shardIsUnsynced, epochState, nodes::add);
+                epochState.global.visitNodeForKeysOnceOrMore(select, 
Topology.OnUnknown.IGNORE, EpochState::shardIsUnsynced, epochState, nodes::add);
             else
-                epochState.global.visitNodeForKeysOnceOrMore(select, 
nodes::add);
+                epochState.global.visitNodeForKeysOnceOrMore(select, 
Topology.OnUnknown.IGNORE, nodes::add);
         }
+        Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that 
contained %s", select);
 
         Topologies.Multi topologies = new Topologies.Multi(sorter, count);
         for (int i = start; i < limit ; ++i)
         {
             EpochState epochState = snapshot.epochs[i];
             if (epochState.epoch() < minEpoch)
-                topologies.add(epochState.global.forSelection(select, nodes, 
EpochState::shardIsUnsynced, epochState));
+                topologies.add(epochState.global.forSelection(select, 
Topology.OnUnknown.IGNORE, nodes, EpochState::shardIsUnsynced, epochState));
             else
-                topologies.add(epochState.global.forSelection(select, nodes, 
(ignore, idx) -> true, null));
+                topologies.add(epochState.global.forSelection(select, 
Topology.OnUnknown.IGNORE, nodes, (ignore, idx) -> true, null));
         }
+        Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch 
that contained %s", select);
 
         return topologies;
     }
@@ -357,16 +363,18 @@ public class TopologyManager
         Epochs snapshot = epochs;
 
         if (minEpoch == maxEpoch)
-            return new Single(sorter, 
snapshot.get(minEpoch).global.forSelection(keys));
+            return new Single(sorter, 
snapshot.get(minEpoch).global.forSelection(keys, Topology.OnUnknown.REJECT));
 
         Set<Id> nodes = new LinkedHashSet<>();
         int count = (int)(1 + maxEpoch - minEpoch);
         for (int i = count - 1 ; i >= 0 ; --i)
-            snapshot.get(minEpoch + 
i).global().visitNodeForKeysOnceOrMore(keys, nodes::add);
+            snapshot.get(minEpoch + 
i).global().visitNodeForKeysOnceOrMore(keys, Topology.OnUnknown.IGNORE, 
nodes::add);
+        Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that 
contained %s", keys);
 
         Topologies.Multi topologies = new Topologies.Multi(sorter, count);
         for (int i = count - 1 ; i >= 0 ; --i)
-            topologies.add(snapshot.get(minEpoch + 
i).global.forSelection(keys, nodes));
+            topologies.add(snapshot.get(minEpoch + 
i).global.forSelection(keys, Topology.OnUnknown.IGNORE, nodes));
+        Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch 
that contained %s", keys);
 
         return topologies;
     }
@@ -374,7 +382,7 @@ public class TopologyManager
     public Topologies forEpoch(Unseekables<?, ?> select, long epoch)
     {
         EpochState state = epochs.get(epoch);
-        return new Single(sorter, state.global.forSelection(select));
+        return new Single(sorter, state.global.forSelection(select, 
Topology.OnUnknown.REJECT));
     }
 
     public Shard forEpochIfKnown(RoutingKey key, long epoch)
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java 
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index 809ca1c7..4c3e7eb4 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -462,7 +462,7 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
 
     public static <V> AsyncChain<V> reduce(List<? extends AsyncChain<? extends 
V>> chains, BiFunction<V, V, V> reducer)
     {
-        Invariants.checkArgument(!chains.isEmpty());
+        Invariants.checkArgument(!chains.isEmpty(), "List of chains is empty");
         if (chains.size() == 1)
             return (AsyncChain<V>) chains.get(0);
         if (Reduce.canAppendTo(chains.get(0), reducer))
diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java 
b/accord-core/src/main/java/accord/utils/async/Observable.java
new file mode 100644
index 00000000..c87f12e7
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+/**
+ * Stream like interface that is "pushed" results (to the {@link 
#onNext(Object)} method).  This interface is similar,
+ * yet different from {@link AsyncChain} as that type works with a single 
element, whereas this type works with 0-n.
+ */
+public interface Observable<T>
+{
+    void onNext(T value) throws Exception;
+
+    void onError(Throwable t);
+
+    void onCompleted();
+
+    default <R> Observable<R> map(Function<? super R, ? extends T> mapper)
+    {
+        Observable<T> self = this;
+        // since this project still targets jdk8, can't create private 
classes, so to avoid adding types to the public api,
+        // use ananomus classes.
+        return new Observable<R>()
+        {
+            @Override
+            public void onNext(R value) throws Exception
+            {
+                self.onNext(mapper.apply(value));
+            }
+
+            @Override
+            public void onError(Throwable t)
+            {
+                self.onError(t);
+            }
+
+            @Override
+            public void onCompleted()
+            {
+                self.onCompleted();
+            }
+        };
+    }
+
+    static <T> Observable<T> distinct(Observable<T> callback)
+    {
+        return new Observable<T>()
+        {
+            Set<T> keys = new HashSet<>();
+
+            @Override
+            public void onNext(T value) throws Exception
+            {
+                if (keys.add(value))
+                    callback.onNext(value);
+            }
+
+            @Override
+            public void onError(Throwable t)
+            {
+                keys.clear();
+                keys = null;
+                callback.onError(t);
+            }
+
+            @Override
+            public void onCompleted()
+            {
+                keys.clear();
+                keys = null;
+                callback.onCompleted();
+            }
+        };
+    }
+
+    static <T> Observable<T> forCallback(BiConsumer<? super List<T>, 
Throwable> callback)
+    {
+        return forCallback(callback, Collectors.toList());
+    }
+
+    static <T, Result, Accumulator> Observable<T> forCallback(BiConsumer<? 
super Result, Throwable> callback,
+                                                              Collector<? 
super T, Accumulator, Result> collector)
+    {
+        return new Observable<T>()
+        {
+            Accumulator values = collector.supplier().get();
+
+            @Override
+            public void onNext(T value)
+            {
+                collector.accumulator().accept(values, value);
+            }
+
+            @Override
+            public void onError(Throwable t)
+            {
+                values = null;
+                callback.accept(null, t);
+            }
+
+            @Override
+            public void onCompleted()
+            {
+                Result result = collector.finisher().apply(this.values);
+                this.values = null;
+                callback.accept(result, null);
+            }
+        };
+    }
+
+    static <A> AsyncChain<List<A>> asChain(Consumer<Observable<A>> work)
+    {
+        return asChain(work, Function.identity(), Collectors.toList());
+    }
+
+    static <A, Result, Accumulator> AsyncChain<Result> 
asChain(Consumer<Observable<A>> work,
+                                                               Collector<? 
super A, Accumulator, Result> collector)
+    {
+        return asChain(work, Function.identity(), collector);
+    }
+
+    static <A, B, Result, Accumulator> AsyncChain<Result> 
asChain(Consumer<Observable<A>> work,
+                                                                  Function<? 
super A, ? extends B> mapper,
+                                                                  Collector<? 
super B, Accumulator, Result> collector)
+    {
+        return new AsyncChains.Head<Result>()
+        {
+            @Override
+            protected void start(BiConsumer<? super Result, Throwable> 
callback)
+            {
+                work.accept(forCallback(callback, collector).map(mapper));
+            }
+        };
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/EpochSync.java 
b/accord-core/src/test/java/accord/impl/mock/EpochSync.java
index 422c400c..0b4e1a50 100644
--- a/accord-core/src/test/java/accord/impl/mock/EpochSync.java
+++ b/accord-core/src/test/java/accord/impl/mock/EpochSync.java
@@ -102,7 +102,7 @@ public class EpochSync implements Runnable
 
         public CommandSync(Node node, Route<?> route, SyncCommitted message, 
Topology topology)
         {
-            this.tracker = new QuorumTracker(new 
Single(node.topology().sorter(), topology.forSelection(route)));
+            this.tracker = new QuorumTracker(new 
Single(node.topology().sorter(), topology.forSelection(route, 
Topology.OnUnknown.REJECT)));
             node.send(tracker.nodes(), message, this);
         }
 
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java 
b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 3bb0e488..b3a6994e 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -20,8 +20,8 @@ package accord.messages;
 
 import accord.api.RoutingKey;
 import accord.impl.*;
-import accord.impl.CommandsForKey.CommandLoader;
-import accord.impl.CommandsForKey.CommandTimeseries;
+import accord.impl.CommandTimeseries.CommandLoader;
+import accord.impl.CommandTimeseries;
 import accord.impl.IntKey.Raw;
 import accord.impl.mock.*;
 import accord.local.Node;
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java 
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index 3ab88af0..ec85d157 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -162,12 +162,12 @@ public class TopologyManagerTest
         Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
 
         RoutingKeys keys = keys(150).toUnseekables();
-        Assertions.assertEquals(topologies(topology2.forSelection(keys), 
topology1.forSelection(keys)),
+        Assertions.assertEquals(topologies(topology2.forSelection(keys, 
Topology.OnUnknown.REJECT), topology1.forSelection(keys, 
Topology.OnUnknown.REJECT)),
                                 service.withUnsyncedEpochs(keys, 2, 2));
 
         service.onEpochSyncComplete(id(2), 2);
         service.onEpochSyncComplete(id(3), 2);
-        Assertions.assertEquals(topologies(topology2.forSelection(keys)),
+        Assertions.assertEquals(topologies(topology2.forSelection(keys, 
Topology.OnUnknown.REJECT)),
                                 service.withUnsyncedEpochs(keys, 2, 2));
     }
 
diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java 
b/accord-core/src/test/java/accord/topology/TopologyTest.java
index 6631c15b..24b663e1 100644
--- a/accord-core/src/test/java/accord/topology/TopologyTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyTest.java
@@ -44,7 +44,7 @@ public class TopologyTest
         Assertions.assertTrue(shard.range.contains(expectedKey));
         Assertions.assertEquals(expectedRange, shard.range);
 
-        Topology subTopology = 
topology.forSelection(Keys.of(expectedKey).toUnseekables());
+        Topology subTopology = 
topology.forSelection(Keys.of(expectedKey).toUnseekables(), 
Topology.OnUnknown.REJECT);
         shard = Iterables.getOnlyElement(subTopology.shards());
         Assertions.assertTrue(shard.range.contains(expectedKey));
         Assertions.assertEquals(expectedRange, shard.range);
diff --git a/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java 
b/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java
index 70b120b7..ab0053d8 100644
--- a/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java
+++ b/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java
@@ -185,7 +185,14 @@ public class Runner
         {
             logger.info("Seed {}; rerun with -D{}={}", seed, key, seed);
             RandomSource randomSource = new DefaultRandom(seed);
-            Runner.run(nodeCount, new StandardQueue.Factory(randomSource), 
randomSource::fork, factory, commands);
+            try
+            {
+                Runner.run(nodeCount, new StandardQueue.Factory(randomSource), 
randomSource::fork, factory, commands);
+            }
+            catch (Throwable t)
+            {
+                throw new AssertionError("Failure for seed " + seed, t);
+            }
         }
     }
 }
diff --git 
a/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java 
b/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java
index e7772f75..a65fd234 100644
--- a/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java
+++ b/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 
 import static accord.maelstrom.Runner.test;
 
+// TODO (correctness) : if you run the tests with the same seed, you get 
different outcomes... this makes it hard to rerun a failure found from CI
 public class SimpleRandomTest
 {
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to