This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 03f937175dbcf04243bb0ac48b64746c1a07bc9c
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Tue Jun 20 16:04:33 2023 -0700

    Accord/C*/TCM bootstrap integration
    
    Patch by Blake Eggleston; Reviewed by David Capwell for CASSANDRA-18444
---
 .../main/java/accord/api/ConfigurationService.java |  13 +-
 .../java/accord/coordinate/FetchCoordinator.java   |   1 +
 .../accord/impl/AbstractConfigurationService.java  | 173 +++++++++----
 .../java/accord/impl/AbstractFetchCoordinator.java | 281 +++++++++++++++++++++
 .../java/accord/impl/InMemoryCommandStore.java     |  15 --
 .../src/main/java/accord/local/CommandStore.java   |  32 ++-
 .../src/main/java/accord/local/CommandStores.java  |  14 +-
 accord-core/src/main/java/accord/local/Node.java   |  13 +-
 .../src/main/java/accord/messages/MessageType.java |   4 +-
 .../main/java/accord/messages/WaitAndReadData.java |  12 +-
 .../src/main/java/accord/topology/Topology.java    |   8 +
 .../main/java/accord/topology/TopologyManager.java |  29 ++-
 .../java/accord/utils/ReducingIntervalMap.java     |  27 +-
 .../main/java/accord/utils/ReducingRangeMap.java   |   8 +
 .../main/java/accord/utils/async/AsyncChains.java  |  12 +
 .../accord/api/TestableConfigurationService.java   |   3 +-
 .../accord/burn/BurnTestConfigurationService.java  |  10 +-
 .../src/test/java/accord/burn/TopologyUpdates.java |   2 +-
 .../java/accord/coordinate/TopologyChangeTest.java |   3 +-
 .../impl/AbstractConfigurationServiceTest.java     |  10 +-
 .../accord/impl/list/ListFetchCoordinator.java     |  69 +++++
 .../src/test/java/accord/impl/list/ListStore.java  | 245 +-----------------
 .../test/java/accord/impl/mock/MockCluster.java    |   2 +-
 .../accord/impl/mock/MockConfigurationService.java |  19 +-
 .../java/accord/local/ImmutableCommandTest.java    |   2 +-
 .../java/accord/topology/TopologyManagerTest.java  |  32 ++-
 26 files changed, 665 insertions(+), 374 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java
index f7ecfece..177094be 100644
--- a/accord-core/src/main/java/accord/api/ConfigurationService.java
+++ b/accord-core/src/main/java/accord/api/ConfigurationService.java
@@ -18,6 +18,8 @@
 
 package accord.api;
 
+import javax.annotation.Nullable;
+
 import accord.local.Node;
 import accord.topology.Topology;
 import accord.utils.async.AsyncResult;
@@ -50,7 +52,7 @@ import accord.utils.async.AsyncResults;
  *      nodes that this node has synced data for the previous epoch.
  *
  *  - ConfigurationService will notify the node when other nodes complete 
syncing an epoch by calling
- *      {@link accord.api.ConfigurationService.Listener#onEpochSyncComplete(accord.local.Node.Id, long)}
+ *      {@link accord.api.ConfigurationService.Listener#onRemoteSyncComplete(accord.local.Node.Id, long)}
  *
  */
 public interface ConfigurationService
@@ -111,13 +113,13 @@ public interface ConfigurationService
          *
          * TODO (required): document what this Future represents, or maybe 
refactor it away - only used for testing
          */
-        AsyncResult<Void> onTopologyUpdate(Topology topology);
+        AsyncResult<Void> onTopologyUpdate(Topology topology, boolean 
startSync);
 
         /**
          * Called when accord data associated with a superseded epoch has been 
sync'd from previous replicas.
          * This should be invoked on each replica once EpochReady.coordination 
has returned on a replica.
          */
-        void onEpochSyncComplete(Node.Id node, long epoch);
+        void onRemoteSyncComplete(Node.Id node, long epoch);
 
         /**
          * Called when the configuration service is meant to truncate it's 
topology data up to (but not including)
@@ -138,7 +140,10 @@ public interface ConfigurationService
         return currentTopology().epoch();
     }
 
-    Topology getTopologyForEpoch(long epoch);
+    /**
+     * Returns the topology for the given epoch if it's available, null 
otherwise
+     */
+    @Nullable Topology getTopologyForEpoch(long epoch);
 
     /**
      * Method for reporting epochs the configuration service may not be aware 
of. To be notified when the new epoch
diff --git a/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java b/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java
index ab903bc8..60d0bba1 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java
@@ -218,6 +218,7 @@ public abstract class FetchCoordinator
     // it must only ensure needed.isEmpty() (if possible)
     protected Ranges trySendMore(List<State> states, Ranges needed)
     {
+        // TODO (soon, required) : need to correctly handle the cluster having 
fewer nodes than replication factor
         for (State state : states)
         {
             Ranges contact = state.uncontacted.slice(needed, Minimal);
diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 6d2aaf39..fc77187a 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -30,28 +30,32 @@ import accord.api.ConfigurationService;
 import accord.local.Node;
 import accord.topology.Topology;
 import accord.utils.Invariants;
+import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 
-public abstract class AbstractConfigurationService implements 
ConfigurationService
+public abstract class AbstractConfigurationService<EpochState extends 
AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends 
AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractConfigurationService.class);
 
-    protected final Node.Id node;
+    protected final Node.Id localId;
 
-    protected final EpochHistory epochs = new EpochHistory();
+    protected final EpochHistory epochs = createEpochHistory();
 
     protected final List<Listener> listeners = new ArrayList<>();
 
-    static class EpochState
+    public abstract static class AbstractEpochState
     {
-        private final long epoch;
-        private final AsyncResult.Settable<Topology> received = 
AsyncResults.settable();
-        private final AsyncResult.Settable<Void> acknowledged = 
AsyncResults.settable();
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = 
AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = 
AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
 
-        private Topology topology = null;
+        protected Topology topology = null;
 
-        public EpochState(long epoch)
+        public AbstractEpochState(long epoch)
         {
             this.epoch = epoch;
         }
@@ -68,21 +72,26 @@ public abstract class AbstractConfigurationService implements ConfigurationServi
         }
     }
 
+    /**
+     * Access needs to be synchronized by the parent ConfigurationService class
+     */
     @VisibleForTesting
-    protected static class EpochHistory
+    public abstract static class AbstractEpochHistory<EpochState extends 
AbstractEpochState>
     {
         // TODO (low priority): move pendingEpochs / FetchTopology into here?
         private List<EpochState> epochs = new ArrayList<>();
 
         protected long lastReceived = 0;
-        private long lastAcknowledged = 0;
+        protected long lastAcknowledged = 0;
 
-        long minEpoch()
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
         {
             return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
         }
 
-        long maxEpoch()
+        public long maxEpoch()
         {
             int size = epochs.size();
             return size == 0 ? 0L : epochs.get(size - 1).epoch;
@@ -102,10 +111,10 @@ public abstract class AbstractConfigurationService implements ConfigurationServi
 
         EpochState getOrCreate(long epoch)
         {
-            Invariants.checkArgument(epoch > 0);
+            Invariants.checkArgument(epoch > 0, "Epoch must be positive but 
given %d", epoch);
             if (epochs.isEmpty())
             {
-                EpochState state = new EpochState(epoch);
+                EpochState state = createEpochState(epoch);
                 epochs.add(state);
                 return state;
             }
@@ -116,34 +125,31 @@ public abstract class AbstractConfigurationService implements ConfigurationServi
                 int prepend = Ints.checkedCast(minEpoch - epoch);
                 List<EpochState> next = new ArrayList<>(epochs.size() + 
prepend);
                 for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
-                    next.add(new EpochState(addEpoch));
+                    next.add(createEpochState(addEpoch));
                 next.addAll(epochs);
                 epochs = next;
                 minEpoch = minEpoch();
-                Invariants.checkState(minEpoch == epoch);
+                Invariants.checkState(minEpoch == epoch, "Epoch %d != %d", 
epoch, minEpoch);
             }
             long maxEpoch = maxEpoch();
             int idx = Ints.checkedCast(epoch - minEpoch);
 
             // add any missing epochs
             for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
-                epochs.add(new EpochState(addEpoch));
+                epochs.add(createEpochState(addEpoch));
 
             return epochs.get(idx);
         }
 
-        public EpochHistory receive(Topology topology)
+        public void receive(Topology topology)
         {
             long epoch = topology.epoch();
-            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || 
lastReceived == 0);
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || 
lastReceived == 0,
+                                  "Epoch %d != %d + 1", epoch, lastReceived);
             lastReceived = epoch;
             EpochState state = getOrCreate(epoch);
-            if (state != null)
-            {
-                state.topology = topology;
-                state.received.setSuccess(topology);
-            }
-            return this;
+            state.topology = topology;
+            state.received.setSuccess(topology);
         }
 
         AsyncResult<Topology> receiveFuture(long epoch)
@@ -156,12 +162,16 @@ public abstract class AbstractConfigurationService implements ConfigurationServi
             return getOrCreate(epoch).topology;
         }
 
-        public EpochHistory acknowledge(long epoch)
+        public void acknowledge(EpochReady ready)
         {
-            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0,
+                                  "Epoch %d != %d + 1", epoch, lastAcknowledged);
             lastAcknowledged = epoch;
-            getOrCreate(epoch).acknowledged.setSuccess(null);
-            return this;
+            EpochState state = getOrCreate(epoch);
+            Invariants.checkState(state.reads == null, "Reads result was already set for epoch %d", epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);
         }
 
         AsyncResult<Void> acknowledgeFuture(long epoch)
