This is an automated email from the ASF dual-hosted git repository.

belliottsmith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a10cd05 Safely regain ranges and delete retired command stores
0a10cd05 is described below

commit 0a10cd056794c05588114f45ce86d49d6d6538db
Author: Alan Wang <[email protected]>
AuthorDate: Thu Jun 4 14:13:05 2026 -0700

    Safely regain ranges and delete retired command stores
    
    patch by Alan Wang; reviewed by Benedict for CASSANDRA-21212
---
 accord-core/src/main/java/accord/api/Journal.java  |  17 +-
 .../main/java/accord/coordinate/MaybeRecover.java  |   1 -
 .../java/accord/impl/AbstractSafeCommandStore.java |   9 +
 .../src/main/java/accord/local/CommandStore.java   |  35 +++-
 .../src/main/java/accord/local/CommandStores.java  | 228 ++++++++++++++++++---
 .../local/DeletedCommandStoresException.java       |  39 ++++
 .../local/OverlappingCommandStoresException.java   |  39 ++++
 .../main/java/accord/local/SafeCommandStore.java   |  10 +-
 .../src/main/java/accord/topology/ActiveEpoch.java |   4 +-
 .../main/java/accord/topology/ActiveEpochs.java    |  59 +++---
 .../src/main/java/accord/topology/Topology.java    |  17 ++
 .../main/java/accord/topology/TopologyManager.java |  90 ++++++--
 .../accord/impl/basic/DelayedCommandStores.java    |  29 ++-
 .../java/accord/impl/basic/InMemoryJournal.java    |  21 ++
 .../java/accord/impl/basic/LoggingJournal.java     |   6 +
 .../src/test/java/accord/impl/list/ListAgent.java  |   9 +-
 .../java/accord/topology/TopologyManagerTest.java  |   7 -
 .../java/accord/topology/TopologyRandomizer.java   |  44 ++--
 .../src/main/java/accord/maelstrom/Cluster.java    |   1 +
 19 files changed, 550 insertions(+), 115 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index 8ba7db18..883c53f8 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 import accord.impl.CommandChange;
 import accord.local.Command;
 import accord.local.CommandStores;
+import accord.local.CommandStores.PreviouslyOwned;
 import accord.local.DurableBefore;
 import accord.local.MinimalCommand;
 import accord.local.Node;
@@ -64,12 +65,14 @@ public interface Journal
     void saveCommand(int store, CommandUpdate value, Runnable onFlush);
 
     List<? extends TopologyUpdate> loadTopologies();
+    // TODO (required): saveTopology should be synchronous, or we should at 
least await durability before updating in memory topology
     void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);
 
     RedundantBefore loadRedundantBefore(int store);
     NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store);
     NavigableMap<Timestamp, Ranges> loadSafeToRead(int store);
     CommandStores.RangesForEpoch loadRangesForEpoch(int store);
+    Ranges loadPermanentlyUnsafeToRead(int store);
     void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush);
 
     Persister<DurableBefore, DurableBefore> durableBeforePersister();
@@ -86,16 +89,18 @@ public interface Journal
     {
         public final Int2ObjectHashMap<CommandStores.RangesForEpoch> 
commandStores;
         public final Topology global;
+        public final PreviouslyOwned previouslyOwned;
 
-        public TopologyUpdate(@Nonnull 
Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores, @Nonnull 
Topology global)
+        public TopologyUpdate(@Nonnull 
Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores, @Nonnull 
Topology global, PreviouslyOwned previouslyOwned)
         {
             this.commandStores = commandStores;
             this.global = global;
+            this.previouslyOwned = previouslyOwned;
         }
 
         public boolean isEquivalent(TopologyUpdate other)
         {
-            boolean equivalent = global.isEquivalent(other.global);
+            boolean equivalent = global.isEquivalent(other.global) && 
Objects.equals(previouslyOwned, other.previouslyOwned);
             if (!equivalent)
                 return false;
             Invariants.require(commandStores.equals(other.commandStores));
@@ -104,7 +109,7 @@ public interface Journal
 
         public TopologyUpdate cloneWithEquivalentEpoch(long epoch)
         {
-            return new TopologyUpdate(commandStores, 
global.cloneEquivalentWithEpoch(epoch));
+            return new TopologyUpdate(commandStores, 
global.cloneEquivalentWithEpoch(epoch), previouslyOwned);
         }
 
         @Override
@@ -113,7 +118,8 @@ public interface Journal
             if (this == object) return true;
             if (object == null || getClass() != object.getClass()) return 
false;
             TopologyUpdate update = (TopologyUpdate) object;
-            return Objects.equals(commandStores, update.commandStores) && 
Objects.equals(global, update.global);
+            return Objects.equals(commandStores, update.commandStores) && 
Objects.equals(global, update.global)
+                   && Objects.equals(previouslyOwned, update.previouslyOwned);
         }
 
         @Override
@@ -151,6 +157,7 @@ public interface Journal
         public RedundantBefore newRedundantBefore;
         public NavigableMap<TxnId, Ranges> newBootstrapBeganAt;
         public NavigableMap<Timestamp, Ranges> newSafeToRead;
+        public Ranges newPermanentlyUnsafeToRead;
         public CommandStores.RangesForEpoch newRangesForEpoch;
 
         public String toString()
@@ -162,6 +169,8 @@ public interface Journal
                 
builder.append("newBootstrapBeganAt=").append(newBootstrapBeganAt).append(", ");
             if (newSafeToRead != null)
                 
builder.append("newSafeToRead=").append(newSafeToRead).append(", ");
+            if (newPermanentlyUnsafeToRead != null)
+                
builder.append("newPermanentlyUnsafeToRead=").append(newPermanentlyUnsafeToRead).append(",
 ");
             if (newRangesForEpoch != null)
                 
builder.append("newRangesForEpoch=").append(newRangesForEpoch).append(", ");
             builder.setLength(builder.length() - 2);
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java 
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index bd6cb3fd..e5b49dd5 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -55,7 +55,6 @@ public class MaybeRecover extends CheckShards<Outcome, 
Route<?>>
         this.recoverIfAlreadyDurable = recoverIfAlreadyDurable;
         this.reportTo = reportTo;
     }
-
     public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Route<?> someRoute, ProgressToken prevProgress, boolean 
recoverIfAlreadyDurable, LatentStoreSelector reportTo, BiConsumer<? super 
Outcome, Throwable> callback)
     {
         MaybeRecover maybeRecover;
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java 
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index 24d52839..7d3c7fd6 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -214,6 +214,9 @@ extends SafeCommandStore
         if (fieldUpdates.newRangesForEpoch != null)
             super.setRangesForEpoch(fieldUpdates.newRangesForEpoch);
 
+        if (fieldUpdates.newPermanentlyUnsafeToRead != null)
+            
super.setPermanentlyUnsafeToRead(fieldUpdates.newPermanentlyUnsafeToRead);
+
         fieldUpdates = null;
     }
 
