This is an automated email from the ASF dual-hosted git repository.
belliottsmith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0a10cd05 Safely regain ranges and delete retired command stores
0a10cd05 is described below
commit 0a10cd056794c05588114f45ce86d49d6d6538db
Author: Alan Wang <[email protected]>
AuthorDate: Thu Jun 4 14:13:05 2026 -0700
Safely regain ranges and delete retired command stores
patch by Alan Wang; reviewed by Benedict for CASSANDRA-21212
---
accord-core/src/main/java/accord/api/Journal.java | 17 +-
.../main/java/accord/coordinate/MaybeRecover.java | 1 -
.../java/accord/impl/AbstractSafeCommandStore.java | 9 +
.../src/main/java/accord/local/CommandStore.java | 35 +++-
.../src/main/java/accord/local/CommandStores.java | 228 ++++++++++++++++++---
.../local/DeletedCommandStoresException.java | 39 ++++
.../local/OverlappingCommandStoresException.java | 39 ++++
.../main/java/accord/local/SafeCommandStore.java | 10 +-
.../src/main/java/accord/topology/ActiveEpoch.java | 4 +-
.../main/java/accord/topology/ActiveEpochs.java | 59 +++---
.../src/main/java/accord/topology/Topology.java | 17 ++
.../main/java/accord/topology/TopologyManager.java | 90 ++++++--
.../accord/impl/basic/DelayedCommandStores.java | 29 ++-
.../java/accord/impl/basic/InMemoryJournal.java | 21 ++
.../java/accord/impl/basic/LoggingJournal.java | 6 +
.../src/test/java/accord/impl/list/ListAgent.java | 9 +-
.../java/accord/topology/TopologyManagerTest.java | 7 -
.../java/accord/topology/TopologyRandomizer.java | 44 ++--
.../src/main/java/accord/maelstrom/Cluster.java | 1 +
19 files changed, 550 insertions(+), 115 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Journal.java
b/accord-core/src/main/java/accord/api/Journal.java
index 8ba7db18..883c53f8 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import accord.impl.CommandChange;
import accord.local.Command;
import accord.local.CommandStores;
+import accord.local.CommandStores.PreviouslyOwned;
import accord.local.DurableBefore;
import accord.local.MinimalCommand;
import accord.local.Node;
@@ -64,12 +65,14 @@ public interface Journal
void saveCommand(int store, CommandUpdate value, Runnable onFlush);
List<? extends TopologyUpdate> loadTopologies();
+ // TODO (required): saveTopology should be synchronous, or we should at
least await durability before updating the in-memory topology
void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);
RedundantBefore loadRedundantBefore(int store);
NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store);
NavigableMap<Timestamp, Ranges> loadSafeToRead(int store);
CommandStores.RangesForEpoch loadRangesForEpoch(int store);
+ Ranges loadPermanentlyUnsafeToRead(int store);
void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush);
Persister<DurableBefore, DurableBefore> durableBeforePersister();
@@ -86,16 +89,18 @@ public interface Journal
{
public final Int2ObjectHashMap<CommandStores.RangesForEpoch>
commandStores;
public final Topology global;
+ public final PreviouslyOwned previouslyOwned;
- public TopologyUpdate(@Nonnull
Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores, @Nonnull
Topology global)
+ public TopologyUpdate(@Nonnull
Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores, @Nonnull
Topology global, PreviouslyOwned previouslyOwned)
{
this.commandStores = commandStores;
this.global = global;
+ this.previouslyOwned = previouslyOwned;
}
public boolean isEquivalent(TopologyUpdate other)
{
- boolean equivalent = global.isEquivalent(other.global);
+ boolean equivalent = global.isEquivalent(other.global) &&
Objects.equals(previouslyOwned, other.previouslyOwned);
if (!equivalent)
return false;
Invariants.require(commandStores.equals(other.commandStores));
@@ -104,7 +109,7 @@ public interface Journal
public TopologyUpdate cloneWithEquivalentEpoch(long epoch)
{
- return new TopologyUpdate(commandStores,
global.cloneEquivalentWithEpoch(epoch));
+ return new TopologyUpdate(commandStores,
global.cloneEquivalentWithEpoch(epoch), previouslyOwned);
}
@Override
@@ -113,7 +118,8 @@ public interface Journal
if (this == object) return true;
if (object == null || getClass() != object.getClass()) return
false;
TopologyUpdate update = (TopologyUpdate) object;
- return Objects.equals(commandStores, update.commandStores) &&
Objects.equals(global, update.global);
+ return Objects.equals(commandStores, update.commandStores) &&
Objects.equals(global, update.global)
+ && Objects.equals(previouslyOwned, update.previouslyOwned);
}
@Override
@@ -151,6 +157,7 @@ public interface Journal
public RedundantBefore newRedundantBefore;
public NavigableMap<TxnId, Ranges> newBootstrapBeganAt;
public NavigableMap<Timestamp, Ranges> newSafeToRead;
+ public Ranges newPermanentlyUnsafeToRead;
public CommandStores.RangesForEpoch newRangesForEpoch;
public String toString()
@@ -162,6 +169,8 @@ public interface Journal
builder.append("newBootstrapBeganAt=").append(newBootstrapBeganAt).append(", ");
if (newSafeToRead != null)
builder.append("newSafeToRead=").append(newSafeToRead).append(", ");
+ if (newPermanentlyUnsafeToRead != null)
+
builder.append("newPermanentlyUnsafeToRead=").append(newPermanentlyUnsafeToRead).append(",
");
if (newRangesForEpoch != null)
builder.append("newRangesForEpoch=").append(newRangesForEpoch).append(", ");
builder.setLength(builder.length() - 2);
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index bd6cb3fd..e5b49dd5 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -55,7 +55,6 @@ public class MaybeRecover extends CheckShards<Outcome,
Route<?>>
this.recoverIfAlreadyDurable = recoverIfAlreadyDurable;
this.reportTo = reportTo;
}
-
public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf
invalidIf, Route<?> someRoute, ProgressToken prevProgress, boolean
recoverIfAlreadyDurable, LatentStoreSelector reportTo, BiConsumer<? super
Outcome, Throwable> callback)
{
MaybeRecover maybeRecover;
diff --git
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index 24d52839..7d3c7fd6 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -214,6 +214,9 @@ extends SafeCommandStore
if (fieldUpdates.newRangesForEpoch != null)
super.setRangesForEpoch(fieldUpdates.newRangesForEpoch);
+ if (fieldUpdates.newPermanentlyUnsafeToRead != null)
+
super.setPermanentlyUnsafeToRead(fieldUpdates.newPermanentlyUnsafeToRead);
+
fieldUpdates = null;
}
@@ -243,6 +246,12 @@ extends SafeCommandStore
ensureFieldUpdates().newSafeToRead = newSafeToRead;
}
+ @Override
+ public final void setPermanentlyUnsafeToRead(Ranges
newPermanentlyUnsafeToRead)
+ {
+ ensureFieldUpdates().newPermanentlyUnsafeToRead =
newPermanentlyUnsafeToRead;
+ }
+
@Override
public void setRangesForEpoch(RangesForEpoch rangesForEpoch)
{
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index 6cb67370..d6a99a42 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -188,6 +188,7 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
* But they may still be ordered for other key ranges they participate in.
*/
private NavigableMap<Timestamp, Ranges> safeToRead = emptySafeToRead();
+ protected Ranges permanentlyUnsafeToRead = Ranges.EMPTY;
private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new
DeterministicIdentitySet<>());
static class WaitingOnVisibility
@@ -248,6 +249,7 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
maxConflicts = MaxConflicts.EMPTY;
maxDecidedRX = MaxDecidedRX.EMPTY;
safeToRead = emptySafeToRead();
+ permanentlyUnsafeToRead = Ranges.EMPTY;
listeners.clear();
waitingOnVisibility.values().forEach(w -> w.invalid = true);
waitingOnVisibility.clear();
@@ -286,6 +288,11 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
return rangesForEpoch;
}
+ public Ranges unsafeGetPermanentlyUnsafeToRead()
+ {
+ return permanentlyUnsafeToRead;
+ }
+
public MaxDecidedRX unsafeGetMaxDecidedRX()
{
return maxDecidedRX;
@@ -308,6 +315,17 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
unsafeSetRangesForEpoch(newRangesForEpoch);
}
+ protected final void unsafeClearPermanentlyUnsafeToRead()
+ {
+ permanentlyUnsafeToRead = null;
+ }
+
+ protected void loadPermanentlyUnsafeToRead(Ranges
newPermanentlyUnsafeToRead)
+ {
+ Invariants.require(this.permanentlyUnsafeToRead == null);
+ unsafeSetPermanentlyUnsafeToRead(newPermanentlyUnsafeToRead);
+ }
+
public abstract boolean inStore();
public boolean tryExecuteImmediately(Runnable run)
@@ -406,12 +424,19 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
/**
* This method may be invoked on a non-CommandStore thread
*/
- final void unsafeSetSafeToRead(NavigableMap<Timestamp, Ranges>
newSafeToRead)
+ final void unsafeSetSafeToRead(@Nullable NavigableMap<Timestamp, Ranges>
newSafeToRead)
{
+ if (newSafeToRead != null)
+ newSafeToRead = purgeHistory(newSafeToRead,
permanentlyUnsafeToRead);
this.safeToRead = newSafeToRead;
node.updateStamp();
}
+ final void unsafeSetPermanentlyUnsafeToRead(Ranges
newPermanentlyUnsafeToRead)
+ {
+ this.permanentlyUnsafeToRead = newPermanentlyUnsafeToRead;
+ }
+
protected final void unsafeClearSafeToRead()
{
unsafeSetSafeToRead(null);
@@ -1227,6 +1252,14 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
}
}
+ final AsyncChain<Void> markPermanentlyUnsafeToRead(Ranges ranges)
+ {
+ return chain((Empty) () -> "Mark Range As Permanently Unsafe To Read",
safeStore -> {
+ safeStore.setSafeToRead(purgeHistory(safeToRead, ranges));
+
safeStore.setPermanentlyUnsafeToRead(permanentlyUnsafeToRead.union(MERGE_ADJACENT,
ranges));
+ });
+ }
+
public final DataStore unsafeGetDataStore()
{
return dataStore;
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java
b/accord-core/src/main/java/accord/local/CommandStores.java
index 69c05478..dd2e353d 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -24,7 +24,6 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -80,6 +79,7 @@ import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectHashMap;
+import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
import static accord.topology.EpochReady.done;
import static accord.api.DataStore.FetchKind.Sync;
import static
accord.local.CommandStores.BootstrapRangeAction.BOOTSTRAP_NOT_NEEDED;
@@ -93,7 +93,6 @@ import static java.util.stream.Collectors.toList;
*/
public abstract class CommandStores implements AsyncExecutorFactory
{
- @SuppressWarnings("unused")
private static final Logger logger =
LoggerFactory.getLogger(CommandStores.class);
public interface LatentStoreSelector
@@ -127,6 +126,8 @@ public abstract class CommandStores implements
AsyncExecutorFactory
* return null if we do not intersect this shard on the specified
epochs
*/
Ranges ranges(ShardHolder shard);
+
+ default @Nullable long minEpoch() { return -1L; };
}
public interface UnrestrictedStoreSelector extends StoreSelector
@@ -146,11 +147,13 @@ public abstract class CommandStores implements
AsyncExecutorFactory
this.maxEpoch = maxEpoch;
}
+ @Override
public StoreSelection select(Snapshot snapshot)
{
return StoreFinder.find(snapshot, keysOrRanges);
}
+ @Override
public @Nullable Ranges ranges(ShardHolder shard)
{
Ranges ranges = shard.ranges().allBetween(minEpoch, maxEpoch);
@@ -158,6 +161,12 @@ public abstract class CommandStores implements
AsyncExecutorFactory
return null;
return ranges;
}
+
+ @Override
+ public long minEpoch()
+ {
+ return minEpoch;
+ }
}
public static class IncludingSpecificStoreSelector implements
LatentStoreSelector
@@ -354,24 +363,22 @@ public abstract class CommandStores implements
AsyncExecutorFactory
public static class ShardHolder
{
public final CommandStore store;
+ public @Nullable final Ranges regainsRanges;
RangesForEpoch ranges;
- ShardHolder(CommandStore store)
+ ShardHolder(CommandStore store, @Nullable Ranges regainsRanges)
{
this.store = store;
+ this.regainsRanges = regainsRanges;
}
- public ShardHolder(CommandStore store, RangesForEpoch ranges)
+ public ShardHolder(CommandStore store, RangesForEpoch ranges,
@Nullable Ranges regainsRanges)
{
this.store = store;
+ this.regainsRanges = regainsRanges;
this.ranges = ranges;
}
- public ShardHolder withStoreUnsafe(CommandStore store)
- {
- return new ShardHolder(store, ranges);
- }
-
public RangesForEpoch ranges()
{
return ranges;
@@ -388,12 +395,109 @@ public abstract class CommandStores implements
AsyncExecutorFactory
RangesForEpoch ranges();
}
+ public static final class PreviouslyOwned
+ {
+ public static final PreviouslyOwned EMPTY = new PreviouslyOwned(0,
RangesForEpoch.EMPTY.epochs, RangesForEpoch.EMPTY.ranges);
+ final long maxEpoch;
+ final long[] epochs; // the epoch upon which it was last owned
+ final Ranges[] ranges;
+
+ public PreviouslyOwned(long maxEpoch, long[] epochs, Ranges[] ranges)
+ {
+ this.maxEpoch = maxEpoch;
+ this.epochs = epochs;
+ this.ranges = ranges;
+ }
+
+ PreviouslyOwned prepend(long epoch, Ranges ranges)
+ {
+ Invariants.require(epochs.length == 0 || epoch > epochs[0]);
+ long[] newEpochs = new long[this.epochs.length + 1];
+ Ranges[] newRanges = new Ranges[this.epochs.length + 1];
+ newEpochs[0] = epoch;
+ newRanges[0] = ranges;
+ System.arraycopy(this.epochs, 0, newEpochs, 1, this.epochs.length);
+ System.arraycopy(this.ranges, 0, newRanges, 1, this.ranges.length);
+ return new PreviouslyOwned(epoch, newEpochs, newRanges);
+ }
+
+ public boolean overlaps(long epoch, Unseekables<?> test)
+ {
+ if (epoch > maxEpoch)
+ return false;
+
+ for (int i = 0 ; i < epochs.length && epoch <= epochs[i] ; ++i)
+ {
+ if (this.ranges[i].intersects(test))
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean overlapsDeletedCommandStore(Ranges shardRanges, long
epoch, Unseekables<?> test)
+ {
+ if (epoch > maxEpoch)
+ return false;
+
+ for (int i = 0 ; i < epochs.length && epoch <= epochs[i] ; ++i)
+ {
+ if (this.ranges[i].intersects(test) &&
!shardRanges.containsAll(this.ranges[i].overlapping(test)))
+ return true;
+ }
+
+ return false;
+ }
+
+ public Ranges regains(Ranges overlapping)
+ {
+ Ranges regains = Ranges.EMPTY;
+ for (Ranges rs : this.ranges)
+ regains = regains.with(rs.slice(overlapping, Minimal));
+ return regains;
+ }
+
+ public int size()
+ {
+ return epochs.length;
+ }
+
+ public long epochs(int i)
+ {
+ return epochs[i];
+ }
+
+ public Ranges ranges(int i)
+ {
+ return ranges[i];
+ }
+
+ @Override
+ public boolean equals(Object that)
+ {
+ return that instanceof PreviouslyOwned && equals((PreviouslyOwned)
that);
+ }
+
+ public boolean equals(PreviouslyOwned that)
+ {
+ return Arrays.equals(this.ranges, that.ranges)
+ && Arrays.equals(this.epochs, that.epochs);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
// We ONLY remove ranges to keep logic manageable; likely to only merge
CommandStores into a new CommandStore via some kind of Bootstrap
public static class RangesForEpoch
{
+ public static final RangesForEpoch EMPTY = new RangesForEpoch(new
long[0], new Ranges[0]);
+
final long[] epochs;
final Ranges[] ranges;
- public static final RangesForEpoch EMPTY = new RangesForEpoch(new
long[0], new Ranges[0]);
public RangesForEpoch(long epoch, Ranges ranges)
{
@@ -425,7 +529,7 @@ public abstract class CommandStores implements
AsyncExecutorFactory
if (this == object) return true;
if (object == null || getClass() != object.getClass()) return
false;
RangesForEpoch that = (RangesForEpoch) object;
- return Objects.deepEquals(epochs, that.epochs) &&
Objects.deepEquals(ranges, that.ranges);
+ return Arrays.equals(epochs, that.epochs) && Arrays.equals(ranges,
that.ranges);
}
@Override
@@ -632,15 +736,17 @@ public abstract class CommandStores implements
AsyncExecutorFactory
final Int2IntHashMap byId;
private final int[] indexForRange;
final SearchableRangeList lookupByRange;
+ final Ranges shardRanges;
- public Snapshot(ShardHolder[] shards, Topology local, Topology global)
+ public Snapshot(ShardHolder[] shards, Topology local, Topology global,
PreviouslyOwned previouslyOwned)
{
- super(asMap(shards), global);
+ super(asMap(shards), global, previouslyOwned);
this.local = local;
this.shards = shards;
this.byId = new Int2IntHashMap(shards.length,
Hashing.DEFAULT_LOAD_FACTOR, -1);
int count = 0;
int prevId = -1;
+ Ranges shardRanges = Ranges.EMPTY;
for (int i = 0 ; i < shards.length ; ++i)
{
ShardHolder shard = shards[i];
@@ -649,7 +755,9 @@ public abstract class CommandStores implements
AsyncExecutorFactory
byId.put(id, i);
count += shard.ranges.all().size();
prevId = id;
+ shardRanges = shardRanges.union(MERGE_ADJACENT,
shard.ranges.all());
}
+ this.shardRanges = shardRanges;
class RangeAndIndex
{
final Range range;
@@ -685,7 +793,7 @@ public abstract class CommandStores implements
AsyncExecutorFactory
// This method exists to ensure we do not hold references to command
stores
public Journal.TopologyUpdate asTopologyUpdate()
{
- return new Journal.TopologyUpdate(commandStores, global);
+ return new Journal.TopologyUpdate(commandStores, global,
previouslyOwned);
}
private static Int2ObjectHashMap<CommandStores.RangesForEpoch>
asMap(ShardHolder[] shards)
@@ -725,7 +833,7 @@ public abstract class CommandStores implements
AsyncExecutorFactory
this.supplier = supplier;
this.shardDistributor = shardDistributor;
- this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY,
Topology.EMPTY);
+ this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY,
Topology.EMPTY, PreviouslyOwned.EMPTY);
this.journal = journal;
}
@@ -809,6 +917,8 @@ public abstract class CommandStores implements
AsyncExecutorFactory
List<Supplier<EpochReady>> bootstrapUpdates = new ArrayList<>();
List<ShardHolder> result = new ArrayList<>(prev.shards.length +
added.size());
+ PreviouslyOwned previouslyOwned = prev.previouslyOwned;
+
for (ShardHolder shard : prev.shards)
{
Ranges current = shard.ranges().currentRanges();
@@ -818,8 +928,14 @@ public abstract class CommandStores implements
AsyncExecutorFactory
// TODO (required): This is updating a non-volatile field
in the previous Snapshot, why modify it at all, even with volatile the
guaranteed visibility is weak even with mutual exclusion
shard.ranges = shard.ranges().withRanges(newTopology.epoch(),
current.without(subtracted));
shard.store.epochUpdateHolder.remove(epoch, shard.ranges,
removeRanges);
+
bootstrapUpdates.add(shard.store.unbootstrap(epoch,
removeRanges));
}
+
+ Ranges regainedRanges = shard.ranges().all().slice(added, Minimal);
+ if (!regainedRanges.isEmpty())
+ bootstrapUpdates.add(() -> EpochReady.all(epoch,
shard.store.markPermanentlyUnsafeToRead(regainedRanges).beginAsResult()));
+
// TODO (desired): only sync affected shards
Ranges ranges = shard.ranges().currentRanges();
// ranges can be empty when ranges are lost or consolidated across
epochs.
@@ -828,6 +944,7 @@ public abstract class CommandStores implements
AsyncExecutorFactory
logger.debug("Epoch {} requires visibility sync for {}",
epoch, ranges);
bootstrapUpdates.add(shard.store.refreshReadyToCoordinate(node, ranges, epoch));
}
+
result.add(shard);
}
@@ -839,7 +956,7 @@ public abstract class CommandStores implements
AsyncExecutorFactory
EpochUpdateHolder updateHolder = new EpochUpdateHolder();
RangesForEpoch rangesForEpoch = new RangesForEpoch(epoch,
addRanges);
updateHolder.add(epoch, rangesForEpoch, addRanges);
- ShardHolder shard = new ShardHolder(supplier.create(nextId++,
updateHolder));
+ ShardHolder shard = new ShardHolder(supplier.create(nextId++,
updateHolder), previouslyOwned.regains(addRanges));
shard.ranges = rangesForEpoch;
Map<BootstrapRangeAction, Ranges> partitioned =
addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global,
newLocalTopology, range), BootstrapRangeAction.class);
@@ -873,7 +990,11 @@ public abstract class CommandStores implements
AsyncExecutorFactory
);
};
}
- return new TopologyUpdate(new Snapshot(result.toArray(new
ShardHolder[0]), newLocalTopology, newTopology), bootstrap);
+
+ if (!subtracted.isEmpty())
+ previouslyOwned = previouslyOwned.prepend(epoch - 1, subtracted);
+
+ return new TopologyUpdate(new Snapshot(result.toArray(new
ShardHolder[0]), newLocalTopology, newTopology, previouslyOwned), bootstrap);
}
private static boolean requiresSync(Ranges ranges, Topology oldTopology,
Topology newTopology)
@@ -989,6 +1110,7 @@ public abstract class CommandStores implements
AsyncExecutorFactory
{
Snapshot snapshot = current;
StoreSelection selection = selector.select(snapshot);
+ long minEpoch = selector.minEpoch();
AsyncChain<O> chain = null;
for (int i = selection.firstSetBit(); i >= 0 ; i =
selection.nextSetBit(i + 1, -1))
{
@@ -997,10 +1119,17 @@ public abstract class CommandStores implements
AsyncExecutorFactory
if (ranges == null)
continue;
+ if (minEpoch >= 0 && unsafelyTouchesRegainedRanges(snapshot,
shard, mapReduceConsume.scope, minEpoch))
+ return AsyncChains.failure(new
OverlappingCommandStoresException());
+
AsyncChain<O> next = mapReduceConsume.applyAsync(ranges,
shard.store);
if (next != null)
chain = chain != null ? AsyncChains.reduce(chain, next,
mapReduceConsume) : next;
}
+
+ if (minEpoch >= 0 &&
snapshot.previouslyOwned.overlapsDeletedCommandStore(snapshot.shardRanges,
minEpoch, mapReduceConsume.scope))
+ return AsyncChains.failure(new DeletedCommandStoresException());
+
return chain == null ? AsyncChains.success(null) : chain;
}
@@ -1022,6 +1151,18 @@ public abstract class CommandStores implements
AsyncExecutorFactory
return accumulator;
}
+ private static boolean unsafelyTouchesRegainedRanges(Snapshot snapshot,
ShardHolder shard, Unseekables<?> unseekables, long minEpoch)
+ {
+ if (shard.regainsRanges == null)
+ return false;
+
+ unseekables = unseekables.slice(shard.regainsRanges, Minimal);
+ if (unseekables.isEmpty())
+ return false;
+
+ return snapshot.previouslyOwned.overlaps(minEpoch, unseekables);
+ }
+
/**
* Initialize topology from snapshot on boot.
*/
@@ -1033,19 +1174,20 @@ public abstract class CommandStores implements
AsyncExecutorFactory
int maxId = -1;
for (Map.Entry<Integer, RangesForEpoch> e :
update.commandStores.entrySet())
{
- Invariants.require(e.getValue() != null);
- EpochUpdateHolder epochUpdates = new EpochUpdateHolder();
- ShardHolder shard = new ShardHolder(supplier.create(e.getKey(),
epochUpdates), e.getValue());
+ RangesForEpoch rfe = e.getValue();
+ Invariants.require(rfe != null);
+ EpochUpdateHolder holder = new EpochUpdateHolder();
+ ShardHolder shard = new ShardHolder(supplier.create(e.getKey(),
holder), rfe, update.previouslyOwned.regains(rfe.currentRanges()));
// TODO (required): if the add is necessary (highly unlikely) it
needs to be done once journal is writeable so we NEED to move this
if (!shard.ranges.equals(shard.store.rangesForEpoch))
- epochUpdates.add(1, e.getValue(), e.getValue().all());
+ holder.add(1, e.getValue(), rfe.all());
maxId = Math.max(maxId, e.getKey());
shards[i++] = shard;
}
Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id));
nextId = maxId + 1;
- loadSnapshot(new Snapshot(shards,
update.global.forNode(supplier.node.id()).trim(), update.global));
+ loadSnapshot(new Snapshot(shards,
update.global.forNode(supplier.node.id()).trim(), update.global,
update.previouslyOwned));
}
public synchronized void resetTopology(Journal.TopologyUpdate update)
@@ -1057,11 +1199,11 @@ public abstract class CommandStores implements
AsyncExecutorFactory
for (Map.Entry<Integer, RangesForEpoch> e :
update.commandStores.entrySet())
{
int storeId = e.getKey();
- RangesForEpoch ranges = e.getValue();
- Invariants.require(ranges != null);
- ShardHolder shard = new ShardHolder(current.byId(storeId), ranges);
+ RangesForEpoch rfe = e.getValue();
+ Invariants.require(rfe != null);
+ ShardHolder shard = new ShardHolder(current.byId(storeId), rfe,
update.previouslyOwned.regains(rfe.all()));
EpochUpdateHolder holder = shard.store.epochUpdateHolder;
- ranges.forEach(new BiConsumer<>()
+ rfe.forEach(new BiConsumer<>()
{
RangesForEpoch accumulator = null;
Ranges prev = null;
@@ -1094,7 +1236,7 @@ public abstract class CommandStores implements
AsyncExecutorFactory
}
nextId = maxId + 1;
- loadSnapshot(new Snapshot(shards, current.local, current.global));
+ loadSnapshot(new Snapshot(shards, current.local, current.global,
update.previouslyOwned));
}
public synchronized Supplier<EpochReady> updateTopology(Node node,
Topology newTopology)
@@ -1126,6 +1268,26 @@ public abstract class CommandStores implements
AsyncExecutorFactory
shuttingDown = true;
}
+ public synchronized void removeCommandStoresBefore(long beforeEpoch)
+ {
+ List<ShardHolder> keep = new ArrayList<>(current.shards.length);
+ for (ShardHolder shard : current)
+ {
+ RangesForEpoch ranges = shard.ranges;
+ int size = ranges.size();
+ long lastEpochWithOwnedRange = ranges.epochAtIndex(size - 1) - 1;
+ if (!ranges.rangesAtIndex(size - 1).isEmpty() ||
lastEpochWithOwnedRange >= beforeEpoch)
+ keep.add(shard);
+ }
+
+ if (keep.size() != current.shards.length)
+ {
+ Snapshot snapshot = new Snapshot(keep.toArray(new ShardHolder[0]),
current().local, current.global, current.previouslyOwned);
+ journal.saveTopology(snapshot.asTopologyUpdate(), null);
+ current = snapshot;
+ }
+ }
+
public void shutdown()
{
markShuttingDown();
@@ -1162,10 +1324,14 @@ public abstract class CommandStores implements
AsyncExecutorFactory
return all;
}
+ @Nullable
public CommandStore forId(int id)
{
Snapshot snapshot = current;
- return snapshot.shards[snapshot.byId.get(id)].store;
+ int shardIndex = snapshot.byId.get(id);
+ if (shardIndex == -1)
+ return null;
+ return snapshot.shards[shardIndex].store;
}
public int[] ids()
@@ -1200,6 +1366,12 @@ public abstract class CommandStores implements
AsyncExecutorFactory
throw new IllegalArgumentException();
}
+ @VisibleForTesting
+ public PreviouslyOwned getPreviouslyOwnedFromSnapshot()
+ {
+ return current.previouslyOwned;
+ }
+
protected Snapshot current()
{
return current;
diff --git
a/accord-core/src/main/java/accord/local/DeletedCommandStoresException.java
b/accord-core/src/main/java/accord/local/DeletedCommandStoresException.java
new file mode 100644
index 00000000..cefdbd8e
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/DeletedCommandStoresException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.utils.Rethrowable;
+
+public class DeletedCommandStoresException extends RuntimeException implements
Rethrowable<DeletedCommandStoresException>
+{
+ public DeletedCommandStoresException()
+ {
+ }
+
+ private DeletedCommandStoresException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ @Override
+ public DeletedCommandStoresException rethrowable()
+ {
+ return new DeletedCommandStoresException(this);
+ }
+}
diff --git
a/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java
b/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java
new file mode 100644
index 00000000..f1e358d6
--- /dev/null
+++
b/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.utils.Rethrowable;
+
+public class OverlappingCommandStoresException extends RuntimeException
implements Rethrowable<OverlappingCommandStoresException>
+{
+ public OverlappingCommandStoresException()
+ {
+ }
+
+ private OverlappingCommandStoresException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ @Override
+ public OverlappingCommandStoresException rethrowable()
+ {
+ return new OverlappingCommandStoresException(this);
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 239c3ff1..c4de4314 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -33,6 +33,7 @@ import accord.api.DataStore;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
+import accord.local.CommandStores.RangesForEpoch;
import accord.local.CommandStores.RangesForEpochSupplier;
import accord.local.RedundantBefore.RedundantBeforeSupplier;
import accord.local.cfk.CommandsForKey;
@@ -557,7 +558,7 @@ public abstract class SafeCommandStore implements
RangesForEpochSupplier, Redund
if (safeCommand != null && safeCommand.current().known().route() !=
MaybeRoute)
return;
- CommandStores.RangesForEpoch rangesForEpoch = safeStore.ranges();
+ RangesForEpoch rangesForEpoch = safeStore.ranges();
// TODO (required): this is incompatible with rebootstrap - we need to
use some additional condition
witnessedBy = witnessedBy.without(rangesForEpoch.coordinates(txnId));
// already coordinates, no need to replicate
if (witnessedBy.isEmpty())
@@ -601,6 +602,11 @@ public abstract class SafeCommandStore implements
RangesForEpochSupplier, Redund
commandStore().unsafeSetSafeToRead(newSafeToRead);
}
+ public void setPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead)
+ {
+
commandStore().unsafeSetPermanentlyUnsafeToRead(newPermanentlyUnsafeToRead);
+ }
+
public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
{
commandStore().unsafeSetRangesForEpoch(rangesForEpoch);
@@ -613,7 +619,7 @@ public abstract class SafeCommandStore implements
RangesForEpochSupplier, Redund
public abstract Agent agent();
public abstract ProgressLog progressLog();
public abstract NodeCommandStoreService node();
- public abstract CommandStores.RangesForEpoch ranges();
+ public abstract RangesForEpoch ranges();
protected NavigableMap<TxnId, Ranges> bootstrapBeganAt()
{
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpoch.java
b/accord-core/src/main/java/accord/topology/ActiveEpoch.java
index 04333fca..2a39855f 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpoch.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpoch.java
@@ -44,9 +44,9 @@ public final class ActiveEpoch
final QuorumTracker quorumReadyTracker;
final SimpleBitSet shardQuorumReady, receivedNodeReady;
- private Ranges quorumReady;
+ private volatile Ranges quorumReady;
- private Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
+ private volatile Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
private volatile boolean allRetired;
public boolean allRetired()
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpochs.java
b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
index a9fbd1b6..228c7f9b 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpochs.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
@@ -61,19 +61,34 @@ public final class ActiveEpochs implements
Iterable<ActiveEpoch>
// Epochs are sorted in _descending_ order
final ActiveEpoch[] epochs;
- ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long
prevFirstNonEmptyEpoch)
+ private ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long
firstNonEmptyEpoch)
{
this.manager = manager;
this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
- if (prevFirstNonEmptyEpoch != -1)
- this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch;
- else if (epochs.length > 0 && !epochs[0].all().isEmpty())
- this.firstNonEmptyEpoch = currentEpoch;
- else
- this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch;
-
+ this.firstNonEmptyEpoch = firstNonEmptyEpoch;
+ this.epochs = epochs;
for (int i = 1; i < epochs.length; i++)
Invariants.requireArgument(epochs[i].epoch() == epochs[i -
1].epoch() - 1);
+ }
+
+ static ActiveEpochs empty(TopologyManager manager)
+ {
+ return new ActiveEpochs(manager, new ActiveEpoch[0], -1);
+ }
+
+ ActiveEpochs withNewEpochs(ActiveEpoch[] epochs)
+ {
+ long firstNonEmptyEpoch = this.firstNonEmptyEpoch;
+ if (firstNonEmptyEpoch == -1 && epochs.length > 0 &&
!epochs[0].all().isEmpty())
+ {
+ Invariants.require(epochs.length == 1);
+ firstNonEmptyEpoch = epochs[0].epoch();
+ }
+ return new ActiveEpochs(manager, epochs, firstNonEmptyEpoch);
+ }
+
+ ActiveEpochs maybeTruncate()
+ {
int truncateFrom = -1;
// > 0 because we do not want to be left without epochs in case they're all empty
for (int i = epochs.length - 1; i > 0; i--)
@@ -85,28 +100,26 @@ public final class ActiveEpochs implements
Iterable<ActiveEpoch>
}
if (truncateFrom == -1)
+ return this;
+
+ ActiveEpoch[] newEpochs = Arrays.copyOf(epochs, truncateFrom);
+ for (int i = truncateFrom; i < epochs.length; i++)
{
- this.epochs = epochs;
+ ActiveEpoch e = epochs[i];
+ Invariants.require(epochs[i].isQuorumReady());
+ logger.info("Retired epoch {} with added/removed ranges {}/{}.
Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges,
e.all.ranges, e.closed());
}
- else
+ if (logger.isTraceEnabled())
{
- this.epochs = Arrays.copyOf(epochs, truncateFrom);
- for (int i = truncateFrom; i < epochs.length; i++)
+ for (int i = 0; i < truncateFrom; i++)
{
ActiveEpoch e = epochs[i];
- Invariants.require(epochs[i].isQuorumReady());
- logger.info("Retired epoch {} with added/removed ranges {}/{}.
Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges,
e.all.ranges, e.closed());
- }
- if (logger.isTraceEnabled())
- {
- for (int i = 0; i < truncateFrom; i++)
- {
- ActiveEpoch e = epochs[i];
- Invariants.require(e.isQuorumReady());
- logger.trace("Leaving epoch {} with added/removed ranges
{}/{}", e.epoch(), e.addedRanges, e.removedRanges);
- }
+ Invariants.require(e.isQuorumReady());
+ logger.trace("Leaving epoch {} with added/removed ranges
{}/{}", e.epoch(), e.addedRanges, e.removedRanges);
}
}
+
+ return withNewEpochs(newEpochs);
}
public boolean isEmpty()
diff --git a/accord-core/src/main/java/accord/topology/Topology.java
b/accord-core/src/main/java/accord/topology/Topology.java
index 3f993bc7..0fd504fd 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -766,6 +766,23 @@ public class Topology
forEach.accept(shards[i]);
}
+ public static Map<Id, Ranges> computeNodeAdditions(Topology current,
Topology next)
+ {
+ Map<Id, Ranges> additions = new HashMap<>();
+ for (Id node : next.nodes())
+ {
+ Ranges prev = current.rangesForNode(node);
+ if (prev == null) prev = Ranges.EMPTY;
+
+ Ranges added = next.rangesForNode(node).without(prev);
+ if (added.isEmpty())
+ continue;
+
+ additions.put(node, added);
+ }
+ return additions;
+ }
+
public SortedArrayList<Id> nodes()
{
return nodes;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 263e64ba..0746104a 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -19,6 +19,7 @@
package accord.topology;
import java.util.IdentityHashMap;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -52,6 +53,9 @@ import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import accord.utils.async.NestedAsyncResult;
+import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
+import static accord.primitives.Routables.Slice.Minimal;
+
/**
* Manages topology state changes and update bookkeeping
*
@@ -99,7 +103,7 @@ public class TopologyManager
this.time = time;
this.timeouts = timeouts;
this.topologyService = topologyService;
- this.active = new ActiveEpochs(this, new ActiveEpoch[0], -1);
+ this.active = ActiveEpochs.empty(this);
this.pending = new PendingEpochs(this);
}
@@ -169,6 +173,7 @@ public class TopologyManager
private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId
txnId)
{
+ long removeCommandStoresBefore = 0;
Topology topology = null;
synchronized (this)
{
@@ -193,28 +198,20 @@ public class TopologyManager
if (epoch > active.currentEpoch)
ranges = pending.retired(ranges, epoch);
ranges = active.retired(ranges, epoch);
+ ActiveEpochs truncated = active.maybeTruncate();
+ if (truncated != active)
+ {
+ active = truncated;
+ removeCommandStoresBefore = truncated.minEpoch();
+ }
}
if (!ranges.isEmpty())
{
for (TopologyListener listener : listeners)
listener.onEpochRetired(ranges, epoch, topology);
}
- }
-
- public synchronized void truncateTopologiesUntil(long epoch)
- {
- ActiveEpochs current = active;
- Invariants.requireArgument(current.epoch() >= epoch, "Unable to
truncate; epoch %d is > current epoch %d", epoch, current.epoch());
-
- if (current.minEpoch() >= epoch)
- return;
-
- int newLen = current.epochs.length - (int) (epoch -
current.minEpoch());
- Invariants.require(current.epochs[newLen - 1].isQuorumReady(), "Epoch
%d is not ready to coordinate", current.epochs[newLen - 1].epoch());
-
- ActiveEpoch[] nextEpochs = new ActiveEpoch[newLen];
- System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
- active = new ActiveEpochs(this, nextEpochs,
current.firstNonEmptyEpoch);
+ if (removeCommandStoresBefore > 0)
+
node.commandStores().removeCommandStoresBefore(removeCommandStoresBefore);
}
public TopologySorter.Supplier sorter()
@@ -308,6 +305,63 @@ public class TopologyManager
updateActive();
}
+ public static class RegainingEpochRange
+ {
+ public final long epoch;
+ public final Ranges ranges;
+
+ public RegainingEpochRange(long epoch, Ranges ranges)
+ {
+ this.epoch = epoch;
+ this.ranges = ranges;
+ }
+
+ public long epoch()
+ {
+ return epoch;
+ }
+
+ public Ranges ranges()
+ {
+ return ranges;
+ }
+ }
+
+ @Nullable
+ public RegainingEpochRange computeRegaining(Topology current, Topology
next)
+ {
+ Map<Id, Ranges> additions = Topology.computeNodeAdditions(current,
next);
+ long greatestEpoch = -1;
+ Ranges ranges = Ranges.EMPTY;
+
+ ActiveEpochs active = this.active;
+ for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+ {
+ Ranges addingForNode = entry.getValue();
+ for (ActiveEpoch e : active)
+ {
+ addingForNode =
addingForNode.without(e.removedRanges).without(e.retired());
+ if (addingForNode.isEmpty())
+ break;
+
+ Ranges existingForNode = e.all().rangesForNode(entry.getKey());
+ Ranges regainingForNode = addingForNode.slice(existingForNode,
Minimal);
+ if (!regainingForNode.isEmpty())
+ {
+ greatestEpoch = Math.max(greatestEpoch, e.epoch());
+ ranges = ranges.union(MERGE_ADJACENT, regainingForNode);
+ addingForNode = addingForNode.without(regainingForNode);
+ }
+ addingForNode = addingForNode.without(e.addedRanges);
+ }
+ }
+
+ if (greatestEpoch != -1)
+ return new RegainingEpochRange(greatestEpoch, ranges);
+
+ return null;
+ }
+
private final AtomicBoolean updatingActive = new AtomicBoolean();
private void updateActive()
{
@@ -376,7 +430,7 @@ public class TopologyManager
}
}
- this.active = new ActiveEpochs(this, next,
prev.firstNonEmptyEpoch);
+ this.active = prev.withNewEpochs(next);
this.pending.removeFirst(topology.epoch);
}
diff --git
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 22a29e5b..b80b9824 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -85,6 +85,7 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
public void validateShardStateForTesting(Journal.TopologyUpdate lastUpdate)
{
+ PreviouslyOwned previouslyOwned = lastUpdate.previouslyOwned;
ShardHolder[] shards = new
ShardHolder[lastUpdate.commandStores.size()];
int i = 0;
for (Map.Entry<Integer, RangesForEpoch> e :
lastUpdate.commandStores.entrySet())
@@ -92,20 +93,26 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
Snapshot current = current();
RangesForEpoch ranges = e.getValue();
DelayedCommandStore commandStore = null;
+ Ranges regainingRanges = null;
for (ShardHolder shard : current)
{
if (shard.ranges().equals(ranges))
+ {
+ Invariants.require(commandStore == null);
commandStore = (DelayedCommandStore) shard.store;
+ regainingRanges = shard.regainsRanges;
+ }
}
Invariants.nonNull(commandStore, "Each set of ranges should have a
corresponding command store, but %d did not:(%s)",
ranges, Arrays.toString(shards))
.restore();
- ShardHolder shard = new ShardHolder(commandStore, e.getValue());
+
Invariants.require(previouslyOwned.regains(ranges.currentRanges()).equals(regainingRanges));
+ ShardHolder shard = new ShardHolder(commandStore, ranges,
previouslyOwned.regains(ranges.currentRanges()));
shards[i++] = shard;
}
Arrays.sort(shards, Comparator.comparingInt(shard ->
shard.store.id()));
- loadSnapshot(new Snapshot(shards,
lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global));
+ loadSnapshot(new Snapshot(shards,
lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global,
lastUpdate.previouslyOwned));
}
protected void loadSnapshot(Snapshot nextSnapshot)
@@ -139,6 +146,12 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
RangesForEpoch loaded = next.unsafeGetRangesForEpoch();
Invariants.require(orig.equals(loaded), "%s should equal %s",
loaded, orig);
}
+
+ {
+ Ranges orig = prev.unsafeGetPermanentlyUnsafeToRead();
+ Ranges loaded = next.unsafeGetPermanentlyUnsafeToRead();
+ Invariants.require(orig.equals(loaded), "%s should equal %s",
loaded, orig);
+ }
}
super.loadSnapshot(nextSnapshot);
@@ -258,8 +271,20 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
}
}
+ @Override
+ protected void loadPermanentlyUnsafeToRead(Ranges
newPermanentlyUnsafeToRead)
+ {
+ if (newPermanentlyUnsafeToRead == null)
Invariants.require(super.permanentlyUnsafeToRead.isEmpty());
+ else
+ {
+ unsafeClearPermanentlyUnsafeToRead();
+ super.loadPermanentlyUnsafeToRead(newPermanentlyUnsafeToRead);
+ }
+ }
+
public void restore()
{
+
loadPermanentlyUnsafeToRead(journal.loadPermanentlyUnsafeToRead(id()));
loadRangesForEpoch(journal.loadRangesForEpoch(id()));
loadRedundantBefore(journal.loadRedundantBefore(id()));
loadBootstrapBeganAt(journal.loadBootstrapBeganAt(id()));
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 8539fbe1..a4aa44f1 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -262,6 +262,11 @@ public class InMemoryJournal implements Journal
@Override
public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush)
{
+ // Ensure that we only read the latest topologyUpdate as that is what happens in the
+ // C* implementation
+ int lastIndex = topologyUpdates.size() - 1;
+ if (!topologyUpdates.isEmpty() &&
topologyUpdates.get(lastIndex).global.equals(topologyUpdate.global))
+ topologyUpdates.remove(lastIndex);
topologyUpdates.add(topologyUpdate);
if (onFlush != null)
onFlush.run();
@@ -315,6 +320,15 @@ public class InMemoryJournal implements Journal
return fieldStates.newRangesForEpoch;
}
+ @Override
+ public Ranges loadPermanentlyUnsafeToRead(int commandStoreId)
+ {
+ FieldUpdates fieldStates = this.fieldStates.get(commandStoreId);
+ if (fieldStates == null)
+ return null;
+ return fieldStates.newPermanentlyUnsafeToRead;
+ }
+
@Override
public PersistentField.Persister<DurableBefore, DurableBefore>
durableBeforePersister()
{
@@ -329,6 +343,7 @@ public class InMemoryJournal implements Journal
init.newRedundantBefore = RedundantBefore.EMPTY;
init.newBootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE,
Ranges.EMPTY);
init.newSafeToRead = ImmutableSortedMap.of(Timestamp.NONE,
Ranges.EMPTY);
+ init.newPermanentlyUnsafeToRead = Ranges.EMPTY;
return init;
});
@@ -340,6 +355,8 @@ public class InMemoryJournal implements Journal
fieldStates.newBootstrapBeganAt = fieldUpdates.newBootstrapBeganAt;
if (fieldUpdates.newRangesForEpoch != null)
fieldStates.newRangesForEpoch = fieldUpdates.newRangesForEpoch;
+ if (fieldUpdates.newPermanentlyUnsafeToRead != null)
+ fieldStates.newPermanentlyUnsafeToRead =
fieldUpdates.newPermanentlyUnsafeToRead;
if (onFlush!= null)
onFlush.run();
@@ -634,6 +651,10 @@ public class InMemoryJournal implements Journal
Map<TxnId, List<Diff>> diffs = new TreeMap<>();
InMemoryCommandStore commandStore = (InMemoryCommandStore)
commandStores.forId(commandStoreId);
+
+ if (commandStore == null)
+ continue;
+
Replayer replayer = commandStore.replayer(PART_NON_DURABLE);
for (Map.Entry<TxnId, Diffs> e : diffEntry.getValue().entrySet())
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 3fd8669f..5563276f 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -165,6 +165,12 @@ public class LoggingJournal implements Journal
return delegate.loadRangesForEpoch(commandStoreId);
}
+ @Override
+ public Ranges loadPermanentlyUnsafeToRead(int store)
+ {
+ return delegate.loadPermanentlyUnsafeToRead(store);
+ }
+
@Override
public PersistentField.Persister<DurableBefore, DurableBefore>
durableBeforePersister()
{
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index c0beeaae..182a0d8a 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -47,12 +47,7 @@ import accord.impl.InMemoryCommandStore.Snapshot;
import accord.impl.basic.NodeSink;
import accord.impl.basic.SimulatedFault;
import accord.impl.mock.Network;
-import accord.local.CommandStore;
-import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.LogUnavailableException;
-import accord.local.SafeCommandStore;
-import accord.local.TimeService;
+import accord.local.*;
import accord.messages.MessageType;
import accord.primitives.Ballot;
import accord.primitives.Keys;
@@ -173,7 +168,7 @@ public class ListAgent implements InMemoryAgent,
CoordinatorEventListener, Owner
ownershipEventListener.onFailedBootstrap(attempt, phase, ranges,
retry, fail, failure);
}
- private static final Set<Class<?>> expectedExceptions = new
HashSet<>(Arrays.asList(SimulatedFault.class,
ExecuteSyncPoint.SyncPointErased.class, CancellationException.class,
TopologyRetiredException.class, TopologyMismatch.class,
Snapshotter.SnapshotAborted.class, TimeoutException.class,
LogUnavailableException.class));
+ private static final Set<Class<?>> expectedExceptions = new
HashSet<>(Arrays.asList(SimulatedFault.class,
ExecuteSyncPoint.SyncPointErased.class, CancellationException.class,
TopologyRetiredException.class, TopologyMismatch.class,
Snapshotter.SnapshotAborted.class, TimeoutException.class,
LogUnavailableException.class, OverlappingCommandStoresException.class,
DeletedCommandStoresException.class));
@Override
public void onException(Throwable t)
{
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index 8a52495d..6189f788 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -283,13 +283,6 @@ public class TopologyManagerTest
Assertions.assertTrue(service.active().hasEpoch(2));
Assertions.assertTrue(service.active().hasEpoch(3));
Assertions.assertTrue(service.active().hasEpoch(4));
-
- service.truncateTopologiesUntil(3);
- Assertions.assertFalse(service.active().hasEpoch(1));
- Assertions.assertFalse(service.active().hasEpoch(2));
- Assertions.assertTrue(service.active().hasEpoch(3));
- Assertions.assertTrue(service.active().hasEpoch(4));
-
}
@Test
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index a8f8490a..c778e707 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -413,33 +413,37 @@ public class TopologyRandomizer
return ((PrefixedIntHashKey) shard.range.start()).prefix;
}
- private static Map<Id, Ranges> getAdditions(Topology current, Topology
next)
+ private boolean validToReassignRange(Topology current, Shard[] nextShards,
Map<Id, Ranges> previouslyReplicated)
{
- Map<Id, Ranges> additions = new HashMap<>();
- for (Id node : next.nodes())
- {
- Ranges prev = current.rangesForNode(node);
- if (prev == null) prev = Ranges.EMPTY;
-
- Ranges added = next.rangesForNode(node).without(prev);
- if (added.isEmpty())
- continue;
+ Topology next = new Topology(current.epoch + 1, nextShards);
+ Map<Id, Ranges> additions = Topology.computeNodeAdditions(current,
next);
- additions.put(node, added);
+ for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+ {
+ if (previouslyReplicated.getOrDefault(entry.getKey(),
Ranges.EMPTY).intersects(entry.getValue())
+ && !(previousEpochForRegainedRangeRetired(current,
entry.getValue())))
+ return false;
}
- return additions;
+
+ return true;
}
- private static boolean reassignsRanges(Topology current, Shard[]
nextShards, Map<Id, Ranges> previouslyReplicated)
+ private boolean previousEpochForRegainedRangeRetired(Topology current,
Ranges regainingRanges)
{
- Topology next = new Topology(current.epoch + 1, nextShards);
- Map<Id, Ranges> additions = getAdditions(current, next);
-
- for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+ for (Id id : current.nodes())
{
- if (previouslyReplicated.getOrDefault(entry.getKey(),
Ranges.EMPTY).intersects(entry.getValue()))
+ Node node = this.nodeLookup.apply(id);
+ boolean isRetiredForNode = true;
+ for (ActiveEpoch epoch : node.topology().active())
+ {
+ if (epoch.all().ranges.intersects(regainingRanges) &&
!epoch.allRetired())
+ isRetiredForNode = false;
+ }
+
+ if (isRetiredForNode)
return true;
}
+
return false;
}
@@ -472,7 +476,7 @@ public class TopologyRandomizer
Shard[] testShards = type.apply(state, random);
Arrays.sort(testShards, (a, b) -> a.range.compareTo(b.range));
if (!everyShardHasQuorumOverlaps(oldShards, testShards)
- || reassignsRanges(current, testShards, previouslyReplicated))
+ || !validToReassignRange(current, testShards,
previouslyReplicated))
{
++rejectedMutations;
}
@@ -488,7 +492,7 @@ public class TopologyRandomizer
Topology nextTopology = new Topology(current.epoch + 1, newShards);
- Map<Id, Ranges> nextAdditions = getAdditions(current, nextTopology);
+ Map<Id, Ranges> nextAdditions = Topology.computeNodeAdditions(current,
nextTopology);
for (Map.Entry<Id, Ranges> entry : nextAdditions.entrySet())
{
previouslyReplicated.merge(entry.getKey(), entry.getValue(), (a,
b) -> a.union(MERGE_ADJACENT, b));
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 3842438d..51b9b69a 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -403,6 +403,7 @@ public class Cluster implements Scheduler
@Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int
store) { throw new IllegalStateException("Not impelemented"); }
@Override public NavigableMap<Timestamp, Ranges> loadSafeToRead(int
store) { throw new IllegalStateException("Not impelemented"); }
@Override public CommandStores.RangesForEpoch loadRangesForEpoch(int
store) { throw new IllegalStateException("Not impelemented"); }
+ @Override public Ranges loadPermanentlyUnsafeToRead(int store) { throw
new IllegalStateException("Not implemented"); }
@Override public PersistentField.Persister<DurableBefore,
DurableBefore> durableBeforePersister() { throw new IllegalStateException("Not
impelemented"); }
@Override public void saveStoreState(int store, FieldUpdates
fieldUpdates, Runnable onFlush) { throw new IllegalStateException("Not
impelemented"); }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]