@@ -171,19 +181,26 @@ public abstract class AbstractConfigurationService implements ConfigurationServi
 
         void truncateUntil(long epoch)
         {
-            Invariants.checkArgument(epoch <= maxEpoch());
+            Invariants.checkArgument(epoch <= maxEpoch(), "epoch %d > %d", 
epoch, maxEpoch());
             long minEpoch = minEpoch();
             int toTrim = Ints.checkedCast(epoch - minEpoch);
-            if (toTrim <=0)
+            if (toTrim <= 0)
                 return;
 
             epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
         }
     }
 
-    public AbstractConfigurationService(Node.Id node)
+    public AbstractConfigurationService(Node.Id localId)
     {
-        this.node = node;
+        this.localId = localId;
+    }
+
+    protected abstract EpochHistory createEpochHistory();
+
+    protected synchronized EpochState getOrCreateEpochState(long epoch)
+    {
+        return epochs.getOrCreate(epoch);
     }
 
     @Override
@@ -215,54 +232,68 @@ public abstract class AbstractConfigurationService implements ConfigurationServi
         fetchTopologyInternal(epoch);
     }
 
-    protected abstract void epochSyncComplete(Topology topology );
+    protected abstract void localSyncComplete(Topology topology);
 
     @Override
-    public synchronized void acknowledgeEpoch(EpochReady ready)
+    public void acknowledgeEpoch(EpochReady ready)
     {
-        ready.metadata.addCallback(() -> epochs.acknowledge(ready.epoch));
-        ready.coordination.addCallback(() ->  
epochSyncComplete(epochs.getOrCreate(ready.epoch).topology));
+        ready.metadata.addCallback(() -> {
+            synchronized (AbstractConfigurationService.this)
+            {
+                epochs.acknowledge(ready);
+            }
+        });
+        ready.coordination.addCallback(() ->  {
+            synchronized (AbstractConfigurationService.this)
+            {
+                localSyncComplete(epochs.getOrCreate(ready.epoch).topology);
+            }
+        });
     }
 
     protected void topologyUpdatePreListenerNotify(Topology topology) {}
     protected void topologyUpdatePostListenerNotify(Topology topology) {}
 
-    public synchronized AsyncResult<Void> reportTopology(Topology topology)
+    public synchronized void reportTopology(Topology topology, boolean 
startSync)
     {
         long lastReceived = epochs.lastReceived;
         if (topology.epoch() <= lastReceived)
-            return AsyncResults.success(null);
+            return;
 
         if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
         {
             fetchTopologyForEpoch(lastReceived + 1);
-            epochs.receiveFuture(lastReceived + 1).addCallback(() -> 
reportTopology(topology));
-            return AsyncResults.success(null);
+            epochs.receiveFuture(lastReceived + 1).addCallback(() -> 
reportTopology(topology, startSync));
+            return;
         }
 
         long lastAcked = epochs.lastAcknowledged;
         if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
         {
-            epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> 
reportTopology(topology));
-            return AsyncResults.success(null);
+            epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> 
reportTopology(topology, startSync));
+            return;
         }
-        logger.trace("Epoch {} received by {}", topology.epoch(), node);
+        logger.trace("Epoch {} received by {}", topology.epoch(), localId);
 
         epochs.receive(topology);
         topologyUpdatePreListenerNotify(topology);
         for (Listener listener : listeners)
-            listener.onTopologyUpdate(topology);
+            listener.onTopologyUpdate(topology, startSync);
         topologyUpdatePostListenerNotify(topology);
-        return AsyncResults.success(null);
     }
 
-    protected void epochSyncCompletePreListenerNotify(Node.Id node, long 
epoch) {}
+    public synchronized void reportTopology(Topology topology)
+    {
+        reportTopology(topology, true);
+    }
+
+    protected void remoteSyncCompletePreListenerNotify(Node.Id node, long 
epoch) {}
 
-    public synchronized void epochSyncComplete(Node.Id node, long epoch)
+    public synchronized void remoteSyncComplete(Node.Id node, long epoch)
     {
-        epochSyncCompletePreListenerNotify(node, epoch);
+        remoteSyncCompletePreListenerNotify(node, epoch);
         for (Listener listener : listeners)
-            listener.onEpochSyncComplete(node, epoch);
+            listener.onRemoteSyncComplete(node, epoch);
     }
 
     protected void truncateTopologiesPreListenerNotify(long epoch) {}
@@ -276,4 +307,44 @@ public abstract class AbstractConfigurationService implements ConfigurationServi
         truncateTopologiesPostListenerNotify(epoch);
         epochs.truncateUntil(epoch);
     }