@@ -243,6 +246,12 @@ extends SafeCommandStore
         ensureFieldUpdates().newSafeToRead = newSafeToRead;
     }
 
+    @Override
+    public final void setPermanentlyUnsafeToRead(Ranges 
newPermanentlyUnsafeToRead)
+    {
+        ensureFieldUpdates().newPermanentlyUnsafeToRead = 
newPermanentlyUnsafeToRead;
+    }
+
     @Override
     public void setRangesForEpoch(RangesForEpoch rangesForEpoch)
     {
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 6cb67370..d6a99a42 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -188,6 +188,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
      * But they may still be ordered for other key ranges they participate in.
      */
     private NavigableMap<Timestamp, Ranges> safeToRead = emptySafeToRead();
+    protected Ranges permanentlyUnsafeToRead = Ranges.EMPTY;
     private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new 
DeterministicIdentitySet<>());
 
     static class WaitingOnVisibility
@@ -248,6 +249,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         maxConflicts = MaxConflicts.EMPTY;
         maxDecidedRX = MaxDecidedRX.EMPTY;
         safeToRead = emptySafeToRead();
+        permanentlyUnsafeToRead = Ranges.EMPTY;
         listeners.clear();
         waitingOnVisibility.values().forEach(w -> w.invalid = true);
         waitingOnVisibility.clear();
@@ -286,6 +288,11 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         return rangesForEpoch;
     }
 
+    public Ranges unsafeGetPermanentlyUnsafeToRead()
+    {
+        return permanentlyUnsafeToRead;
+    }
+
     public MaxDecidedRX unsafeGetMaxDecidedRX()
     {
         return maxDecidedRX;
@@ -308,6 +315,17 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         unsafeSetRangesForEpoch(newRangesForEpoch);
     }
 
+    protected final void unsafeClearPermanentlyUnsafeToRead()
+    {
+        permanentlyUnsafeToRead = null;
+    }
+
+    protected void loadPermanentlyUnsafeToRead(Ranges 
newPermanentlyUnsafeToRead)
+    {
+        Invariants.require(this.permanentlyUnsafeToRead == null);
+        unsafeSetPermanentlyUnsafeToRead(newPermanentlyUnsafeToRead);
+    }
+
     public abstract boolean inStore();
 
     public boolean tryExecuteImmediately(Runnable run)
@@ -406,12 +424,19 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
     /**
      * This method may be invoked on a non-CommandStore thread
      */
-    final void unsafeSetSafeToRead(NavigableMap<Timestamp, Ranges> 
newSafeToRead)
+    final void unsafeSetSafeToRead(@Nullable NavigableMap<Timestamp, Ranges> 
newSafeToRead)
     {
+        if (newSafeToRead != null)
+            newSafeToRead = purgeHistory(newSafeToRead, 
permanentlyUnsafeToRead);
         this.safeToRead = newSafeToRead;
         node.updateStamp();
     }
 
+    final void unsafeSetPermanentlyUnsafeToRead(Ranges 
newPermanentlyUnsafeToRead)
+    {
+        this.permanentlyUnsafeToRead = newPermanentlyUnsafeToRead;
+    }
+
     protected final void unsafeClearSafeToRead()
     {
         unsafeSetSafeToRead(null);
@@ -1227,6 +1252,14 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         }
     }
 
+    final AsyncChain<Void> markPermanentlyUnsafeToRead(Ranges ranges)
+    {
+        return chain((Empty) () -> "Mark Range As Permanently Unsafe To Read", 
safeStore -> {
+            safeStore.setSafeToRead(purgeHistory(safeToRead, ranges));
+            
safeStore.setPermanentlyUnsafeToRead(permanentlyUnsafeToRead.union(MERGE_ADJACENT,
 ranges));
+        });
+    }
+
     public final DataStore unsafeGetDataStore()
     {
         return dataStore;
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 69c05478..dd2e353d 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -24,7 +24,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -80,6 +79,7 @@ import org.agrona.collections.Hashing;
 import org.agrona.collections.Int2IntHashMap;
 import org.agrona.collections.Int2ObjectHashMap;
 
+import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
 import static accord.topology.EpochReady.done;
 import static accord.api.DataStore.FetchKind.Sync;
 import static 
accord.local.CommandStores.BootstrapRangeAction.BOOTSTRAP_NOT_NEEDED;
@@ -93,7 +93,6 @@ import static java.util.stream.Collectors.toList;
  */
 public abstract class CommandStores implements AsyncExecutorFactory
 {
-    @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(CommandStores.class);
 
     public interface LatentStoreSelector
@@ -127,6 +126,8 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
          * return null if we do not intersect this shard on the specified 
epochs
          */
         Ranges ranges(ShardHolder shard);
+
+        default long minEpoch() { return -1L; } // negative = no minimum epoch (callers gate epoch checks on minEpoch >= 0)
     }
 
     public interface UnrestrictedStoreSelector extends StoreSelector
@@ -146,11 +147,13 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
             this.maxEpoch = maxEpoch;
         }
 
+        @Override
         public StoreSelection select(Snapshot snapshot)
         {
             return StoreFinder.find(snapshot, keysOrRanges);
         }
 
+        @Override
         public @Nullable Ranges ranges(ShardHolder shard)
         {
             Ranges ranges = shard.ranges().allBetween(minEpoch, maxEpoch);
@@ -158,6 +161,12 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
                 return null;
             return ranges;
         }
+
+        @Override
+        public long minEpoch()
+        {
+            return minEpoch;
+        }
     }
 
     public static class IncludingSpecificStoreSelector implements 
