This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 555337a7 Fix: - EphemeralReads should retry in a later epoch if replication factor changes - Fix no local epoch for NotAccept - Invalidate a command with no route but for which we know we own some key that has applied locally - Don't pre-merge existing DurableBefore, to reduce duplicate/redundant persistence - Misc purging bugs and improve testing of purging 555337a7 is described below commit 555337a7d41158f74033818facf94fed6904bf5a Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Fri Jun 27 11:19:52 2025 +0100 Fix: - EphemeralReads should retry in a later epoch if replication factor changes - Fix no local epoch for NotAccept - Invalidate a command with no route but for which we know we own some key that has applied locally - Don't pre-merge existing DurableBefore, to reduce duplicate/redundant persistence - Misc purging bugs and improve testing of purging patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20739 --- .../java/accord/coordinate/ExecuteSyncPoint.java | 2 +- .../src/main/java/accord/coordinate/Propose.java | 27 ++++++---- .../src/main/java/accord/impl/CommandChange.java | 58 +++++++++++++-------- .../java/accord/impl/DefaultLocalListeners.java | 5 +- .../src/main/java/accord/local/Cleanup.java | 27 +++------- .../src/main/java/accord/local/CommandStore.java | 1 + accord-core/src/main/java/accord/local/Node.java | 2 +- .../accord/local/durability/ShardDurability.java | 8 +-- .../java/accord/messages/GetEphemeralReadDeps.java | 5 +- .../src/main/java/accord/topology/Topology.java | 1 + .../main/java/accord/topology/TopologyManager.java | 59 +++++++++++++++++++++- .../java/accord/impl/basic/InMemoryJournal.java | 15 +++--- 12 files changed, 143 insertions(+), 67 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java index 5b6f00aa..1cdc781d 100644 --- 
a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java @@ -184,7 +184,7 @@ public class ExecuteSyncPoint extends SettableResult<DurabilityResult> implement } else { - DurabilityResult result = new DurabilityResult(syncPoint, tracker.achievedLocal(node.id()), tracker.achievedRemote(), tracker.failures(), null); + DurabilityResult result = current(); if (result.achievedRemote == SyncRemote.All) { node.configService().reportEpochRetired(syncPoint.route.toRanges(), syncPoint.syncId.epoch() - 1); diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java index ccefb3d4..26917132 100644 --- a/accord-core/src/main/java/accord/coordinate/Propose.java +++ b/accord-core/src/main/java/accord/coordinate/Propose.java @@ -44,6 +44,7 @@ import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; import accord.topology.Topologies; +import accord.topology.TopologyManager; import accord.utils.Invariants; import accord.utils.SortedArrays; import accord.utils.SortedListMap; @@ -259,23 +260,29 @@ abstract class Propose<R> implements Callback<AcceptReply> this.debug = debug() ? 
new SortedListMap<>(topologies.nodes(), AcceptReply[]::new) : null; } - public static NotAccept proposeInvalidate(Node node, SequentialAsyncExecutor executor, Ballot ballot, TxnId txnId, RoutingKey invalidateWithParticipant, BiConsumer<Void, Throwable> callback) + public static void proposeInvalidate(Node node, SequentialAsyncExecutor executor, Ballot ballot, TxnId txnId, RoutingKey invalidateWithParticipant, BiConsumer<Void, Throwable> callback) { - return proposeNotAccept(node, executor, AcceptedInvalidate, ballot, txnId, invalidateWithParticipant, callback); + proposeNotAccept(node, executor, AcceptedInvalidate, ballot, txnId, invalidateWithParticipant, callback); } - public static NotAccept proposeNotAccept(Node node, SequentialAsyncExecutor executor, Status status, Ballot ballot, TxnId txnId, RoutingKey participatingKey, BiConsumer<Void, Throwable> callback) + public static void proposeNotAccept(Node node, SequentialAsyncExecutor executor, Status status, Ballot ballot, TxnId txnId, RoutingKey participatingKey, BiConsumer<Void, Throwable> callback) { - Participants<?> participants = Participants.singleton(txnId.domain(), participatingKey); - Topologies topologies = node.topology().forEpoch(participants, txnId.epoch(), SHARE); - NotAccept notAccept = new NotAccept(node, status, topologies, ballot, txnId, participants, callback); - node.send(topologies.nodes(), to -> new Accept.NotAccept(status, ballot, txnId, participants), executor, notAccept); - return notAccept; + try + { + Participants<?> participants = Participants.singleton(txnId.domain(), participatingKey); + Topologies topologies = node.topology().forEpoch(participants, txnId.epoch(), SHARE); + NotAccept notAccept = new NotAccept(node, status, topologies, ballot, txnId, participants, callback); + node.send(topologies.nodes(), to -> new Accept.NotAccept(status, ballot, txnId, participants), executor, notAccept); + } + catch (Throwable t) + { + callback.accept(null, t); + } } - public static NotAccept 
proposeAndCommitInvalidate(Node node, SequentialAsyncExecutor executor, Ballot ballot, TxnId txnId, RoutingKey invalidateWithParticipant, Route<?> commitInvalidationTo, Timestamp invalidateUntil, BiConsumer<?, Throwable> callback) + public static void proposeAndCommitInvalidate(Node node, SequentialAsyncExecutor executor, Ballot ballot, TxnId txnId, RoutingKey invalidateWithParticipant, Route<?> commitInvalidationTo, Timestamp invalidateUntil, BiConsumer<?, Throwable> callback) { - return proposeInvalidate(node, executor, ballot, txnId, invalidateWithParticipant, (success, fail) -> { + proposeInvalidate(node, executor, ballot, txnId, invalidateWithParticipant, (success, fail) -> { if (fail != null) { callback.accept(null, fail); diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java index a7ccfce1..b10b98b1 100644 --- a/accord-core/src/main/java/accord/impl/CommandChange.java +++ b/accord-core/src/main/java/accord/impl/CommandChange.java @@ -336,6 +336,7 @@ public class CommandChange { if (saveStatus == null) return EXPUNGE; + if (participants != null) participants = participants.filter(LOAD, redundantBefore, txnId, saveStatus.known.isExecuteAtKnown() ? 
executeAt : null); } @@ -348,28 +349,21 @@ public class CommandChange public Cleanup maybeCleanup(boolean clearFields, Input input, RedundantBefore redundantBefore, DurableBefore durableBefore) { Cleanup cleanup = shouldCleanup(input, redundantBefore, durableBefore); - return maybeCleanup(clearFields, input, cleanup); + return maybeCleanup(clearFields, cleanup); } - public Cleanup maybeCleanup(boolean clearFields, Input input, Cleanup cleanup) + public Cleanup maybeCleanup(boolean clearFields, Cleanup cleanup) { - if (cleanup == NO || cleanup == EXPUNGE) + if (cleanup == NO) return cleanup; - SaveStatus newSaveStatus = cleanup.newStatus; - if (saveStatus == null || saveStatus.compareTo(newSaveStatus) < 0) + forceSetNulls(clearFields, eraseKnownFieldsMask[cleanup.newStatus.ordinal()]); + if (this.cleanup == null || this.cleanup.compareTo(cleanup) < 0) { - if (input == Input.FULL) - { - // TODO (expected): this special-casing should be declared in Cleanup - if (newSaveStatus == SaveStatus.TruncatedApply && (saveStatus == null || !saveStatus.known.is(ApplyAtKnown))) - newSaveStatus = SaveStatus.TruncatedUnapplied; - saveStatus = newSaveStatus; - } - forceSetNulls(clearFields, eraseKnownFieldsMask[newSaveStatus.ordinal()]); + this.hasUpdate = true; + this.flags |= setChanged(CLEANUP); + this.cleanup = cleanup; } - - this.cleanup = cleanup; return cleanup; } @@ -462,7 +456,7 @@ public class CommandChange // that is, if we cleared a non-null field or if we are already mask-only public boolean clearSuperseded(boolean clearFields, Builder superseding) { - int unset = flags & setFieldsMask(superseding.flags); + int unset = flags & setFieldsMask(superseding.flags & ~setChanged(CLEANUP)); if (notNulls(unset) == 0 && notNulls(flags) != 0) return false; @@ -478,13 +472,14 @@ public class CommandChange return false; if (cleanup != null && addCleanup.compareTo(cleanup) <= 0) + { + Invariants.require(isChanged(CLEANUP, flags)); return false; + } hasUpdate = true; cleanup = 
addCleanup; flags |= setChanged(CLEANUP); - if (!cleanup.appliesTo(saveStatus)) - return false; return forceSetNulls(clearFields, eraseKnownFieldsMask[cleanup.newStatus.ordinal()]); } @@ -546,8 +541,8 @@ public class CommandChange return null; Invariants.require(txnId != null); - if (participants == null) participants = StoreParticipants.empty(txnId); - else participants = participants.filter(LOAD, redundantBefore, txnId, saveStatus.known.isExecuteAtKnown() ? executeAt : null); + if (participants != null) + participants = participants.filter(LOAD, redundantBefore, txnId, saveStatus != null && saveStatus.known.isExecuteAtKnown() ? executeAt : null); if (durability == null) durability = NotDurable; @@ -556,6 +551,27 @@ public class CommandChange if (this.waitingOn != null) waitingOn = this.waitingOn.provide(txnId, partialDeps, executesAtLeast, minUniqueHlc); + if (cleanup != null) + { + switch (cleanup) + { + default: throw new UnhandledEnum(cleanup); + case NO: break; + case EXPUNGE: return null; + case ERASE: return Command.Truncated.erased(txnId); + case INVALIDATE: return Command.Truncated.invalidated(txnId, participants); + case VESTIGIAL: return Command.Truncated.vestigial(txnId, participants); + case TRUNCATE_WITH_OUTCOME: + if (saveStatus.compareTo(SaveStatus.TruncatedApplyWithOutcome) < 0) + saveStatus = SaveStatus.TruncatedApplyWithOutcome; + break; + case TRUNCATE: + if (saveStatus.compareTo(SaveStatus.TruncatedApply) < 0) + saveStatus = saveStatus.known.is(ApplyAtKnown) ? 
SaveStatus.TruncatedApply : SaveStatus.TruncatedUnapplied; + break; + } + } + Invariants.require(saveStatus != null); switch (saveStatus.status) { @@ -635,7 +651,7 @@ public class CommandChange if (!isChanged(field, flags)) return ""; - return field.name().toLowerCase() + '=' + safeToString(isNull(field, flags), obj); + return ", " + field.name().toLowerCase() + '=' + safeToString(isNull(field, flags), obj); } private static Object safeToString(boolean isNull, Object obj) diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java index af7a2b02..3b6a5694 100644 --- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java +++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java @@ -32,6 +32,7 @@ import accord.local.Node; import accord.local.PreLoadContext; import accord.local.SafeCommand; import accord.local.SafeCommandStore; +import accord.local.StoreParticipants; import accord.primitives.SaveStatus; import accord.primitives.TxnId; import accord.utils.AsymmetricComparator; @@ -39,6 +40,8 @@ import accord.utils.Invariants; import accord.utils.btree.BTree; import accord.utils.btree.BTreeRemoval; +import static accord.local.StoreParticipants.Filter.UPDATE; + // TODO (desired): evict to disk public class DefaultLocalListeners implements LocalListeners { @@ -495,7 +498,7 @@ public class DefaultLocalListeners implements LocalListeners SafeCommand safeCommand = safeStore.unsafeGet(entry); Command command = safeCommand.current(); SaveStatus saveStatus = command.saveStatus(); - Invariants.require(saveStatus.compareTo(entry.await) >= 0 || command.participants().stillTouches().isEmpty()); + Invariants.require(saveStatus.compareTo(entry.await) >= 0 || command.participants().stillOwns().isEmpty()); entry.notify(notifySink, safeStore, safeCommand); }, commandStore.agent()); txnListeners = BTreeRemoval.remove(txnListeners, TxnListeners::compareListeners, entry); diff 
--git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index dfa2aa2a..569fa94d 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -215,7 +215,7 @@ public enum Cleanup case ShardUniversal: // TODO (required): consider how we guarantee not to break recovery of other shards if a majority on this shard are PRE_BOOTSTRAP // (if the condition is false and we fall through to removing Outcome) - if (participants.doesStillExecute()) + if (input != FULL || participants.doesStillExecute()) return min.atLeast(truncateWithOutcome(txnId, min)); case MajorityOrInvalidated: @@ -233,23 +233,12 @@ public enum Cleanup private static Cleanup cleanupWithoutFullRoute(Input input, TxnId txnId, SaveStatus saveStatus, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore) { // TODO (expected): consider if we can truncate more aggressively partial records, although we cannot infer anything from the fact they're undecided - if (input == PARTIAL || saveStatus.hasBeen(Truncated)) + if (input == PARTIAL || saveStatus.hasBeen(Truncated) || saveStatus == Uninitialised) return NO; Invariants.require(!saveStatus.hasBeen(PreCommitted)); - if (participants.route() == null) - { - // we don't want to erase something that we only don't own because it hasn't been initialised - if (saveStatus == Uninitialised) - return NO; - - if (txnId.compareTo(redundantBefore.minShardAndLocallyAppliedBefore()) >= 0) - return NO; - - return vestigial(txnId); - } + boolean isCovering = saveStatus.known.route() == CoveringRoute || txnId.compareTo(redundantBefore.minShardAndLocallyAppliedBefore()) <= 0; - boolean isCovering = saveStatus.known.route() == CoveringRoute; if (isCovering && txnId.isSyncPoint() && participants.owns().isEmpty()) { RedundantStatus redundant = redundantBefore.status(txnId, null, participants.touches()); @@ -257,8 +246,8 @@ 
public enum Cleanup return vestigial(txnId); } - RedundantStatus redundant = redundantBefore.status(txnId, null, participants.owns()); - return cleanupUndecided(txnId, redundant, isCovering || redundantBefore.minShardAndLocallyAppliedBefore().compareTo(txnId) > 0); + RedundantStatus ownStatus = redundantBefore.status(txnId, null, participants.owns()); + return cleanupUndecided(txnId, ownStatus, isCovering); } private static Cleanup cleanupIfUndecidedWithFullRoute(Input input, TxnId txnId, SaveStatus saveStatus, RedundantStatus redundantStatus) @@ -269,14 +258,14 @@ public enum Cleanup return cleanupUndecided(txnId, redundantStatus, true); } - private static Cleanup cleanupUndecided(TxnId txnId, RedundantStatus redundantStatus, boolean isCoveringRoute) + private static Cleanup cleanupUndecided(TxnId txnId, RedundantStatus ownStatus, boolean isCoveringRoute) { - if (redundantStatus.any(LOCALLY_APPLIED)) + if (ownStatus.any(LOCALLY_APPLIED)) return invalidate(txnId); // TODO (desired): safe to use MAJORITY_APPLIED, LOCALLY_REDUNDANT? // TODO (required): can we guarantee we will always eventually obtain a covering route if others are garbage collecting? 
- if (isCoveringRoute && redundantStatus.all(SHARD_APPLIED, LOCALLY_REDUNDANT)) + if (isCoveringRoute && ownStatus.all(SHARD_APPLIED, LOCALLY_REDUNDANT)) return vestigial(txnId); return NO; diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 96830653..7c81d2e6 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -599,6 +599,7 @@ public abstract class CommandStore implements SequentialAsyncExecutor final void markBootstrapping(SafeCommandStore safeStore, TxnId globalSyncId, Ranges ranges) { safeStore.setBootstrapBeganAt(bootstrap(globalSyncId, ranges, bootstrapBeganAt)); + safeStore.setSafeToRead(purgeHistory(safeToRead, ranges)); updateMaxConflicts(ranges, globalSyncId); RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, globalSyncId, PRE_BOOTSTRAP_ONLY); safeStore.upsertRedundantBefore(addRedundantBefore); diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index a79f780e..6490e5ef 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -334,7 +334,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ public AsyncResult<?> markDurable(DurableBefore addDurableBefore) { - return withEpochExact(addDurableBefore.maxEpoch(), (Executor)null, () -> persistDurableBefore.mergeAndUpdate(DurableBefore.merge(durableBefore, addDurableBefore))) + return withEpochExact(addDurableBefore.maxEpoch(), (Executor)null, () -> persistDurableBefore.mergeAndUpdate(addDurableBefore)) .beginAsResult(); } diff --git a/accord-core/src/main/java/accord/local/durability/ShardDurability.java b/accord-core/src/main/java/accord/local/durability/ShardDurability.java index 84c065c4..7f0fdf7c 100644 --- 
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java +++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java @@ -285,8 +285,8 @@ public class ShardDurability logger.info("Increased numberOfSplits to {} for shard {}", currentSplits, shard.range); } long retryDelay = node.agent().retrySyncPointDelay(node, retries, MICROSECONDS); - if (activeRequest != null) - logger.info("Retrying {} for {} in {}s", ranges, activeRequest.requestedBy, String.format("%.2f", retryDelay/1000_000.0)); + if (activeRequest != null) logger.info("Retrying {} for {} in {}s", ranges, activeRequest.requestedBy, String.format("%.2f", retryDelay/1000_000.0)); + else logger.debug("Retrying {} in {}s", ranges, String.format("%.2f", retryDelay/1000_000.0)); scheduled = node.scheduler().selfRecurring(() -> { synchronized (this) { @@ -313,8 +313,8 @@ public class ShardDurability } minHlc = Math.max(minHlc, node.agent().minStaleHlc(node, activeRequest != null)); TxnId staleId = node.nextStaleTxnId(minEpoch, minHlc, ExclusiveSyncPoint, Domain.Range); - if (activeRequest != null) - logger.info("Initiating RX requested by {} for {} with TxnId {}. Remaining: {}.", activeRequest.requestedBy, ranges, staleId, active); + if (activeRequest != null) logger.info("Initiating RX requested by {} for {} with TxnId {}. 
Remaining: {}.", activeRequest.requestedBy, ranges, staleId, active); + else logger.debug("Initiating RX for durability of {} with TxnId {}.", ranges, staleId); scheduled = node.agent().awaitStaleId(node, staleId, activeIndex >= 0) .flatMap( diff --git a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java index 04de28eb..88043420 100644 --- a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java +++ b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java @@ -29,6 +29,7 @@ import accord.local.SafeCommandStore; import accord.local.StoreParticipants; import accord.primitives.Deps; import accord.primitives.FullRoute; +import accord.primitives.Participants; import accord.primitives.Route; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -72,10 +73,10 @@ public class GetEphemeralReadDeps extends TxnRequest.WithUnsynced<GetEphemeralRe long latestEpoch = Math.max(safeStore.node().epoch(), node.epoch()); // TODO (desired): only return failure if we've participated in a sync point that could migrate coordination to the newer epoch - if (latestEpoch > executionEpoch && safeStore.ranges().removed(executionEpoch, latestEpoch).intersects(scope)) + StoreParticipants participants = StoreParticipants.read(safeStore, scope, txnId, minEpoch, latestEpoch); + if (latestEpoch > executionEpoch && (safeStore.ranges().removed(executionEpoch, latestEpoch).intersects(participants.owns()) || node.topology().hasReplicationMaybeChanged(participants.owns(), executionEpoch))) return new GetEphemeralReadDepsOk(latestEpoch); - StoreParticipants participants = StoreParticipants.read(safeStore, scope, txnId, minEpoch, latestEpoch); Deps deps; ExecuteFlags flags; try (DepsCalculator calculator = new DepsCalculator()) diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index 3df0d750..7071f199 
100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -46,6 +46,7 @@ import accord.utils.ArrayBuffers; import accord.utils.ArrayBuffers.IntBuffers; import accord.utils.IndexedBiFunction; import accord.utils.IndexedConsumer; +import accord.utils.IndexedFoldToLong; import accord.utils.IndexedIntFunction; import accord.utils.IndexedTriFunction; import accord.utils.SimpleBitSet; diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 4e055eb9..2d990602 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -1240,10 +1240,22 @@ public class TopologyManager public Topologies forEpoch(Unseekables<?> select, long epoch, SelectNodeOwnership selectNodeOwnership) { - EpochState state = epochs.get(epoch); + Epochs snapshot = epochs; + EpochState state = snapshot.get(epoch); + if (state == null) + throw new TopologyRetiredException(epoch, snapshot.minEpoch()); return new Single(sorter, state.global.select(select, selectNodeOwnership)); } + public boolean hasReplicationMaybeChanged(Unseekables<?> select, long sinceEpoch) + { + Epochs snapshot = epochs; + if (snapshot.minEpoch() > sinceEpoch) + return true; + + return atLeast(select, sinceEpoch, Long.MAX_VALUE, ignore -> Ranges.EMPTY, HasChangedReplication.INSTANCE); + } + public Topologies forEpochAtLeast(Unseekables<?> select, long epoch, SelectNodeOwnership selectNodeOwnership) { Epochs snapshot = this.epochs; @@ -1488,6 +1500,51 @@ public class TopologyManager } } + static class ReplicationChangeTracker + { + int rf = -1; + boolean hasChanged; + } + static class HasChangedReplication implements TopologyManager.Collectors<ReplicationChangeTracker, Unseekables<?>, Boolean> + { + static final HasChangedReplication INSTANCE = new HasChangedReplication(); + + @Override + 
public ReplicationChangeTracker allocate(int size) + { + return new ReplicationChangeTracker(); + } + + @Override + public Boolean none() + { + return false; + } + + @Override + public Boolean multi(ReplicationChangeTracker collector) + { + return collector.hasChanged; + } + + @Override + public Boolean one(TopologyManager.EpochState epoch, Unseekables<?> select, boolean permitMissing) + { + return false; + } + + @Override + public ReplicationChangeTracker update(ReplicationChangeTracker collector, TopologyManager.EpochState epoch, Unseekables<?> select, boolean permitMissing) + { + epoch.global.foldl(select, (shard, c, i1) -> { + if (c.rf < 0) c.rf = shard.rf; + else c.hasChanged |= c.rf != shard.rf; + return c; + }, collector); + return collector; + } + } + public interface Collectors<C, K, T> { C allocate(int size); diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index 618f121d..220d61f4 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.TreeMap; import javax.annotation.Nonnull; @@ -147,12 +148,7 @@ public class InMemoryJournal implements Journal if (builder == null) return null; - Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore, durableBefore); - switch (cleanup) - { - case ERASE: return Command.Truncated.erased(txnId); - case EXPUNGE: return null; - } + builder.maybeCleanup(true, FULL, redundantBefore, durableBefore); return builder.construct(redundantBefore); } @@ -527,7 +523,7 @@ public class InMemoryJournal implements Journal Input input = isPartialCompaction ? 
PARTIAL : FULL; ++counter; Cleanup cleanup = builder.shouldCleanup(input, store.unsafeGetRedundantBefore(), store.durableBefore()); - cleanup = builder.maybeCleanup(true, input, cleanup); + cleanup = builder.maybeCleanup(true, cleanup); if (cleanup != NO) { if (cleanup == EXPUNGE) @@ -584,11 +580,16 @@ public class InMemoryJournal implements Journal } } + Builder before = reconstruct(diffs, ALL); + before.maybeCleanup(true, FULL, store.unsafeGetRedundantBefore(), store.durableBefore()); diffs.size -= removeCount; diffs.flushed.removeAll(subset.flushed); diffs.files.removeAll(subset.files); diffs.files.add(new DiffFile(sorted)); diffs.sorted = null; + Builder after = reconstruct(diffs, ALL); + after.maybeCleanup(true, FULL, store.unsafeGetRedundantBefore(), store.durableBefore()); + Invariants.require(Objects.equals(before.construct(store.unsafeGetRedundantBefore()), after.construct(store.unsafeGetRedundantBefore()))); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org