+
+    public synchronized AsyncChain<Void> epochReady(long epoch)
+    {
+        EpochState state = epochs.getOrCreate(epoch);
+        if (state.reads != null)
+            return state.reads;
+
+        return state.acknowledged.flatMap(r -> state.reads);
+    }
+
+    public abstract static class Minimal extends 
AbstractConfigurationService<Minimal.EpochState, Minimal.EpochHistory>
+    {
+        static class EpochState extends AbstractEpochState
+        {
+            public EpochState(long epoch)
+            {
+                super(epoch);
+            }
+        }
+
+        static class EpochHistory extends AbstractEpochHistory<EpochState>
+        {
+            @Override
+            protected EpochState createEpochState(long epoch)
+            {
+                return new EpochState(epoch);
+            }
+        }
+
+        public Minimal(Node.Id node)
+        {
+            super(node);
+        }
+
+        @Override
+        protected EpochHistory createEpochHistory()
+        {
+            return new EpochHistory();
+        }
+    }
 }
diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
new file mode 100644
index 00000000..283611e7
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractFetchCoordinator.class);
+
+    static class FetchResult extends AsyncResults.SettableResult<Ranges> 
implements DataStore.FetchResult
+    {
+        final AbstractFetchCoordinator coordinator;
+
+        FetchResult(AbstractFetchCoordinator coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public void abort(Ranges abort)
+        {
+            coordinator.abort(abort);
+        }
+    }
+
+    static class Key
+    {
+        final Node.Id id;
+        final Ranges ranges;
+
+        Key(Node.Id id, Ranges ranges)
+        {
+            this.id = id;
+            this.ranges = ranges;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return (31  + id.hashCode()) * 31 + ranges.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj) return true;
+            if (!(obj instanceof Key)) return false;
+            Key that = (Key) obj;
+            return id.equals(that.id) && ranges.equals(that.ranges);
+        }
+    }
+
+    final DataStore.FetchRanges fetchRanges;
+    final CommandStore commandStore;
+    final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>();
+    final FetchResult result = new FetchResult(this);
+    final List<AsyncResult<Void>> persisting = new ArrayList<>();
+
+    protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint 
syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore)
+    {
+        super(node, ranges, syncPoint, fetchRanges);
+        this.fetchRanges = fetchRanges;
+        this.commandStore = commandStore;
+    }
+
+    public CommandStore commandStore()
+    {
+        return commandStore;
+    }
+
+    protected abstract PartialTxn rangeReadTxn(Ranges ranges);
+
+    protected abstract void onReadOk(Node.Id from, CommandStore commandStore, 
Data data, Ranges ranges);
+
+    @Override
+    public void contact(Node.Id to, Ranges ranges)
+    {
+        Key key = new Key(to, ranges);
+        inflight.put(key, starting(to, ranges));
+        Ranges ownedRanges = ownedRangesForNode(to);
+        Invariants.checkArgument(ownedRanges.containsAll(ranges), "Got a reply 
from %s for ranges %s, but owned ranges %s does not contain all the ranges", 
to, ranges, ownedRanges);
+        PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
+        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), 
syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new 
Callback<ReadData.ReadReply>()
+        {
+            @Override
+            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+            {
+                if (!reply.isOk())
+                {
+                    fail(to, new RuntimeException(reply.toString()));
+                    inflight.remove(key).cancel();
+                    switch ((ReadData.ReadNack) reply)
+                    {
+                        default: throw new AssertionError("Unhandled enum: " + 
reply);
+                        case Invalid:
+                        case Redundant:
+                        case NotCommitted:
+                            throw new AssertionError(String.format("Unexpected 
reply: %s", reply));
+                        case Error:
+                            // TODO (required): ensure errors are propagated 
to coordinators and can be logged
+                    }
+                    return;
+                }
+
+                FetchResponse ok = (FetchResponse) reply;
+                Ranges received;
+                if (ok.unavailable != null)
+                {
+                    unavailable(to, ok.unavailable);
+                    if (ok.data == null)
+                    {
+                        inflight.remove(key).cancel();
+                        return;
+                    }
+                    received = ranges.difference(ok.unavailable);
+                }
+                else
+                {
+                    received = ranges;
+                }
+
+                // TODO (now): make sure it works if invoked in either order
+                inflight.remove(key).started(ok.maxApplied);
+                onReadOk(to, commandStore, ok.data, received);
+                // received must be invoked after submitting the persistence 
future, as it triggers onDone
+                // which creates a ReducingFuture over {@code persisting}
+            }
+
+            @Override
+            public void onFailure(Node.Id from, Throwable failure)
+            {
+                inflight.remove(key).cancel();
+                fail(from, failure);
+            }
+
+            @Override
+            public void onCallbackFailure(Node.Id from, Throwable failure)
+            {
+                // TODO (soon)
+                logger.error("Fetch coordination failure from " + from, 
failure);
+            }
+        });
+    }
+
+    public FetchResult result()
+    {
+        return result;
+    }
+
+    @Override
+    protected void onDone(Ranges success, Throwable failure)
+    {
+        if (failure != null || success.isEmpty()) result.setFailure(failure);
+        else if (persisting.isEmpty()) result.setSuccess(Ranges.EMPTY);
+        else AsyncChains.reduce(persisting, (a, b) -> null)
+                        .begin((s, f) -> {
+                            if (f == null) result.setSuccess(ranges);
+                            else result.setFailure(f);
+                        });
+    }
+
+    @Override
+    public void start()
+    {
+        super.start();
+    }
+
+    void abort(Ranges abort)
+    {
+        // TODO (required, later): implement abort
+    }
+
+    public static class FetchRequest extends WaitAndReadData
+    {
+        public final PartialDeps partialDeps;
+        private transient Timestamp maxApplied;
+
+        public FetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, 
PartialDeps partialDeps, PartialTxn partialTxn)
+        {
+            super(ranges, sourceEpoch, Status.Applied, partialDeps, 
Timestamp.MAX, syncId, partialTxn);
+            this.partialDeps = partialDeps;
+        }
+
+        @Override
+        protected void readComplete(CommandStore commandStore, Data result, 
Ranges unavailable)
+        {
+            Ranges slice = 
commandStore.rangesForEpochHolder().get().allAt(executeReadAt).difference(unavailable);
+            commandStore.maxAppliedFor(readScope, slice).begin((newMaxApplied, 
failure) -> {
+                if (failure != null)
+                {
+                    commandStore.agent().onUncaughtException(failure);
+                }
+                else
+                {
+                    synchronized (this)
+                    {
+                        if (maxApplied == null) maxApplied = newMaxApplied;
+                        else maxApplied = Timestamp.max(maxApplied, 
newMaxApplied);
+                        Ranges reportUnavailable = 
unavailable.slice((Ranges)this.readScope, Minimal);
+                        super.readComplete(commandStore, result, 
reportUnavailable);
+                    }
+                }
+            });
+        }
+
+        @Override
+        protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
+        {
+            node.reply(replyTo, replyContext, new FetchResponse(unavailable, 
data, maxApplied));
+        }
+
+        @Override
+        public MessageType type()
+        {
+            return MessageType.FETCH_DATA_REQ;
+        }
+    }
+
+    public static class FetchResponse extends ReadData.ReadOk
+    {
+        public final Timestamp maxApplied;
+        public FetchResponse(@Nullable Ranges unavailable, @Nullable Data 
data, Timestamp maxApplied)
+        {
+            super(unavailable, data);
+            this.maxApplied = maxApplied;
+        }
+
+        @Override
+        public MessageType type()
+        {
+            return MessageType.FETCH_DATA_RSP;
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index a380bdec..f4b160bd 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -81,8 +81,6 @@ import static accord.local.SafeCommandStore.TestDep.WITH;
 import static accord.local.Status.Committed;
 import static accord.local.Status.PreAccepted;
 import static accord.local.Status.PreCommitted;
-import static accord.local.Status.Applied;
-import static accord.local.Status.Invalidated;
 import static accord.primitives.Routables.Slice.Minimal;
 
 public abstract class InMemoryCommandStore extends CommandStore
@@ -716,19 +714,6 @@ public abstract class InMemoryCommandStore extends CommandStore
             });
         }
 