LatentStoreSelector
@@ -354,24 +363,22 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
     public static class ShardHolder
     {
         public final CommandStore store;
+        public @Nullable final Ranges regainsRanges;
         RangesForEpoch ranges;
 
-        ShardHolder(CommandStore store)
+        ShardHolder(CommandStore store, @Nullable Ranges regainsRanges)
         {
             this.store = store;
+            this.regainsRanges = regainsRanges;
         }
 
-        public ShardHolder(CommandStore store, RangesForEpoch ranges)
+        public ShardHolder(CommandStore store, RangesForEpoch ranges, 
@Nullable Ranges regainsRanges)
         {
             this.store = store;
+            this.regainsRanges = regainsRanges;
             this.ranges = ranges;
         }
 
-        public ShardHolder withStoreUnsafe(CommandStore store)
-        {
-            return new ShardHolder(store, ranges);
-        }
-
         public RangesForEpoch ranges()
         {
             return ranges;
@@ -388,12 +395,109 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         RangesForEpoch ranges();
     }
 
+    public static final class PreviouslyOwned
+    {
+        public static final PreviouslyOwned EMPTY = new PreviouslyOwned(0, 
RangesForEpoch.EMPTY.epochs, RangesForEpoch.EMPTY.ranges);
+        final long maxEpoch;
+        final long[] epochs; // the epoch upon which it was last owned
+        final Ranges[] ranges;
+
+        public PreviouslyOwned(long maxEpoch, long[] epochs, Ranges[] ranges)
+        {
+            this.maxEpoch = maxEpoch;
+            this.epochs = epochs;
+            this.ranges = ranges;
+        }
+
+        PreviouslyOwned prepend(long epoch, Ranges ranges)
+        {
+            Invariants.require(epochs.length == 0 || epoch > epochs[0]);
+            long[] newEpochs = new long[this.epochs.length + 1];
+            Ranges[] newRanges = new Ranges[this.epochs.length + 1];
+            newEpochs[0] = epoch;
+            newRanges[0] = ranges;
+            System.arraycopy(this.epochs, 0, newEpochs, 1, this.epochs.length);
+            System.arraycopy(this.ranges, 0, newRanges, 1, this.ranges.length);
+            return new PreviouslyOwned(epoch, newEpochs, newRanges);
+        }
+
+        public boolean overlaps(long epoch, Unseekables<?> test)
+        {
+            if (epoch > maxEpoch)
+                return false;
+
+            for (int i = 0 ; i < epochs.length && epoch <= epochs[i] ; ++i)
+            {
+                if (this.ranges[i].intersects(test))
+                    return true;
+            }
+
+            return false;
+        }
+
+        private boolean overlapsDeletedCommandStore(Ranges shardRanges, long 
epoch, Unseekables<?> test)
+        {
+            if (epoch > maxEpoch)
+                return false;
+
+            for (int i = 0 ; i < epochs.length && epoch <= epochs[i] ; ++i)
+            {
+                if (this.ranges[i].intersects(test) && 
!shardRanges.containsAll(this.ranges[i].overlapping(test)))
+                    return true;
+            }
+
+            return false;
+        }
+
+        public Ranges regains(Ranges overlapping)
+        {
+            Ranges regains = Ranges.EMPTY;
+            for (Ranges rs : this.ranges)
+                regains = regains.with(rs.slice(overlapping, Minimal));
+            return regains;
+        }
+
+        public int size()
+        {
+            return epochs.length;
+        }
+
+        public long epochs(int i)
+        {
+            return epochs[i];
+        }
+
+        public Ranges ranges(int i)
+        {
+            return ranges[i];
+        }
+
+        @Override
+        public boolean equals(Object that)
+        {
+            return that instanceof PreviouslyOwned && equals((PreviouslyOwned) 
that);
+        }
+
+        public boolean equals(PreviouslyOwned that)
+        {
+            return Arrays.equals(this.ranges, that.ranges)
+                   && Arrays.equals(this.epochs, that.epochs);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     // We ONLY remove ranges to keep logic manageable; likely to only merge 
CommandStores into a new CommandStore via some kind of Bootstrap
     public static class RangesForEpoch
     {
+        public static final RangesForEpoch EMPTY = new RangesForEpoch(new 
long[0], new Ranges[0]);
+
         final long[] epochs;
         final Ranges[] ranges;
-        public static final RangesForEpoch EMPTY = new RangesForEpoch(new 
long[0], new Ranges[0]);
 
         public RangesForEpoch(long epoch, Ranges ranges)
         {
@@ -425,7 +529,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
             if (this == object) return true;
             if (object == null || getClass() != object.getClass()) return 
false;
             RangesForEpoch that = (RangesForEpoch) object;
-            return Objects.deepEquals(epochs, that.epochs) && 
Objects.deepEquals(ranges, that.ranges);
+            return Arrays.equals(epochs, that.epochs) && Arrays.equals(ranges, 
that.ranges);
         }
 
         @Override
@@ -632,15 +736,17 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         final Int2IntHashMap byId;
         private final int[] indexForRange;
         final SearchableRangeList lookupByRange;
+        final Ranges shardRanges;
 
-        public Snapshot(ShardHolder[] shards, Topology local, Topology global)
+        public Snapshot(ShardHolder[] shards, Topology local, Topology global, 
PreviouslyOwned previouslyOwned)
         {
-            super(asMap(shards), global);
+            super(asMap(shards), global, previouslyOwned);
             this.local = local;
             this.shards = shards;
             this.byId = new Int2IntHashMap(shards.length, 
Hashing.DEFAULT_LOAD_FACTOR, -1);
             int count = 0;
             int prevId = -1;
+            Ranges shardRanges = Ranges.EMPTY;
             for (int i = 0 ; i < shards.length ; ++i)
             {
                 ShardHolder shard = shards[i];
@@ -649,7 +755,9 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
                 byId.put(id, i);
                 count += shard.ranges.all().size();
                 prevId = id;
+                shardRanges = shardRanges.union(MERGE_ADJACENT, 
shard.ranges.all());
             }
+            this.shardRanges = shardRanges;
             class RangeAndIndex
             {
                 final Range range;
@@ -685,7 +793,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         // This method exists to ensure we do not hold references to command 
stores
         public Journal.TopologyUpdate asTopologyUpdate()
         {
-            return new Journal.TopologyUpdate(commandStores, global);
+            return new Journal.TopologyUpdate(commandStores, global, 
previouslyOwned);
         }
 
         private static Int2ObjectHashMap<CommandStores.RangesForEpoch> 
asMap(ShardHolder[] shards)
@@ -725,7 +833,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         this.supplier = supplier;
         this.shardDistributor = shardDistributor;
 
-        this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, 
Topology.EMPTY);
+        this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, 
Topology.EMPTY, PreviouslyOwned.EMPTY);
         this.journal = journal;
     }
 
@@ -809,6 +917,8 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
 
         List<Supplier<EpochReady>> bootstrapUpdates = new ArrayList<>();
         List<ShardHolder> result = new ArrayList<>(prev.shards.length + 
added.size());
+        PreviouslyOwned previouslyOwned = prev.previouslyOwned;
+
         for (ShardHolder shard : prev.shards)
         {
             Ranges current = shard.ranges().currentRanges();
@@ -818,8 +928,14 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
                 // TODO (required): This is updating the a non-volatile field 
in the previous Snapshot, why modify it at all, even with volatile the 
guaranteed visibility is weak even with mutual exclusion
                 shard.ranges = shard.ranges().withRanges(newTopology.epoch(), 
current.without(subtracted));
                 shard.store.epochUpdateHolder.remove(epoch, shard.ranges, 
removeRanges);
+
                 bootstrapUpdates.add(shard.store.unbootstrap(epoch, 
removeRanges));
             }
+
+            Ranges regainedRanges = shard.ranges().all().slice(added, Minimal);
+            if (!regainedRanges.isEmpty())
+                bootstrapUpdates.add(() -> EpochReady.all(epoch, 
shard.store.markPermanentlyUnsafeToRead(regainedRanges).beginAsResult()));
+
             // TODO (desired): only sync affected shards
             Ranges ranges = shard.ranges().currentRanges();
             // ranges can be empty when ranges are lost or consolidated across 
epochs.
@@ -828,6 +944,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
                 logger.debug("Epoch {} requires visibility sync for {}", 
epoch, ranges);
                 
bootstrapUpdates.add(shard.store.refreshReadyToCoordinate(node, ranges, epoch));
             }
+
             result.add(shard);
         }
 
@@ -839,7 +956,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
                 EpochUpdateHolder updateHolder = new EpochUpdateHolder();
                 RangesForEpoch rangesForEpoch = new RangesForEpoch(epoch, 
addRanges);
                 updateHolder.add(epoch, rangesForEpoch, addRanges);
-                ShardHolder shard = new ShardHolder(supplier.create(nextId++, 
updateHolder));
+                ShardHolder shard = new ShardHolder(supplier.create(nextId++, 
updateHolder), previouslyOwned.regains(addRanges));
                 shard.ranges = rangesForEpoch;
 
                 Map<BootstrapRangeAction, Ranges> partitioned = 
addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, 
newLocalTopology, range), BootstrapRangeAction.class);
@@ -873,7 +990,11 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
                 );
             };
         }
-        return new TopologyUpdate(new Snapshot(result.toArray(new 
ShardHolder[0]), newLocalTopology, newTopology), bootstrap);
+
+        if (!subtracted.isEmpty())
+            previouslyOwned = previouslyOwned.prepend(epoch - 1, subtracted);
+
+        return new TopologyUpdate(new Snapshot(result.toArray(new 
ShardHolder[0]), newLocalTopology, newTopology, previouslyOwned), bootstrap);
     }
 
     private static boolean requiresSync(Ranges ranges, Topology oldTopology, 
Topology newTopology)
@@ -989,6 +1110,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
     {
         Snapshot snapshot = current;
         StoreSelection selection = selector.select(snapshot);
+        long minEpoch = selector.minEpoch();
         AsyncChain<O> chain = null;
         for (int i = selection.firstSetBit(); i >= 0 ; i = 
selection.nextSetBit(i + 1, -1))
         {
@@ -997,10 +1119,17 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
             if (ranges == null)
                 continue;
 
+            if (minEpoch >= 0 && unsafelyTouchesRegainedRanges(snapshot, 
shard, mapReduceConsume.scope, minEpoch))
+                return AsyncChains.failure(new 
OverlappingCommandStoresException());
+
             AsyncChain<O> next = mapReduceConsume.applyAsync(ranges, 
shard.store);
             if (next != null)
                 chain = chain != null ? AsyncChains.reduce(chain, next, 
mapReduceConsume) : next;
         }
+
+        if (minEpoch >= 0 && 
snapshot.previouslyOwned.overlapsDeletedCommandStore(snapshot.shardRanges, 
minEpoch, mapReduceConsume.scope))
+            return AsyncChains.failure(new DeletedCommandStoresException());
+
         return chain == null ? AsyncChains.success(null) : chain;
     }
 
@@ -1022,6 +1151,18 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         return accumulator;
     }
 
+    private static boolean unsafelyTouchesRegainedRanges(Snapshot snapshot, 
ShardHolder shard, Unseekables<?> unseekables, long minEpoch)
+    {
+        if (shard.regainsRanges == null)
+            return false;
+
+        unseekables = unseekables.slice(shard.regainsRanges, Minimal);
+        if (unseekables.isEmpty())
+            return false;
+
+        return snapshot.previouslyOwned.overlaps(minEpoch, unseekables);
+    }
+
     /**
      * Initialize topology from snapshot on boot.
      */
@@ -1033,19 +1174,20 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         int maxId = -1;
         for (Map.Entry<Integer, RangesForEpoch> e : 
update.commandStores.entrySet())
         {
-            Invariants.require(e.getValue() != null);
-            EpochUpdateHolder epochUpdates = new EpochUpdateHolder();
-            ShardHolder shard = new ShardHolder(supplier.create(e.getKey(), 
epochUpdates), e.getValue());
+            RangesForEpoch rfe = e.getValue();
+            Invariants.require(rfe != null);
+            EpochUpdateHolder holder = new EpochUpdateHolder();
+            ShardHolder shard = new ShardHolder(supplier.create(e.getKey(), 
holder), rfe, update.previouslyOwned.regains(rfe.currentRanges()));
             // TODO (required): if the add is necessary (highly unlikely) it 
needs to be done once journal is writeable so we NEED to move this
             if (!shard.ranges.equals(shard.store.rangesForEpoch))
-                epochUpdates.add(1, e.getValue(), e.getValue().all());
+                holder.add(1, e.getValue(), rfe.all());
             maxId = Math.max(maxId, e.getKey());
             shards[i++] = shard;
         }
         Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id));
 
         nextId = maxId + 1;
-        loadSnapshot(new Snapshot(shards, 
update.global.forNode(supplier.node.id()).trim(), update.global));
+        loadSnapshot(new Snapshot(shards, 
update.global.forNode(supplier.node.id()).trim(), update.global, 
update.previouslyOwned));
     }
 
     public synchronized void resetTopology(Journal.TopologyUpdate update)