-        public Timestamp maxApplied(Seekables<?, ?> keysOrRanges, Ranges slice)
-        {
-            Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
-            Timestamp timestamp = Timestamp.NONE;
-            for (SafeCommand safeCommand : commands.values())
-            {
-                Command command = safeCommand.current();
-                if (command.hasBeen(Applied) && !command.hasBeen(Invalidated) 
&& command.partialTxn().keys().intersects(sliced))
-                    timestamp = Timestamp.max(timestamp, command.executeAt());
-            }
-            return timestamp;
-        }
-
         @Override
         public NodeTimeService time()
         {
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index 37046e42..becc7538 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -128,6 +128,21 @@ public abstract class CommandStore implements AgentExecutor
 
     public abstract void shutdown();
 
+    private static Timestamp maxApplied(SafeCommandStore safeStore, 
Seekables<?, ?> keysOrRanges, Ranges slice)
+    {
+        return safeStore.mapReduce(keysOrRanges, slice, 
SafeCommandStore.TestKind.Ws,
+                                   
SafeCommandStore.TestTimestamp.STARTED_AFTER, Timestamp.NONE,
+                                   SafeCommandStore.TestDep.ANY_DEPS, null,
+                                   Status.Applied, Status.Applied,
+                                   (key, txnId, executeAt, max) -> 
Timestamp.max(max, executeAt),
+                                   Timestamp.NONE, Timestamp.MAX);
+    }
+
+    public AsyncChain<Timestamp> maxAppliedFor(Seekables<?, ?> keysOrRanges, 
Ranges slice)
+    {
+        return submit(PreLoadContext.contextFor(keysOrRanges), safeStore -> 
maxApplied(safeStore, keysOrRanges, slice));
+    }
+
     // implementations are expected to override this for persistence
     protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
     {
@@ -154,6 +169,21 @@ public abstract class CommandStore implements AgentExecutor
         this.safeToRead = newSafeToRead;
     }
 
+    public NavigableMap<TxnId, Ranges> bootstrapBeganAt()
+    {
+        return bootstrapBeganAt;
+    }
+
+    public NavigableMap<Timestamp, Ranges> safeToRead()
+    {
+        return safeToRead;
+    }
+
+    public long maxBootstrapEpoch()
+    {
+        return maxBootstrapEpoch;
+    }
+
     public void markExclusiveSyncPoint(TxnId txnId, Ranges ranges, 
SafeCommandStore safeStore)
     {
         Invariants.checkArgument(txnId.rw() == ExclusiveSyncPoint);
@@ -286,7 +316,7 @@ public abstract class CommandStore implements AgentExecutor
      * So, the outer future's success is sufficient for the topology to be acknowledged, and the inner future for the
      * bootstrap to be complete.
      */
-    Supplier<EpochReady> sync(Node node, Ranges ranges, long epoch)
+    protected Supplier<EpochReady> sync(Node node, Ranges ranges, long epoch)
     {
         return () -> {
             AsyncResults.SettableResult<Void> whenDone = new AsyncResults.SettableResult<>();
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index c5dca548..2cb4eef1 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -331,7 +331,7 @@ public abstract class CommandStores
         return newLocalTopology.epoch() != 1;
     }
 
-    private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Topology newTopology)
+    private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Topology newTopology, boolean startSync)
     {
         checkArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology");
 
@@ -364,8 +364,10 @@ public abstract class CommandStores
                 bootstrapUpdates.add(shard.store.interruptBootstraps(epoch, newRanges.currentRanges()));
             }
             // TODO (desired): only sync affected shards
-            if (epoch > 1)
-                bootstrapUpdates.add(shard.store.sync(node, shard.ranges().currentRanges(), epoch));
+            Ranges ranges = shard.ranges().currentRanges();
+            // ranges can be empty when ranges are lost or consolidated across epochs.
+            if (epoch > 1 && startSync && !ranges.isEmpty())
+                bootstrapUpdates.add(shard.store.sync(node, ranges, epoch));
             result.add(shard);
         }
 
@@ -378,7 +380,7 @@ public abstract class CommandStores
                 ShardHolder shardHolder = new ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder);
                 rangesHolder.current = new RangesForEpoch(epoch, add, shardHolder.store);
 
-                Map<Boolean, Ranges> partitioned = add.partitioningBy(range -> shouldBootstrap(node, prev.local, newLocalTopology, range));
+                Map<Boolean, Ranges> partitioned = add.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range));
                 if (partitioned.containsKey(true))
                     bootstrapUpdates.add(shardHolder.store.bootstrapper(node, partitioned.get(true), newLocalTopology.epoch()));
                 if (partitioned.containsKey(false))
@@ -515,9 +517,9 @@ public abstract class CommandStores
         return chain;
     }
 
-    public synchronized Supplier<EpochReady> updateTopology(Node node, Topology newTopology)
+    public synchronized Supplier<EpochReady> updateTopology(Node node, Topology newTopology, boolean startSync)
     {
-        TopologyUpdate update = updateTopology(node, current, newTopology);
+        TopologyUpdate update = updateTopology(node, current, newTopology, startSync);
         current = update.snapshot;
         return update.bootstrap;
     }
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 01e83ebb..973885fd 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -149,9 +149,10 @@ public class Node implements ConfigurationService.Listener, NodeTimeService
         configService.registerListener(this);
     }
 
+    // TODO (cleanup, testing): remove, only used by Maelstrom
     public AsyncResult<Void> start()
     {
-        return onTopologyUpdateInternal(configService.currentTopology()).metadata;
+        return onTopologyUpdateInternal(configService.currentTopology(), false).metadata;
     }
 
     public CommandStores commandStores()
@@ -175,25 +176,25 @@ public class Node implements ConfigurationService.Listener, NodeTimeService
         return topology().epoch();
     }
 
-    private synchronized EpochReady onTopologyUpdateInternal(Topology topology)
+    private synchronized EpochReady onTopologyUpdateInternal(Topology topology, boolean startSync)
     {
-        Supplier<EpochReady> bootstrap = commandStores.updateTopology(this, topology);
+        Supplier<EpochReady> bootstrap = commandStores.updateTopology(this, topology, startSync);
         this.topology.onTopologyUpdate(topology);
         return bootstrap.get();
     }
 
     @Override
-    public synchronized AsyncResult<Void> onTopologyUpdate(Topology topology)
+    public synchronized AsyncResult<Void> onTopologyUpdate(Topology topology, boolean startSync)
     {
         if (topology.epoch() <= this.topology.epoch())
             return AsyncResults.success(null);
-        EpochReady ready = onTopologyUpdateInternal(topology);
+        EpochReady ready = onTopologyUpdateInternal(topology, startSync);
         configService.acknowledgeEpoch(ready);
         return ready.coordination;
     }
 
     @Override