@@ -1057,11 +1199,11 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         for (Map.Entry<Integer, RangesForEpoch> e : 
update.commandStores.entrySet())
         {
             int storeId = e.getKey();
-            RangesForEpoch ranges = e.getValue();
-            Invariants.require(ranges != null);
-            ShardHolder shard = new ShardHolder(current.byId(storeId), ranges);
+            RangesForEpoch rfe = e.getValue();
+            Invariants.require(rfe != null);
+            ShardHolder shard = new ShardHolder(current.byId(storeId), rfe, 
update.previouslyOwned.regains(rfe.all()));
             EpochUpdateHolder holder = shard.store.epochUpdateHolder;
-            ranges.forEach(new BiConsumer<>()
+            rfe.forEach(new BiConsumer<>()
             {
                 RangesForEpoch accumulator = null;
                 Ranges prev = null;
@@ -1094,7 +1236,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         }
 
         nextId = maxId + 1;
-        loadSnapshot(new Snapshot(shards, current.local, current.global));
+        loadSnapshot(new Snapshot(shards, current.local, current.global, 
update.previouslyOwned));
     }
 
     public synchronized Supplier<EpochReady> updateTopology(Node node, 
Topology newTopology)
@@ -1126,6 +1268,26 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         shuttingDown = true;
     }
 
+    public synchronized void removeCommandStoresBefore(long beforeEpoch)
+    {
+        List<ShardHolder> keep = new ArrayList<>(current.shards.length);
+        for (ShardHolder shard : current)
+        {
+            RangesForEpoch ranges = shard.ranges;
+            int size = ranges.size();
+            long lastEpochWithOwnedRange = ranges.epochAtIndex(size - 1) - 1;
+            if (!ranges.rangesAtIndex(size - 1).isEmpty() || 
lastEpochWithOwnedRange >= beforeEpoch)
+                keep.add(shard);
+        }
+
+        if (keep.size() != current.shards.length)
+        {
+            Snapshot snapshot = new Snapshot(keep.toArray(new ShardHolder[0]), 
current().local, current.global, current.previouslyOwned);
+            journal.saveTopology(snapshot.asTopologyUpdate(), null);
+            current = snapshot;
+        }
+    }
+
     public void shutdown()
     {
         markShuttingDown();
@@ -1162,10 +1324,14 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         return all;
     }
 
+    @Nullable
     public CommandStore forId(int id)
     {
         Snapshot snapshot = current;
-        return snapshot.shards[snapshot.byId.get(id)].store;
+        int shardIndex = snapshot.byId.get(id);
+        if (shardIndex == -1)
+            return null;
+        return snapshot.shards[shardIndex].store;
     }
 
     public int[] ids()
@@ -1200,6 +1366,12 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         throw new IllegalArgumentException();
     }
 
+    @VisibleForTesting
+    public PreviouslyOwned getPreviouslyOwnedFromSnapshot()
+    {
+        return current.previouslyOwned;
+    }
+
     protected Snapshot current()
     {
         return current;
diff --git 
a/accord-core/src/main/java/accord/local/DeletedCommandStoresException.java 
b/accord-core/src/main/java/accord/local/DeletedCommandStoresException.java
new file mode 100644
index 00000000..cefdbd8e
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/DeletedCommandStoresException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.utils.Rethrowable;
+
+public class DeletedCommandStoresException extends RuntimeException implements Rethrowable<DeletedCommandStoresException>
+{
+    public DeletedCommandStoresException()
+    {
+    }
+
+    private DeletedCommandStoresException(Throwable cause)
+    {
+        super(cause);
+    }
+
+    @Override
+    public DeletedCommandStoresException rethrowable()
+    {
+        return new DeletedCommandStoresException(this);
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java b/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java
new file mode 100644
index 00000000..f1e358d6
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.utils.Rethrowable;
+
+public class OverlappingCommandStoresException extends RuntimeException implements Rethrowable<OverlappingCommandStoresException>
+{
+    public OverlappingCommandStoresException()
+    {
+    }
+
+    private OverlappingCommandStoresException(Throwable cause)
+    {
+        super(cause);
+    }
+
+    @Override
+    public OverlappingCommandStoresException rethrowable()
+    {
+        return new OverlappingCommandStoresException(this);
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 239c3ff1..c4de4314 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -33,6 +33,7 @@ import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
+import accord.local.CommandStores.RangesForEpoch;
 import accord.local.CommandStores.RangesForEpochSupplier;
 import accord.local.RedundantBefore.RedundantBeforeSupplier;
 import accord.local.cfk.CommandsForKey;
@@ -557,7 +558,7 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund
         if (safeCommand != null && safeCommand.current().known().route() != MaybeRoute)
             return;
 
-        CommandStores.RangesForEpoch rangesForEpoch = safeStore.ranges();
+        RangesForEpoch rangesForEpoch = safeStore.ranges();
         // TODO (required): this is incompatible with rebootstrap - we need to use some additional condition
         witnessedBy = witnessedBy.without(rangesForEpoch.coordinates(txnId));  // already coordinates, no need to replicate
         if (witnessedBy.isEmpty())
@@ -601,6 +602,11 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund
         commandStore().unsafeSetSafeToRead(newSafeToRead);
     }
 
+    public void setPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead)
+    {
+        commandStore().unsafeSetPermanentlyUnsafeToRead(newPermanentlyUnsafeToRead);
+    }
+
     public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
     {
         commandStore().unsafeSetRangesForEpoch(rangesForEpoch);
@@ -613,7 +619,7 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund
     public abstract Agent agent();
     public abstract ProgressLog progressLog();
     public abstract NodeCommandStoreService node();
-    public abstract CommandStores.RangesForEpoch ranges();
+    public abstract RangesForEpoch ranges();
 
     protected NavigableMap<TxnId, Ranges> bootstrapBeganAt()
     {
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpoch.java b/accord-core/src/main/java/accord/topology/ActiveEpoch.java
index 04333fca..2a39855f 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpoch.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpoch.java
@@ -44,9 +44,9 @@ public final class ActiveEpoch
 
     final QuorumTracker quorumReadyTracker;
     final SimpleBitSet shardQuorumReady, receivedNodeReady;
-    private Ranges quorumReady;
+    private volatile Ranges quorumReady;
 
-    private Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
+    private volatile Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
     private volatile boolean allRetired;
 
     public boolean allRetired()
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpochs.java b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
index a9fbd1b6..228c7f9b 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpochs.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
@@ -61,19 +61,34 @@ public final class ActiveEpochs implements Iterable<ActiveEpoch>
     // Epochs are sorted in _descending_ order
     final ActiveEpoch[] epochs;
 
-    ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long prevFirstNonEmptyEpoch)
+    private ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long firstNonEmptyEpoch)
     {
         this.manager = manager;
         this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
-        if (prevFirstNonEmptyEpoch != -1)
-            this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch;
-        else if (epochs.length > 0 && !epochs[0].all().isEmpty())
-            this.firstNonEmptyEpoch = currentEpoch;
-        else
-            this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch;
-
+        this.firstNonEmptyEpoch = firstNonEmptyEpoch;
+        this.epochs = epochs;
         for (int i = 1; i < epochs.length; i++)
             Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1);
+    }
+
+    static ActiveEpochs empty(TopologyManager manager)
+    {
+        return new ActiveEpochs(manager, new ActiveEpoch[0], -1);
+    }
+
+    ActiveEpochs withNewEpochs(ActiveEpoch[] epochs)
+    {
+        long firstNonEmptyEpoch = this.firstNonEmptyEpoch;
+        if (firstNonEmptyEpoch == -1 && epochs.length > 0 && !epochs[0].all().isEmpty())
+        {
+            Invariants.require(epochs.length == 1);
+            firstNonEmptyEpoch = epochs[0].epoch();
+        }
+        return new ActiveEpochs(manager, epochs, firstNonEmptyEpoch);
+    }
+
+    ActiveEpochs maybeTruncate()
+    {
         int truncateFrom = -1;
         // > 0 because we do not want to be left without epochs in case they're all empty
         for (int i = epochs.length - 1; i > 0; i--)
@@ -85,28 +100,26 @@ public final class ActiveEpochs implements Iterable<ActiveEpoch>
         }
 
         if (truncateFrom == -1)
+            return this;
+
+        ActiveEpoch[] newEpochs = Arrays.copyOf(epochs, truncateFrom);
+        for (int i = truncateFrom; i < epochs.length; i++)
         {
-            this.epochs = epochs;
+            ActiveEpoch e = epochs[i];
+            Invariants.require(epochs[i].isQuorumReady());
+            logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.all.ranges, e.closed());
         }
-        else
+        if (logger.isTraceEnabled())
         {
-            this.epochs = Arrays.copyOf(epochs, truncateFrom);
-            for (int i = truncateFrom; i < epochs.length; i++)
+            for (int i = 0; i < truncateFrom; i++)
             {
                 ActiveEpoch e = epochs[i];
-                Invariants.require(epochs[i].isQuorumReady());
-                logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.all.ranges, e.closed());
-            }
-            if (logger.isTraceEnabled())
-            {
-                for (int i = 0; i < truncateFrom; i++)
-                {
-                    ActiveEpoch e = epochs[i];
-                    Invariants.require(e.isQuorumReady());
-                    logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges);
-                }
+                Invariants.require(e.isQuorumReady());
+                logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges);
             }
         }