-    public void onEpochSyncComplete(Id node, long epoch)
+    public void onRemoteSyncComplete(Id node, long epoch)
     {
         topology.onEpochSyncComplete(node, epoch);
     }
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java
index f5361e6e..f3b44648 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -47,6 +47,8 @@ public enum MessageType
     INFORM_HOME_DURABLE_REQ (true ),
     CHECK_STATUS_REQ        (false),
     CHECK_STATUS_RSP        (false),
+    FETCH_DATA_REQ          (false),
+    FETCH_DATA_RSP          (false)
     ;
 
     /**
@@ -58,4 +60,4 @@ public enum MessageType
     {
         this.hasSideEffects = hasSideEffects;
     }
-}
\ No newline at end of file
+}
diff --git a/accord-core/src/main/java/accord/messages/WaitAndReadData.java b/accord-core/src/main/java/accord/messages/WaitAndReadData.java
index 33716a7c..c854aa94 100644
--- a/accord-core/src/main/java/accord/messages/WaitAndReadData.java
+++ b/accord-core/src/main/java/accord/messages/WaitAndReadData.java
@@ -30,12 +30,12 @@ import org.agrona.collections.Int2ObjectHashMap;
 
 public abstract class WaitAndReadData extends ReadData
 {
-    final Status waitForStatus;
-    final Deps waitOn;
-    protected final Timestamp waitUntil; // this may be set to Timestamp.MAX if we want to wait for all deps, regardless of when they execute
-    protected final Timestamp executeReadAt;
-    final PartialTxn read;
-    final Int2ObjectHashMap<LocalBarrier> barriers = new Int2ObjectHashMap<>();
+    public final Status waitForStatus;
+    public final Deps waitOn;
+    public final Timestamp waitUntil; // this may be set to Timestamp.MAX if we want to wait for all deps, regardless of when they execute
+    public final Timestamp executeReadAt;
+    public final PartialTxn read;
+    transient final Int2ObjectHashMap<LocalBarrier> barriers = new Int2ObjectHashMap<>();
 
     protected WaitAndReadData(Seekables<?, ?> readScope, long waitForEpoch, Status waitForStatus, Deps waitOn, Timestamp waitUntil, Timestamp executeReadAt, PartialTxn read)
     {
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index 56b251e9..30d06788 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -28,6 +28,7 @@ import accord.local.Node.Id;
 import accord.primitives.*;
 import accord.utils.*;
 import accord.utils.ArrayBuffers.IntBuffers;
+import com.google.common.annotations.VisibleForTesting;
 
 import static accord.utils.SortedArrays.Search.FLOOR;
 import static accord.utils.SortedArrays.exponentialSearch;
@@ -35,6 +36,7 @@ import static accord.utils.SortedArrays.exponentialSearch;
 public class Topology
 {
     public static final Topology EMPTY = new Topology(0, new Shard[0], Ranges.EMPTY, Collections.emptyMap(), Ranges.EMPTY, new int[0]);
+    private static final int[] EMPTY_SUBSET = new int[0];
     final long epoch;
     final Shard[] shards;
     final Ranges ranges;
@@ -193,6 +195,12 @@ public class Topology
         return Arrays.binarySearch(supersetIndexes, i);
     }
 
+    @VisibleForTesting
+    public Topology withEmptySubset()
+    {
+        return forSubset(EMPTY_SUBSET);
+    }
+
     public Topology forSelection(Unseekables<?, ?> select, OnUnknown onUnknown)
     {
         return forSelection(select, onUnknown, (ignore, index) -> true, null);
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 9c083a5c..d1fb8bfd 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -69,9 +69,19 @@ public class TopologyManager
             this.local = global.forNode(node).trim();
             Invariants.checkArgument(!global().isSubset());
             // TODO: can we just track sync for local ranges here?
-            this.syncTracker = new QuorumTracker(new Single(sorter, global()));
-            this.syncComplete = syncComplete;
-            this.prevSynced = prevSynced;
+            if (global().size() > 0)
+            {
+                this.syncTracker = new QuorumTracker(new Single(sorter, global()));
+                this.syncComplete = syncComplete;
+                this.prevSynced = prevSynced;
+            }
+            else
+            {
+                // if topology is empty, there is nothing to sync
+                this.syncTracker = null;
+                this.syncComplete = true;
+                this.prevSynced = true;
+            }
         }
 
         void markPrevSynced()
@@ -124,7 +134,7 @@ public class TopologyManager
 
         boolean shardIsUnsynced(int idx)
         {
-            return !prevSynced || !syncTracker.get(idx).hasReachedQuorum();
+            return !prevSynced || (!syncComplete && !syncTracker.get(idx).hasReachedQuorum());
         }
     }
 
@@ -178,6 +188,8 @@ public class TopologyManager
 
         public long minEpoch()
         {
+            if (currentEpoch == 0)
+                return 0;
             return currentEpoch - epochs.length + 1;
         }
 
@@ -292,17 +304,16 @@ public class TopologyManager
     public synchronized void truncateTopologyUntil(long epoch)
     {
         Epochs current = epochs;
-        checkArgument(current.epoch() >= epoch);
+        checkArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch , current.epoch());
 
         if (current.minEpoch() >= epoch)
             return;
 
         int newLen = current.epochs.length - (int) (epoch - current.minEpoch());
-        Invariants.checkState(current.epochs[newLen - 1].syncComplete());
+        Invariants.checkState(current.epochs[newLen - 1].syncComplete(), "Epoch %d's sync is not complete", current.epochs[newLen - 1].epoch());
 
-        EpochState[] nextEpochs = new EpochState[newLen];
-        System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
-        epochs = new Epochs(nextEpochs, current.pendingSyncComplete, current.futureEpochFutures);
+        epochs = new Epochs(Arrays.copyOfRange(current.epochs, 0, newLen),
+                            current.pendingSyncComplete, current.futureEpochFutures);
     }
 
     public TopologySorter.Supplier sorter()
diff --git a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
index b5962058..e9e4d1b4 100644
--- a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
@@ -62,6 +62,11 @@ public class ReducingIntervalMap<K extends Comparable<? super K>, V>
     @VisibleForTesting
     ReducingIntervalMap(boolean inclusiveEnds, K[] ends, V[] values)
     {
+        if (ends.length != values.length - 1)
+            throw new IllegalArgumentException(String.format("Length %d != %d - 1; %s -> %s",
+                                                             ends.length, values.length,
+                                                             Arrays.toString(ends), Arrays.toString(values)));
+
         this.inclusiveEnds = inclusiveEnds;
         this.ends = ends;
         this.values = values;
@@ -101,6 +106,11 @@ public class ReducingIntervalMap<K extends Comparable<? super K>, V>
         return Arrays.hashCode(values);
     }
 
+    public boolean inclusiveEnds()
+    {
+        return inclusiveEnds;
+    }
+
     public V get(K key)
     {
         int idx = Arrays.binarySearch(ends, key);
@@ -109,11 +119,22 @@ public class ReducingIntervalMap<K extends Comparable<? super K>, V>
         return values[idx];
     }
 
-    public V value(int idx)
+    private void checkIndex(int idx)
     {
-        if (idx < 0 || idx > size())
-            throw new IndexOutOfBoundsException();
+        if (idx < 0 || idx > size() - 1)
+            throw new IndexOutOfBoundsException(String.format("%d < 0 or > %d - 1", idx, size()));
+    }
+
+    public K key(int idx)
+    {
+        checkIndex(idx);
+        return ends[idx];
+    }
 
+    public V value(int idx)
+    {
+        if (idx < 0 || idx >= values.length)
+            throw new IndexOutOfBoundsException(String.format("%d < 0 or > %d - 1", idx, size()));
         return values[idx];
     }
 
diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
index 7555a059..4788c176 100644
--- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
@@ -28,6 +28,14 @@ import static accord.utils.SortedArrays.Search.FAST;
 
 public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V>
 {
+    public static class SerializerSupport
+    {
+        public static <V> ReducingRangeMap<V> create(boolean inclusiveEnds, RoutingKey[] ends, V[] values)
+        {
+            return new ReducingRangeMap<>(inclusiveEnds, ends, values);
+        }
+    }
+
     final RoutingKeys endKeys;
 
     public ReducingRangeMap(V value)
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index 4c3e7eb4..5fb6ede3 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -587,4 +587,16 @@ public abstract class AsyncChains<V> implements AsyncChain<V>
             // ignore
         }
     }
+
+    public static void awaitUninterruptiblyAndRethrow(AsyncChain<?> chain)
+    {
+        try
+        {
+            getUninterruptibly(chain);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e.getCause());
+        }
+    }
 }
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/api/TestableConfigurationService.java b/accord-core/src/test/java/accord/api/TestableConfigurationService.java
index 264f30bc..0d1a7490 100644
--- a/accord-core/src/test/java/accord/api/TestableConfigurationService.java
+++ b/accord-core/src/test/java/accord/api/TestableConfigurationService.java
@@ -19,9 +19,8 @@
 package accord.api;
 
 import accord.topology.Topology;
-import accord.utils.async.AsyncResult;
 
 public interface TestableConfigurationService extends ConfigurationService
 {
-    AsyncResult<?> reportTopology(Topology topology);
+    void reportTopology(Topology topology);
 }
diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 855366f6..f893e38f 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -34,7 +34,7 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-public class BurnTestConfigurationService extends AbstractConfigurationService implements TestableConfigurationService
+public class BurnTestConfigurationService extends AbstractConfigurationService.Minimal implements TestableConfigurationService
 {
     private final AgentExecutor executor;
     private final Function<Node.Id, Node> lookup;
@@ -121,7 +121,7 @@ public class BurnTestConfigurationService extends AbstractConfigurationService i
             if (candidates.isEmpty())
             {
                 candidates.addAll(currentTopology().nodes());
-                candidates.remove(node);
+                candidates.remove(localId);
             }
             int idx = randomSupplier.get().nextInt(candidates.size());
             Node.Id node = candidates.remove(idx);
@@ -157,9 +157,9 @@ public class BurnTestConfigurationService extends AbstractConfigurationService i
     }
 
     @Override
-    protected void epochSyncComplete(Topology topology)
+    protected void localSyncComplete(Topology topology)
     {
-        topologyUpdates.syncComplete(lookup.apply(node), topology.nodes(), topology.epoch());
+        topologyUpdates.syncComplete(lookup.apply(localId), topology.nodes(), topology.epoch());
     }
 
     @Override
@@ -174,6 +174,6 @@ public class BurnTestConfigurationService extends AbstractConfigurationService i
 
     private Node originator()
     {
-        return lookup.apply(node);
+        return lookup.apply(localId);
     }
 }
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index 4d99b393..7092dd31 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -69,7 +69,7 @@ public class TopologyUpdates
             pendingTopologies.remove(epoch);
 
         MessageTask.begin(originator, cluster, executor, "SyncComplete:" + epoch, (node, from, onDone) -> {
-            node.onEpochSyncComplete(originator.id(), epoch);
+            node.onRemoteSyncComplete(originator.id(), epoch);
             onDone.accept(true);
         });
     }
diff --git a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index 658ffd28..dcdafcc6 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -73,7 +73,8 @@ public class TopologyChangeTest
             cluster.configServices(4).forEach(config -> {
                 try
                 {
-                    getUninterruptibly(config.reportTopology(topology2));
+                    config.reportTopology(topology2);
+                    getUninterruptibly(config.ackFor(topology2.epoch()).coordination);
                 }
                 catch (ExecutionException e)
                 {
diff --git a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
index d0e6ee03..567736d0 100644
--- a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
+++ b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
@@ -33,7 +33,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import accord.api.ConfigurationService;
-import accord.impl.AbstractConfigurationService.EpochHistory;
+import accord.impl.AbstractConfigurationService.Minimal.EpochHistory;
 import accord.local.Node.Id;
 import accord.primitives.Range;
 import accord.topology.Shard;
@@ -58,7 +58,7 @@ public class AbstractConfigurationServiceTest
         }
 
         @Override
-        public AsyncResult<Void> onTopologyUpdate(Topology topology)
+        public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean startSync)
         {
             if (topologies.put(topology.epoch(), topology) != null)
                 Assertions.fail("Received topology twice for epoch " + topology.epoch());
@@ -68,7 +68,7 @@ public class AbstractConfigurationServiceTest
         }
 
         @Override
-        public void onEpochSyncComplete(Id node, long epoch)
+        public void onRemoteSyncComplete(Id node, long epoch)
         {
             Set<Id> synced = syncCompletes.computeIfAbsent(epoch, e -> new HashSet<>());
             if (!synced.add(node))
@@ -113,7 +113,7 @@ public class AbstractConfigurationServiceTest
         }
     }
 
-    private static class TestableConfigurationService extends AbstractConfigurationService
+    private static class TestableConfigurationService extends AbstractConfigurationService.Minimal
     {
         final Set<Long> syncStarted = new HashSet<>();
         final Set<Long> epochsFetched = new HashSet<>();
@@ -130,7 +130,7 @@ public class AbstractConfigurationServiceTest
         }
 
         @Override
-        protected void epochSyncComplete(Topology topology)
+        protected void localSyncComplete(Topology topology)
         {
             if (!syncStarted.add(topology.epoch()))
                 Assertions.fail("Sync started multiple times for " + topology.epoch());
diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
new file mode 100644
index 00000000..a6684bd9
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.list;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.impl.AbstractFetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.PreLoadContext;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Txn;
+import accord.utils.Timestamped;
+import accord.utils.async.AsyncResult;
+
+public class ListFetchCoordinator extends AbstractFetchCoordinator
+{
+    private final ListStore listStore;
+    final List<AsyncResult<Void>> persisting = new ArrayList<>();
+
+    public ListFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore, ListStore listStore)
+    {
+        super(node, ranges, syncPoint, fetchRanges, commandStore);
+        this.listStore = listStore;
+    }
+
+    @Override
+    protected PartialTxn rangeReadTxn(Ranges ranges)
+    {
+        return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new ListRead(Function.identity(), ranges, ranges), new ListQuery(Node.Id.NONE, Long.MIN_VALUE), null);
+    }
+
+    @Override
+    protected void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges received)
+    {
+        if (data == null)
+            return;
+
+        ListData listData = (ListData) data;
+        persisting.add(commandStore.execute(PreLoadContext.empty(), safeStore -> {
+            listData.forEach((key, value) -> listStore.data.merge(key, value, Timestamped::merge));
+        }).addCallback((ignore, fail) -> {
+            if (fail == null) success(from, received);
+            else fail(from, received, fail);
+        }).beginAsResult());
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 1385d9f5..770cc038 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -18,256 +18,25 @@
 
 package accord.impl.list;
 
-import java.util.*;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import javax.annotation.Nullable;
-
-import accord.api.Data;
+import accord.api.DataStore;
 import accord.api.Key;
-import accord.coordinate.FetchCoordinator;
-import accord.impl.InMemoryCommandStore;
-import accord.local.CommandStore;
 import accord.local.Node;
-import accord.api.DataStore;
-import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
-import accord.local.Status;
-import accord.messages.ReadData.ReadNack;
-import accord.messages.Callback;
-import accord.messages.ReadData.ReadOk;
-import accord.messages.ReadData.ReadReply;
-import accord.messages.WaitAndReadData;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
 import accord.primitives.SyncPoint;
 import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.utils.Invariants;
 import accord.utils.Timestamped;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults.SettableResult;
-
-import static accord.primitives.Routables.Slice.Minimal;
 
 public class ListStore implements DataStore
 {
-    static class SyncResult extends SettableResult<Ranges> implements FetchResult
-    {
-        final SyncCoordinator coordinator;
-
-        SyncResult(SyncCoordinator coordinator)
-        {
-            this.coordinator = coordinator;
-        }
-
-        @Override
-        public void abort(Ranges abort)
-        {
-            coordinator.abort(abort);
-        }
-    }
-    static class SyncCoordinator extends FetchCoordinator
-    {
-        static class Key
-        {
-            final Node.Id id;
-            final Ranges ranges;
-
-            Key(Node.Id id, Ranges ranges)
-            {
-                this.id = id;
-                this.ranges = ranges;
-            }
-
-            @Override
-            public int hashCode()
-            {
-                return id.hashCode() + ranges.hashCode();
-            }
-
-            @Override
-            public boolean equals(Object obj)
-            {
-                if (this == obj) return true;
-                if (!(obj instanceof Key)) return false;
-                Key that = (Key) obj;
-                return id.equals(that.id) && ranges.equals(that.ranges);
-            }
-        }
-
-        final FetchRanges fetchRanges;
-        final CommandStore commandStore;
-        final ListStore dataStore;
-        final Map<Key, StartingRangeFetch> inflight = new HashMap<>();
-        final SyncResult done = new SyncResult(this);
-        final List<AsyncResult<Void>> persisting = new ArrayList<>();
-
-        private SyncCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, FetchRanges fetchRanges, CommandStore commandStore, ListStore dataStore)
-        {
-            super(node, ranges, syncPoint, fetchRanges);
-            this.fetchRanges = fetchRanges;
-            this.commandStore = commandStore;
-            this.dataStore = dataStore;
-        }
-
-        @Override
-        public void contact(Node.Id to, Ranges ranges)
-        {
-            Key key = new Key(to, ranges);
-            inflight.put(key, starting(to, ranges));
-            Ranges ownedRanges = ownedRangesForNode(to);
-            Invariants.checkArgument(ownedRanges.containsAll(ranges));
-            PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
-            PartialTxn partialTxn = new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new ListRead(unsafeStore -> unsafeStore, ranges, ranges), new ListQuery(Node.Id.NONE, Long.MIN_VALUE), null);
-            node.send(to, new StoreSync(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, partialTxn), new Callback<ReadReply>()
-            {
-                @Override
-                public void onSuccess(Node.Id from, ReadReply reply)
-                {
-                    if (!reply.isOk())
-                    {
-                        fail(to, new RuntimeException(reply.toString()));
-                        inflight.remove(key).cancel();
-                        switch ((ReadNack) reply)
-                        {
-                            default: throw new AssertionError("Unhandled enum");
-                            case Invalid:
-                            case Redundant:
-                            case NotCommitted:
-                                throw new AssertionError();
-                            case Error:
-                                // TODO (required): ensure errors are propagated to coordinators and can be logged
-                        }
-                        return;
-                    }
-
-                    SyncReply ok = (SyncReply) reply;
-                    Ranges received;
-                    if (ok.unavailable != null)
-                    {
-                        unavailable(to, ok.unavailable);
-                        if (ok.data == null)
-                        {
-                            inflight.remove(key).cancel();
-                            return;
-                        }
-                        received = ranges.difference(ok.unavailable);
-                    }
-                    else
-                    {
-                        received = ranges;
-                    }
-
-                    // TODO (now): make sure it works if invoked in either order
-                    inflight.remove(key).started(ok.maxApplied);
-                    ListData data = (ListData) ok.data;
-                    if (data != null)
-                    {
-                        persisting.add(commandStore.execute(PreLoadContext.empty(), safeStore -> {
-                            data.forEach((key, value) -> dataStore.data.merge(key, value, Timestamped::merge));
-                        }).addCallback((ignore, fail) -> {
-                            synchronized (this)
-                            {
-                                if (fail == null) success(to, received);
-                                else fail(to, received, fail);
-                            }
-                        }).beginAsResult());
-                    }
-                    // received must be invoked after submitting the persistence future, as it triggers onDone
-                    // which creates a ReducingFuture over {@code persisting}
-                }
-
-                @Override
-                public void onFailure(Node.Id from, Throwable failure)
-                {
-                    inflight.remove(key).cancel();
-                    fail(from, failure);
-                }
-
-                @Override
-                public void onCallbackFailure(Node.Id from, Throwable failure)
-                {
-                    // TODO (soon)
-                    failure.printStackTrace();
-                }
-            });
-        }
-
-        @Override
-        protected void onDone(Ranges success, Throwable failure)
-        {
-            if (success.isEmpty()) done.setFailure(failure);
-            else if (persisting.isEmpty()) done.setSuccess(null);
-            else AsyncChains.reduce(persisting, (a, b)-> null)
-                               .begin((s, f) -> {
-                                   if (f == null) done.setSuccess(ranges);
-                                   else done.setFailure(f);
-                               });
-        }
-
-        @Override
-        protected void start()
-        {
-            super.start();
-        }
-
-        void abort(Ranges abort)
-        {
-            // TODO (required, later): implement abort
-        }
-    }
-
-    static class StoreSync extends WaitAndReadData
-    {
-        final PartialDeps partialDeps;
-        Timestamp maxApplied;
-
-        StoreSync(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn)
-        {
-
-            super(ranges, sourceEpoch, Status.Applied, partialDeps, Timestamp.MAX, syncId, partialTxn);
-            this.partialDeps = partialDeps;
-        }
-
-        @Override
-        protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable)
-        {
-            commandStore.execute(PreLoadContext.empty(), safeStore -> {
-                Ranges slice = safeStore.ranges().allAt(executeReadAt).difference(unavailable);
-                Timestamp newMaxApplied = ((InMemoryCommandStore.InMemorySafeStore)safeStore).maxApplied(readScope, slice);
-                synchronized (this)
-                {
-                    if (maxApplied == null) maxApplied = newMaxApplied;
-                    else maxApplied = Timestamp.max(maxApplied, newMaxApplied);
-                    Ranges reportUnavailable = unavailable.slice((Ranges)this.readScope, Minimal);
-                    super.readComplete(commandStore, result, reportUnavailable);
-                }
-            }).begin(node.agent());
-        }
-
-        @Override
-        protected void reply(@Nullable Ranges unavailable, @Nullable Data data)
-        {
-            node.reply(replyTo, replyContext, new SyncReply(unavailable, data, maxApplied));
-        }
-    }
-
-    static class SyncReply extends ReadOk
-    {
-        final Timestamp maxApplied;
-        public SyncReply(@Nullable Ranges unavailable, @Nullable Data data, Timestamp maxApplied)
-        {
-            super(unavailable, data);
-            this.maxApplied = maxApplied;
-        }
-    }
-
     static final Timestamped<int[]> EMPTY = new Timestamped<>(Timestamp.NONE, new int[0]);
     final NavigableMap<RoutableKey, Timestamped<int[]>> data = new TreeMap<>();
 
@@ -300,8 +69,8 @@ public class ListStore implements DataStore
     @Override
     public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback)
     {
-        SyncCoordinator coordinator = new SyncCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore(), this);
+        ListFetchCoordinator coordinator = new ListFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore(), this);
         coordinator.start();
-        return coordinator.done;
+        return coordinator.result();
     }
 }
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index d2bc2337..c3516377 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -119,7 +119,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node>
                         SimpleProgressLog::new,
                         InMemoryCommandStores.SingleThread::new);
         awaitUninterruptibly(node.start());
-        node.onTopologyUpdate(topology);
+        node.onTopologyUpdate(topology, true);
         return node;
     }
 
diff --git a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
index 45e6a9b7..bae0cec5 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
@@ -35,6 +35,7 @@ public class MockConfigurationService implements TestableConfigurationService
 {
     private final MessageSink messageSink;
     private final List<Topology> epochs = new ArrayList<>();
+    private final Map<Long, EpochReady> acks = new HashMap<>();
     private final List<AsyncResult<Void>> syncs = new ArrayList<>();
     private final List<Listener> listeners = new ArrayList<>();
     private final EpochFunction<MockConfigurationService> fetchTopologyHandler;
@@ -81,34 +82,40 @@ public class MockConfigurationService implements TestableConfigurationService
     }
 
     @Override
-    public void acknowledgeEpoch(EpochReady epoch)
+    public synchronized void acknowledgeEpoch(EpochReady epoch)
     {
+        Assertions.assertFalse(acks.containsKey(epoch.epoch));
+        acks.put(epoch.epoch, epoch);
+    }
+
+    public synchronized EpochReady ackFor(long epoch)
+    {
+        return acks.get(epoch);
     }
 
     @Override
-    public synchronized AsyncResult<Void> reportTopology(Topology topology)
+    public synchronized void reportTopology(Topology topology)
     {
         if (topology.epoch() > epochs.size())
-            return syncs.get((int)topology.epoch() - 1);
+            return;
 
         Assertions.assertEquals(topology.epoch(), epochs.size());
         epochs.add(topology);
 
         List<AsyncResult<Void>> futures = new ArrayList<>();
         for (Listener listener : listeners)
-            futures.add(listener.onTopologyUpdate(topology));
+            futures.add(listener.onTopologyUpdate(topology, true));
 
         AsyncResult<Void> result = futures.isEmpty()
            ? AsyncResults.success(null)
            : AsyncChains.reduce(futures, (a, b) -> null).beginAsResult();
 
         syncs.add(result);
-        return result;
     }
 
     public synchronized void reportSyncComplete(Node.Id node, long epoch)
     {
         for (Listener listener : listeners)
-            listener.onEpochSyncComplete(node, epoch);
+            listener.onRemoteSyncComplete(node, epoch);
     }
 }
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index c39f46a1..a1cfa215 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -100,7 +100,7 @@ public class ImmutableCommandTest
                         new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), null,
                         SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
         awaitUninterruptibly(node.start());
-        node.onTopologyUpdate(storeSupport.local.get());
+        node.onTopologyUpdate(storeSupport.local.get(), true);
         return node;
     }
 
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index bec84683..36825cb6 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -152,23 +152,27 @@ public class TopologyManagerTest
     {
         Range range = range(100, 200);
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
-        Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
+        Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(1, 2)));
+        Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(2, 3)));
 
         TopologyManager service = new TopologyManager(SUPPLIER, ID);
 
         Assertions.assertSame(Topology.EMPTY, service.current());
         service.onTopologyUpdate(topology1);
         service.onTopologyUpdate(topology2);
+        service.onTopologyUpdate(topology3);
         Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
 
         RoutingKeys keys = keys(150).toUnseekables();
-        Assertions.assertEquals(topologies(topology2.forSelection(keys, Topology.OnUnknown.REJECT), topology1.forSelection(keys, Topology.OnUnknown.REJECT)),
-                                service.withUnsyncedEpochs(keys, 2, 2));
+        Assertions.assertEquals(topologies(topology3.forSelection(keys, Topology.OnUnknown.REJECT), topology2.forSelection(keys, Topology.OnUnknown.REJECT), topology1.withEmptySubset()),
+                                service.withUnsyncedEpochs(keys, 3, 3));
 
         service.onEpochSyncComplete(id(2), 2);
         service.onEpochSyncComplete(id(3), 2);
-        Assertions.assertEquals(topologies(topology2.forSelection(keys, Topology.OnUnknown.REJECT)),
-                                service.withUnsyncedEpochs(keys, 2, 2));
+        service.onEpochSyncComplete(id(2), 3);
+        service.onEpochSyncComplete(id(3), 3);
+        Assertions.assertEquals(topologies(topology3.forSelection(keys, Topology.OnUnknown.REJECT)),
+                                service.withUnsyncedEpochs(keys, 3, 3));
     }
 
     /**
@@ -182,22 +186,26 @@ public class TopologyManagerTest
                                       shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
                                       shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
         Topology topology2 = topology(2,
+                                      shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
+                                      shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
+        Topology topology3 = topology(3,
                                       shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
                                       shard(range(200, 300), idList(4, 5, 6), idSet(5, 6)));
 
         TopologyManager service = new TopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology1);
         service.onTopologyUpdate(topology2);
+        service.onTopologyUpdate(topology3);
 
-        // no acks, so all epoch 1 shards should be included
-        Assertions.assertEquals(topologies(topology2, topology1),
-                                service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2));
+        // no acks, so all epoch 2 shards should be included
+        Assertions.assertEquals(topologies(topology3, topology2, topology1.withEmptySubset()),
+                                service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 3, 3));
 
         // first topology acked, so only the second shard should be included
-        service.onEpochSyncComplete(id(1), 1);
-        service.onEpochSyncComplete(id(2), 1);
-        Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2);
-        Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))),
+        service.onEpochSyncComplete(id(1), 2);
+        service.onEpochSyncComplete(id(2), 2);
+        Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 3, 3);
+        Assertions.assertEquals(topologies(topology3, topology(2, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5))), topology1.withEmptySubset()),
                                 actual);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to