+
+        return withNewEpochs(newEpochs);
     }
 
     public boolean isEmpty()
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index 3f993bc7..0fd504fd 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -766,6 +766,23 @@ public class Topology
             forEach.accept(shards[i]);
     }
 
+    public static Map<Id, Ranges> computeNodeAdditions(Topology current, Topology next)
+    {
+        Map<Id, Ranges> additions = new HashMap<>();
+        for (Id node : next.nodes())
+        {
+            Ranges prev = current.rangesForNode(node);
+            if (prev == null) prev = Ranges.EMPTY;
+
+            Ranges added = next.rangesForNode(node).without(prev);
+            if (added.isEmpty())
+                continue;
+
+            additions.put(node, added);
+        }
+        return additions;
+    }
+
     public SortedArrayList<Id> nodes()
     {
         return nodes;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 263e64ba..0746104a 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -19,6 +19,7 @@
 package accord.topology;
 
 import java.util.IdentityHashMap;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -52,6 +53,9 @@ import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 import accord.utils.async.NestedAsyncResult;
 
+import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
+import static accord.primitives.Routables.Slice.Minimal;
+
 /**
  * Manages topology state changes and update bookkeeping
  *
@@ -99,7 +103,7 @@ public class TopologyManager
         this.time = time;
         this.timeouts = timeouts;
         this.topologyService = topologyService;
-        this.active = new ActiveEpochs(this, new ActiveEpoch[0], -1);
+        this.active = ActiveEpochs.empty(this);
         this.pending = new PendingEpochs(this);
     }
 
@@ -169,6 +173,7 @@ public class TopologyManager
 
     private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId)
     {
+        long removeCommandStoresBefore = 0;
         Topology topology = null;
         synchronized (this)
         {
@@ -193,28 +198,20 @@ public class TopologyManager
             if (epoch > active.currentEpoch)
                 ranges = pending.retired(ranges, epoch);
             ranges = active.retired(ranges, epoch);
+            ActiveEpochs truncated = active.maybeTruncate();
+            if (truncated != active)
+            {
+                active = truncated;
+                removeCommandStoresBefore = truncated.minEpoch();
+            }
         }
         if (!ranges.isEmpty())
         {
             for (TopologyListener listener : listeners)
                 listener.onEpochRetired(ranges, epoch, topology);
         }
-    }
-
-    public synchronized void truncateTopologiesUntil(long epoch)
-    {
-        ActiveEpochs current = active;
-        Invariants.requireArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch, current.epoch());
-
-        if (current.minEpoch() >= epoch)
-            return;
-
-        int newLen = current.epochs.length - (int) (epoch - current.minEpoch());
-        Invariants.require(current.epochs[newLen - 1].isQuorumReady(), "Epoch %d is not ready to coordinate", current.epochs[newLen - 1].epoch());
-
-        ActiveEpoch[] nextEpochs = new ActiveEpoch[newLen];
-        System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
-        active = new ActiveEpochs(this, nextEpochs, current.firstNonEmptyEpoch);
+        if (removeCommandStoresBefore > 0)
+            node.commandStores().removeCommandStoresBefore(removeCommandStoresBefore);
     }
 
     public TopologySorter.Supplier sorter()
@@ -308,6 +305,63 @@ public class TopologyManager
         updateActive();
     }
 
+    public static class RegainingEpochRange
+    {
+        public final long epoch;
+        public final Ranges ranges;
+
+        public RegainingEpochRange(long epoch, Ranges ranges)
+        {
+            this.epoch = epoch;
+            this.ranges = ranges;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        public Ranges ranges()
+        {
+            return ranges;
+        }
+    }
+
+    @Nullable
+    public RegainingEpochRange computeRegaining(Topology current, Topology next)
+    {
+        Map<Id, Ranges> additions = Topology.computeNodeAdditions(current, next);
+        long greatestEpoch = -1;
+        Ranges ranges = Ranges.EMPTY;
+
+        ActiveEpochs active = this.active;
+        for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+        {
+            Ranges addingForNode = entry.getValue();
+            for (ActiveEpoch e : active)
+            {
+                addingForNode = addingForNode.without(e.removedRanges).without(e.retired());
+                if (addingForNode.isEmpty())
+                    break;
+
+                Ranges existingForNode = e.all().rangesForNode(entry.getKey());
+                Ranges regainingForNode = addingForNode.slice(existingForNode, Minimal);
+                if (!regainingForNode.isEmpty())
+                {
+                    greatestEpoch = Math.max(greatestEpoch, e.epoch());
+                    ranges = ranges.union(MERGE_ADJACENT, regainingForNode);
+                    addingForNode = addingForNode.without(regainingForNode);
+                }
+                addingForNode = addingForNode.without(e.addedRanges);
+            }
+        }
+
+        if (greatestEpoch != -1)
+            return new RegainingEpochRange(greatestEpoch, ranges);
+
+        return null;
+    }
+
     private final AtomicBoolean updatingActive = new AtomicBoolean();
     private void updateActive()
     {
@@ -376,7 +430,7 @@ public class TopologyManager
                         }
                     }
 
-                    this.active = new ActiveEpochs(this, next, prev.firstNonEmptyEpoch);
+                    this.active = prev.withNewEpochs(next);
                     this.pending.removeFirst(topology.epoch);
                 }
 
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 22a29e5b..b80b9824 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -85,6 +85,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
 
     public void validateShardStateForTesting(Journal.TopologyUpdate lastUpdate)
     {
+        PreviouslyOwned previouslyOwned = lastUpdate.previouslyOwned;
         ShardHolder[] shards = new ShardHolder[lastUpdate.commandStores.size()];
         int i = 0;
         for (Map.Entry<Integer, RangesForEpoch> e : lastUpdate.commandStores.entrySet())
@@ -92,20 +93,26 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
             Snapshot current = current();
             RangesForEpoch ranges = e.getValue();
             DelayedCommandStore commandStore = null;
+            Ranges regainingRanges = null;
             for (ShardHolder shard : current)
             {
                 if (shard.ranges().equals(ranges))
+                {
+                    Invariants.require(commandStore == null);
                     commandStore = (DelayedCommandStore) shard.store;
+                    regainingRanges = shard.regainsRanges;
+                }
             }
             Invariants.nonNull(commandStore, "Each set of ranges should have a corresponding command store, but %d did not:(%s)",
                                ranges, Arrays.toString(shards))
                       .restore();
-            ShardHolder shard = new ShardHolder(commandStore, e.getValue());
+            Invariants.require(previouslyOwned.regains(ranges.currentRanges()).equals(regainingRanges));
+            ShardHolder shard = new ShardHolder(commandStore, ranges, previouslyOwned.regains(ranges.currentRanges()));
             shards[i++] = shard;
         }
         Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id()));
 
-        loadSnapshot(new Snapshot(shards, lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global));
+        loadSnapshot(new Snapshot(shards, lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global, lastUpdate.previouslyOwned));
     }
 
     protected void loadSnapshot(Snapshot nextSnapshot)
@@ -139,6 +146,12 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
                 RangesForEpoch loaded = next.unsafeGetRangesForEpoch();
                 Invariants.require(orig.equals(loaded), "%s should equal %s", loaded, orig);
             }
+
+            {
+                Ranges orig = prev.unsafeGetPermanentlyUnsafeToRead();
+                Ranges loaded = next.unsafeGetPermanentlyUnsafeToRead();
+                Invariants.require(orig.equals(loaded), "%s should equal %s", loaded, orig);
+            }
         }
 
         super.loadSnapshot(nextSnapshot);
@@ -258,8 +271,20 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
             }
         }
 
+        @Override
+        protected void loadPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead)
+        {
+            if (newPermanentlyUnsafeToRead == null) Invariants.require(super.permanentlyUnsafeToRead.isEmpty());
+            else
+            {
+                unsafeClearPermanentlyUnsafeToRead();
+                super.loadPermanentlyUnsafeToRead(newPermanentlyUnsafeToRead);
+            }
+        }
+
         public void restore()
         {
+            loadPermanentlyUnsafeToRead(journal.loadPermanentlyUnsafeToRead(id()));
             loadRangesForEpoch(journal.loadRangesForEpoch(id()));
             loadRedundantBefore(journal.loadRedundantBefore(id()));
             loadBootstrapBeganAt(journal.loadBootstrapBeganAt(id()));
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 8539fbe1..a4aa44f1 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -262,6 +262,11 @@ public class InMemoryJournal implements Journal
     @Override
     public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush)
     {
+        // Ensure that we only read the latest topologyUpdate as that is what happens in the
+        // C* implementation
+        int lastIndex = topologyUpdates.size() - 1;
+        if (!topologyUpdates.isEmpty() && topologyUpdates.get(lastIndex).global.equals(topologyUpdate.global))
+            topologyUpdates.remove(lastIndex);
         topologyUpdates.add(topologyUpdate);
         if (onFlush != null)
             onFlush.run();
@@ -315,6 +320,15 @@ public class InMemoryJournal implements Journal
         return fieldStates.newRangesForEpoch;
     }
 
+    @Override
+    public Ranges loadPermanentlyUnsafeToRead(int commandStoreId)
+    {
+        FieldUpdates fieldStates = this.fieldStates.get(commandStoreId);
+        if (fieldStates == null)
+            return null;
+        return fieldStates.newPermanentlyUnsafeToRead;
+    }
+
     @Override
     public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister()
     {
@@ -329,6 +343,7 @@ public class InMemoryJournal implements Journal
             init.newRedundantBefore = RedundantBefore.EMPTY;
             init.newBootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY);
             init.newSafeToRead = ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY);
+            init.newPermanentlyUnsafeToRead = Ranges.EMPTY;
             return init;
         });
 
@@ -340,6 +355,8 @@ public class InMemoryJournal implements Journal
             fieldStates.newBootstrapBeganAt = fieldUpdates.newBootstrapBeganAt;
         if (fieldUpdates.newRangesForEpoch != null)
             fieldStates.newRangesForEpoch = fieldUpdates.newRangesForEpoch;
+        if (fieldUpdates.newPermanentlyUnsafeToRead != null)
+            fieldStates.newPermanentlyUnsafeToRead = fieldUpdates.newPermanentlyUnsafeToRead;
 
         if (onFlush!= null)
             onFlush.run();
@@ -634,6 +651,10 @@ public class InMemoryJournal implements Journal
             Map<TxnId, List<Diff>> diffs = new TreeMap<>();
 
             InMemoryCommandStore commandStore = (InMemoryCommandStore) commandStores.forId(commandStoreId);
+
+            if (commandStore == null)
+                continue;
+
             Replayer replayer = commandStore.replayer(PART_NON_DURABLE);
 
             for (Map.Entry<TxnId, Diffs> e : diffEntry.getValue().entrySet())
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 3fd8669f..5563276f 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -165,6 +165,12 @@ public class LoggingJournal implements Journal
         return delegate.loadRangesForEpoch(commandStoreId);
     }
 
+    @Override
+    public Ranges loadPermanentlyUnsafeToRead(int store)
+    {
+        return delegate.loadPermanentlyUnsafeToRead(store);
+    }
+
     @Override
     public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister()
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index c0beeaae..182a0d8a 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -47,12 +47,7 @@ import accord.impl.InMemoryCommandStore.Snapshot;
 import accord.impl.basic.NodeSink;
 import accord.impl.basic.SimulatedFault;
 import accord.impl.mock.Network;
-import accord.local.CommandStore;
-import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.LogUnavailableException;
-import accord.local.SafeCommandStore;
-import accord.local.TimeService;
+import accord.local.*;
 import accord.messages.MessageType;
 import accord.primitives.Ballot;
 import accord.primitives.Keys;
@@ -173,7 +168,7 @@ public class ListAgent implements InMemoryAgent, CoordinatorEventListener, Owner
         ownershipEventListener.onFailedBootstrap(attempt, phase, ranges, retry, fail, failure);
     }
 
-    private static final Set<Class<?>> expectedExceptions = new HashSet<>(Arrays.asList(SimulatedFault.class, ExecuteSyncPoint.SyncPointErased.class, CancellationException.class, TopologyRetiredException.class, TopologyMismatch.class, Snapshotter.SnapshotAborted.class, TimeoutException.class, LogUnavailableException.class));
+    private static final Set<Class<?>> expectedExceptions = new HashSet<>(Arrays.asList(SimulatedFault.class, ExecuteSyncPoint.SyncPointErased.class, CancellationException.class, TopologyRetiredException.class, TopologyMismatch.class, Snapshotter.SnapshotAborted.class, TimeoutException.class, LogUnavailableException.class, OverlappingCommandStoresException.class, DeletedCommandStoresException.class));
     @Override
     public void onException(Throwable t)
     {
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index 8a52495d..6189f788 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -283,13 +283,6 @@ public class TopologyManagerTest
         Assertions.assertTrue(service.active().hasEpoch(2));
         Assertions.assertTrue(service.active().hasEpoch(3));
         Assertions.assertTrue(service.active().hasEpoch(4));
-
-        service.truncateTopologiesUntil(3);
-        Assertions.assertFalse(service.active().hasEpoch(1));
-        Assertions.assertFalse(service.active().hasEpoch(2));
-        Assertions.assertTrue(service.active().hasEpoch(3));
-        Assertions.assertTrue(service.active().hasEpoch(4));
-
     }
 
     @Test
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index a8f8490a..c778e707 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -413,33 +413,37 @@ public class TopologyRandomizer
         return ((PrefixedIntHashKey) shard.range.start()).prefix;
     }
 
-    private static Map<Id, Ranges> getAdditions(Topology current, Topology next)
+    private boolean validToReassignRange(Topology current, Shard[] nextShards, Map<Id, Ranges> previouslyReplicated)
     {
-        Map<Id, Ranges> additions = new HashMap<>();
-        for (Id node : next.nodes())
-        {
-            Ranges prev = current.rangesForNode(node);
-            if (prev == null) prev = Ranges.EMPTY;
-
-            Ranges added = next.rangesForNode(node).without(prev);
-            if (added.isEmpty())
-                continue;
+        Topology next = new Topology(current.epoch + 1, nextShards);
+        Map<Id, Ranges> additions = Topology.computeNodeAdditions(current, next);
 
-            additions.put(node, added);
+        for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+        {
+            if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue())
+                    && !(previousEpochForRegainedRangeRetired(current, entry.getValue())))
+                return false;
         }
-        return additions;
+
+        return true;
     }
 
-    private static boolean reassignsRanges(Topology current, Shard[] nextShards, Map<Id, Ranges> previouslyReplicated)
+    private boolean previousEpochForRegainedRangeRetired(Topology current, Ranges regainingRanges)
     {
-        Topology next = new Topology(current.epoch + 1, nextShards);
-        Map<Id, Ranges> additions = getAdditions(current, next);
-
-        for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+        for (Id id : current.nodes())
         {
-            if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue()))
+            Node node = this.nodeLookup.apply(id);
+            boolean isRetiredForNode = true;
+            for (ActiveEpoch epoch : node.topology().active())
+            {
+                if (epoch.all().ranges.intersects(regainingRanges) && !epoch.allRetired())
+                    isRetiredForNode = false;
+            }
+
+            if (isRetiredForNode)
                 return true;
         }
+
         return false;
     }
 
@@ -472,7 +476,7 @@ public class TopologyRandomizer
             Shard[] testShards = type.apply(state, random);
             Arrays.sort(testShards, (a, b) -> a.range.compareTo(b.range));
             if (!everyShardHasQuorumOverlaps(oldShards, testShards)
-                || reassignsRanges(current, testShards, previouslyReplicated))
+                || !validToReassignRange(current, testShards, previouslyReplicated))
             {
                 ++rejectedMutations;
             }
@@ -488,7 +492,7 @@ public class TopologyRandomizer
 
         Topology nextTopology = new Topology(current.epoch + 1, newShards);
 
-        Map<Id, Ranges> nextAdditions = getAdditions(current, nextTopology);
+        Map<Id, Ranges> nextAdditions = Topology.computeNodeAdditions(current, nextTopology);
         for (Map.Entry<Id, Ranges> entry : nextAdditions.entrySet())
         {
             previouslyReplicated.merge(entry.getKey(), entry.getValue(), (a, b) -> a.union(MERGE_ADJACENT, b));
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 3842438d..51b9b69a 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -403,6 +403,7 @@ public class Cluster implements Scheduler
         @Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store) { throw new IllegalStateException("Not impelemented"); }
         @Override public NavigableMap<Timestamp, Ranges> loadSafeToRead(int store) { throw new IllegalStateException("Not impelemented"); }
         @Override public CommandStores.RangesForEpoch loadRangesForEpoch(int store) { throw new IllegalStateException("Not impelemented"); }
+        @Override public Ranges loadPermanentlyUnsafeToRead(int store) { throw new IllegalStateException("Not implemented"); }
         @Override public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister() { throw new IllegalStateException("Not impelemented"); }
         @Override public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush)  { throw new IllegalStateException("Not impelemented"); }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to