This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 555337a7 Fix:  - EphemeralReads should retry in a later epoch if replication factor changes  - Fix no local epoch for NotAccept  - Invalidate a command with no route but for which we know we own some key that has applied locally  - Don't pre-merge existing DurableBefore, to reduce duplicate/redundant persistence  - Misc purging bugs and improve testing of purging
555337a7 is described below

commit 555337a7d41158f74033818facf94fed6904bf5a
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Fri Jun 27 11:19:52 2025 +0100

    Fix:
     - EphemeralReads should retry in a later epoch if replication factor changes
     - Fix no local epoch for NotAccept
     - Invalidate a command with no route but for which we know we own some key that has applied locally
     - Don't pre-merge existing DurableBefore, to reduce duplicate/redundant persistence
     - Misc purging bugs and improve testing of purging
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20739
---
 .../java/accord/coordinate/ExecuteSyncPoint.java   |  2 +-
 .../src/main/java/accord/coordinate/Propose.java   | 27 ++++++----
 .../src/main/java/accord/impl/CommandChange.java   | 58 +++++++++++++--------
 .../java/accord/impl/DefaultLocalListeners.java    |  5 +-
 .../src/main/java/accord/local/Cleanup.java        | 27 +++-------
 .../src/main/java/accord/local/CommandStore.java   |  1 +
 accord-core/src/main/java/accord/local/Node.java   |  2 +-
 .../accord/local/durability/ShardDurability.java   |  8 +--
 .../java/accord/messages/GetEphemeralReadDeps.java |  5 +-
 .../src/main/java/accord/topology/Topology.java    |  1 +
 .../main/java/accord/topology/TopologyManager.java | 59 +++++++++++++++++++++-
 .../java/accord/impl/basic/InMemoryJournal.java    | 15 +++---
 12 files changed, 143 insertions(+), 67 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index 5b6f00aa..1cdc781d 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -184,7 +184,7 @@ public class ExecuteSyncPoint extends 
SettableResult<DurabilityResult> implement
         }
         else
         {
-            DurabilityResult result = new DurabilityResult(syncPoint, 
tracker.achievedLocal(node.id()), tracker.achievedRemote(), tracker.failures(), 
null);
+            DurabilityResult result = current();
             if (result.achievedRemote == SyncRemote.All)
             {
                 
node.configService().reportEpochRetired(syncPoint.route.toRanges(), 
syncPoint.syncId.epoch() - 1);
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java 
b/accord-core/src/main/java/accord/coordinate/Propose.java
index ccefb3d4..26917132 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -44,6 +44,7 @@ import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
+import accord.topology.TopologyManager;
 import accord.utils.Invariants;
 import accord.utils.SortedArrays;
 import accord.utils.SortedListMap;
@@ -259,23 +260,29 @@ abstract class Propose<R> implements Callback<AcceptReply>
             this.debug = debug() ? new SortedListMap<>(topologies.nodes(), 
AcceptReply[]::new) : null;
         }
 
-        public static NotAccept proposeInvalidate(Node node, 
SequentialAsyncExecutor executor, Ballot ballot, TxnId txnId, RoutingKey 
invalidateWithParticipant, BiConsumer<Void, Throwable> callback)
+        public static void proposeInvalidate(Node node, 
SequentialAsyncExecutor executor, Ballot ballot, TxnId txnId, RoutingKey 
invalidateWithParticipant, BiConsumer<Void, Throwable> callback)
         {
-            return proposeNotAccept(node, executor, AcceptedInvalidate, 
ballot, txnId, invalidateWithParticipant, callback);
+            proposeNotAccept(node, executor, AcceptedInvalidate, ballot, 
txnId, invalidateWithParticipant, callback);
         }
 
-        public static NotAccept proposeNotAccept(Node node, 
SequentialAsyncExecutor executor, Status status, Ballot ballot, TxnId txnId, 
RoutingKey participatingKey, BiConsumer<Void, Throwable> callback)
+        public static void proposeNotAccept(Node node, SequentialAsyncExecutor 
executor, Status status, Ballot ballot, TxnId txnId, RoutingKey 
participatingKey, BiConsumer<Void, Throwable> callback)
         {
-            Participants<?> participants = 
Participants.singleton(txnId.domain(), participatingKey);
-            Topologies topologies = node.topology().forEpoch(participants, 
txnId.epoch(), SHARE);
-            NotAccept notAccept = new NotAccept(node, status, topologies, 
ballot, txnId, participants, callback);
-            node.send(topologies.nodes(), to -> new Accept.NotAccept(status, 
ballot, txnId, participants), executor, notAccept);
-            return notAccept;
+            try
+            {
+                Participants<?> participants = 
Participants.singleton(txnId.domain(), participatingKey);
+                Topologies topologies = node.topology().forEpoch(participants, 
txnId.epoch(), SHARE);
+                NotAccept notAccept = new NotAccept(node, status, topologies, 
ballot, txnId, participants, callback);
+                node.send(topologies.nodes(), to -> new 
Accept.NotAccept(status, ballot, txnId, participants), executor, notAccept);
+            }
+            catch (Throwable t)
+            {
+                callback.accept(null, t);
+            }
         }
 
-        public static NotAccept proposeAndCommitInvalidate(Node node, 
SequentialAsyncExecutor executor, Ballot ballot, TxnId txnId, RoutingKey 
invalidateWithParticipant, Route<?> commitInvalidationTo, Timestamp 
invalidateUntil, BiConsumer<?, Throwable> callback)
+        public static void proposeAndCommitInvalidate(Node node, 
SequentialAsyncExecutor executor, Ballot ballot, TxnId txnId, RoutingKey 
invalidateWithParticipant, Route<?> commitInvalidationTo, Timestamp 
invalidateUntil, BiConsumer<?, Throwable> callback)
         {
-            return proposeInvalidate(node, executor, ballot, txnId, 
invalidateWithParticipant, (success, fail) -> {
+            proposeInvalidate(node, executor, ballot, txnId, 
invalidateWithParticipant, (success, fail) -> {
                 if (fail != null)
                 {
                     callback.accept(null, fail);
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java 
b/accord-core/src/main/java/accord/impl/CommandChange.java
index a7ccfce1..b10b98b1 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -336,6 +336,7 @@ public class CommandChange
             {
                 if (saveStatus == null)
                     return EXPUNGE;
+                
                 if (participants != null)
                     participants = participants.filter(LOAD, redundantBefore, 
txnId, saveStatus.known.isExecuteAtKnown() ? executeAt : null);
             }
@@ -348,28 +349,21 @@ public class CommandChange
         public Cleanup maybeCleanup(boolean clearFields, Input input, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
         {
             Cleanup cleanup = shouldCleanup(input, redundantBefore, 
durableBefore);
-            return maybeCleanup(clearFields, input, cleanup);
+            return maybeCleanup(clearFields, cleanup);
         }
 
-        public Cleanup maybeCleanup(boolean clearFields, Input input, Cleanup 
cleanup)
+        public Cleanup maybeCleanup(boolean clearFields, Cleanup cleanup)
         {
-            if (cleanup == NO || cleanup == EXPUNGE)
+            if (cleanup == NO)
                 return cleanup;
 
-            SaveStatus newSaveStatus = cleanup.newStatus;
-            if (saveStatus == null || saveStatus.compareTo(newSaveStatus) < 0)
+            forceSetNulls(clearFields, 
eraseKnownFieldsMask[cleanup.newStatus.ordinal()]);
+            if (this.cleanup == null || this.cleanup.compareTo(cleanup) < 0)
             {
-                if (input == Input.FULL)
-                {
-                    // TODO (expected): this special-casing should be declared 
in Cleanup
-                    if (newSaveStatus == SaveStatus.TruncatedApply && 
(saveStatus == null || !saveStatus.known.is(ApplyAtKnown)))
-                        newSaveStatus = SaveStatus.TruncatedUnapplied;
-                    saveStatus = newSaveStatus;
-                }
-                forceSetNulls(clearFields, 
eraseKnownFieldsMask[newSaveStatus.ordinal()]);
+                this.hasUpdate = true;
+                this.flags |= setChanged(CLEANUP);
+                this.cleanup = cleanup;
             }
-
-            this.cleanup = cleanup;
             return cleanup;
         }
 
@@ -462,7 +456,7 @@ public class CommandChange
         // that is, if we cleared a non-null field or if we are already 
mask-only
         public boolean clearSuperseded(boolean clearFields, Builder 
superseding)
         {
-            int unset = flags & setFieldsMask(superseding.flags);
+            int unset = flags & setFieldsMask(superseding.flags & 
~setChanged(CLEANUP));
             if (notNulls(unset) == 0 && notNulls(flags) != 0)
                 return false;
 
@@ -478,13 +472,14 @@ public class CommandChange
                 return false;
 
             if (cleanup != null && addCleanup.compareTo(cleanup) <= 0)
+            {
+                Invariants.require(isChanged(CLEANUP, flags));
                 return false;
+            }
 
             hasUpdate = true;
             cleanup = addCleanup;
             flags |= setChanged(CLEANUP);
-            if (!cleanup.appliesTo(saveStatus))
-                return false;
             return forceSetNulls(clearFields, 
eraseKnownFieldsMask[cleanup.newStatus.ordinal()]);
         }
 
@@ -546,8 +541,8 @@ public class CommandChange
                 return null;
 
             Invariants.require(txnId != null);
-            if (participants == null) participants = 
StoreParticipants.empty(txnId);
-            else participants = participants.filter(LOAD, redundantBefore, 
txnId, saveStatus.known.isExecuteAtKnown() ? executeAt : null);
+            if (participants != null)
+                participants = participants.filter(LOAD, redundantBefore, 
txnId, saveStatus != null && saveStatus.known.isExecuteAtKnown() ? executeAt : 
null);
 
             if (durability == null)
                 durability = NotDurable;
@@ -556,6 +551,27 @@ public class CommandChange
             if (this.waitingOn != null)
                 waitingOn = this.waitingOn.provide(txnId, partialDeps, 
executesAtLeast, minUniqueHlc);
 
+            if (cleanup != null)
+            {
+                switch (cleanup)
+                {
+                    default: throw new UnhandledEnum(cleanup);
+                    case NO: break;
+                    case EXPUNGE: return null;
+                    case ERASE: return Command.Truncated.erased(txnId);
+                    case INVALIDATE: return 
Command.Truncated.invalidated(txnId, participants);
+                    case VESTIGIAL: return Command.Truncated.vestigial(txnId, 
participants);
+                    case TRUNCATE_WITH_OUTCOME:
+                        if 
(saveStatus.compareTo(SaveStatus.TruncatedApplyWithOutcome) < 0)
+                            saveStatus = SaveStatus.TruncatedApplyWithOutcome;
+                        break;
+                    case TRUNCATE:
+                        if (saveStatus.compareTo(SaveStatus.TruncatedApply) < 
0)
+                            saveStatus = saveStatus.known.is(ApplyAtKnown) ? 
SaveStatus.TruncatedApply : SaveStatus.TruncatedUnapplied;
+                        break;
+                }
+            }
+
             Invariants.require(saveStatus != null);
             switch (saveStatus.status)
             {
@@ -635,7 +651,7 @@ public class CommandChange
             if (!isChanged(field, flags))
                 return "";
 
-            return field.name().toLowerCase() + '=' + 
safeToString(isNull(field, flags), obj);
+            return ", " + field.name().toLowerCase() + '=' + 
safeToString(isNull(field, flags), obj);
         }
 
         private static Object safeToString(boolean isNull, Object obj)
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java 
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index af7a2b02..3b6a5694 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -32,6 +32,7 @@ import accord.local.Node;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
+import accord.local.StoreParticipants;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
 import accord.utils.AsymmetricComparator;
@@ -39,6 +40,8 @@ import accord.utils.Invariants;
 import accord.utils.btree.BTree;
 import accord.utils.btree.BTreeRemoval;
 
+import static accord.local.StoreParticipants.Filter.UPDATE;
+
 // TODO (desired): evict to disk
 public class DefaultLocalListeners implements LocalListeners
 {
@@ -495,7 +498,7 @@ public class DefaultLocalListeners implements LocalListeners
                 SafeCommand safeCommand = safeStore.unsafeGet(entry);
                 Command command = safeCommand.current();
                 SaveStatus saveStatus = command.saveStatus();
-                Invariants.require(saveStatus.compareTo(entry.await) >= 0 || 
command.participants().stillTouches().isEmpty());
+                Invariants.require(saveStatus.compareTo(entry.await) >= 0 || 
command.participants().stillOwns().isEmpty());
                 entry.notify(notifySink, safeStore, safeCommand);
             }, commandStore.agent());
             txnListeners = BTreeRemoval.remove(txnListeners, 
TxnListeners::compareListeners, entry);
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java 
b/accord-core/src/main/java/accord/local/Cleanup.java
index dfa2aa2a..569fa94d 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -215,7 +215,7 @@ public enum Cleanup
             case ShardUniversal:
                 // TODO (required): consider how we guarantee not to break 
recovery of other shards if a majority on this shard are PRE_BOOTSTRAP
                 //   (if the condition is false and we fall through to 
removing Outcome)
-                if (participants.doesStillExecute())
+                if (input != FULL || participants.doesStillExecute())
                     return min.atLeast(truncateWithOutcome(txnId, min));
 
             case MajorityOrInvalidated:
@@ -233,23 +233,12 @@ public enum Cleanup
     private static Cleanup cleanupWithoutFullRoute(Input input, TxnId txnId, 
SaveStatus saveStatus, StoreParticipants participants, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
     {
         // TODO (expected): consider if we can truncate more aggressively 
partial records, although we cannot infer anything from the fact they're 
undecided
-        if (input == PARTIAL || saveStatus.hasBeen(Truncated))
+        if (input == PARTIAL || saveStatus.hasBeen(Truncated) || saveStatus == 
Uninitialised)
             return NO;
 
         Invariants.require(!saveStatus.hasBeen(PreCommitted));
-        if (participants.route() == null)
-        {
-            // we don't want to erase something that we only don't own because 
it hasn't been initialised
-            if (saveStatus == Uninitialised)
-                return NO;
-
-            if 
(txnId.compareTo(redundantBefore.minShardAndLocallyAppliedBefore()) >= 0)
-                return NO;
-
-            return vestigial(txnId);
-        }
+        boolean isCovering = saveStatus.known.route() == CoveringRoute || 
txnId.compareTo(redundantBefore.minShardAndLocallyAppliedBefore()) <= 0;
 
-        boolean isCovering = saveStatus.known.route() == CoveringRoute;
         if (isCovering && txnId.isSyncPoint() && participants.owns().isEmpty())
         {
             RedundantStatus redundant = redundantBefore.status(txnId, null, 
participants.touches());
@@ -257,8 +246,8 @@ public enum Cleanup
                 return vestigial(txnId);
         }
 
-        RedundantStatus redundant = redundantBefore.status(txnId, null, 
participants.owns());
-        return cleanupUndecided(txnId, redundant, isCovering || 
redundantBefore.minShardAndLocallyAppliedBefore().compareTo(txnId) > 0);
+        RedundantStatus ownStatus = redundantBefore.status(txnId, null, 
participants.owns());
+        return cleanupUndecided(txnId, ownStatus, isCovering);
     }
 
     private static Cleanup cleanupIfUndecidedWithFullRoute(Input input, TxnId 
txnId, SaveStatus saveStatus, RedundantStatus redundantStatus)
@@ -269,14 +258,14 @@ public enum Cleanup
         return cleanupUndecided(txnId, redundantStatus, true);
     }
 
-    private static Cleanup cleanupUndecided(TxnId txnId, RedundantStatus 
redundantStatus, boolean isCoveringRoute)
+    private static Cleanup cleanupUndecided(TxnId txnId, RedundantStatus 
ownStatus, boolean isCoveringRoute)
     {
-        if (redundantStatus.any(LOCALLY_APPLIED))
+        if (ownStatus.any(LOCALLY_APPLIED))
             return invalidate(txnId);
 
         // TODO (desired): safe to use MAJORITY_APPLIED, LOCALLY_REDUNDANT?
         // TODO (required): can we guarantee we will always eventually obtain 
a covering route if others are garbage collecting?
-        if (isCoveringRoute && redundantStatus.all(SHARD_APPLIED, 
LOCALLY_REDUNDANT))
+        if (isCoveringRoute && ownStatus.all(SHARD_APPLIED, LOCALLY_REDUNDANT))
             return vestigial(txnId);
 
         return NO;
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 96830653..7c81d2e6 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -599,6 +599,7 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
     final void markBootstrapping(SafeCommandStore safeStore, TxnId 
globalSyncId, Ranges ranges)
     {
         safeStore.setBootstrapBeganAt(bootstrap(globalSyncId, ranges, 
bootstrapBeganAt));
+        safeStore.setSafeToRead(purgeHistory(safeToRead, ranges));
         updateMaxConflicts(ranges, globalSyncId);
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, globalSyncId, PRE_BOOTSTRAP_ONLY);
         safeStore.upsertRedundantBefore(addRedundantBefore);
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index a79f780e..6490e5ef 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -334,7 +334,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
 
     public AsyncResult<?> markDurable(DurableBefore addDurableBefore)
     {
-        return withEpochExact(addDurableBefore.maxEpoch(), (Executor)null, () 
-> persistDurableBefore.mergeAndUpdate(DurableBefore.merge(durableBefore, 
addDurableBefore)))
+        return withEpochExact(addDurableBefore.maxEpoch(), (Executor)null, () 
-> persistDurableBefore.mergeAndUpdate(addDurableBefore))
                .beginAsResult();
     }
 
diff --git 
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java 
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index 84c065c4..7f0fdf7c 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -285,8 +285,8 @@ public class ShardDurability
                 logger.info("Increased numberOfSplits to {} for shard {}", 
currentSplits, shard.range);
             }
             long retryDelay = node.agent().retrySyncPointDelay(node, retries, 
MICROSECONDS);
-            if (activeRequest != null)
-                logger.info("Retrying {} for {} in {}s", ranges, 
activeRequest.requestedBy, String.format("%.2f", retryDelay/1000_000.0));
+            if (activeRequest != null) logger.info("Retrying {} for {} in 
{}s", ranges, activeRequest.requestedBy, String.format("%.2f", 
retryDelay/1000_000.0));
+            else logger.debug("Retrying {} in {}s", ranges, 
String.format("%.2f", retryDelay/1000_000.0));
             scheduled = node.scheduler().selfRecurring(() -> {
                 synchronized (this)
                 {
@@ -313,8 +313,8 @@ public class ShardDurability
             }
             minHlc = Math.max(minHlc, node.agent().minStaleHlc(node, 
activeRequest != null));
             TxnId staleId = node.nextStaleTxnId(minEpoch, minHlc, 
ExclusiveSyncPoint, Domain.Range);
-            if (activeRequest != null)
-                logger.info("Initiating RX requested by {} for {} with TxnId 
{}. Remaining: {}.", activeRequest.requestedBy, ranges, staleId, active);
+            if (activeRequest != null) logger.info("Initiating RX requested by 
{} for {} with TxnId {}. Remaining: {}.", activeRequest.requestedBy, ranges, 
staleId, active);
+            else logger.debug("Initiating RX for durability of {} with TxnId 
{}.", ranges, staleId);
 
             scheduled = node.agent().awaitStaleId(node, staleId, activeIndex 
>= 0)
                             .flatMap(
diff --git 
a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java 
b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
index 04de28eb..88043420 100644
--- a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
@@ -29,6 +29,7 @@ import accord.local.SafeCommandStore;
 import accord.local.StoreParticipants;
 import accord.primitives.Deps;
 import accord.primitives.FullRoute;
+import accord.primitives.Participants;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -72,10 +73,10 @@ public class GetEphemeralReadDeps extends 
TxnRequest.WithUnsynced<GetEphemeralRe
         long latestEpoch = Math.max(safeStore.node().epoch(), node.epoch());
 
         // TODO (desired): only return failure if we've participated in a sync 
point that could migrate coordination to the newer epoch
-        if (latestEpoch > executionEpoch && 
safeStore.ranges().removed(executionEpoch, latestEpoch).intersects(scope))
+        StoreParticipants participants = StoreParticipants.read(safeStore, 
scope, txnId, minEpoch, latestEpoch);
+        if (latestEpoch > executionEpoch && 
(safeStore.ranges().removed(executionEpoch, 
latestEpoch).intersects(participants.owns()) || 
node.topology().hasReplicationMaybeChanged(participants.owns(), 
executionEpoch)))
             return new GetEphemeralReadDepsOk(latestEpoch);
 
-        StoreParticipants participants = StoreParticipants.read(safeStore, 
scope, txnId, minEpoch, latestEpoch);
         Deps deps;
         ExecuteFlags flags;
         try (DepsCalculator calculator = new DepsCalculator())
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index 3df0d750..7071f199 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -46,6 +46,7 @@ import accord.utils.ArrayBuffers;
 import accord.utils.ArrayBuffers.IntBuffers;
 import accord.utils.IndexedBiFunction;
 import accord.utils.IndexedConsumer;
+import accord.utils.IndexedFoldToLong;
 import accord.utils.IndexedIntFunction;
 import accord.utils.IndexedTriFunction;
 import accord.utils.SimpleBitSet;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 4e055eb9..2d990602 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -1240,10 +1240,22 @@ public class TopologyManager
 
     public Topologies forEpoch(Unseekables<?> select, long epoch, 
SelectNodeOwnership selectNodeOwnership)
     {
-        EpochState state = epochs.get(epoch);
+        Epochs snapshot = epochs;
+        EpochState state = snapshot.get(epoch);
+        if (state == null)
+            throw new TopologyRetiredException(epoch, snapshot.minEpoch());
         return new Single(sorter, state.global.select(select, 
selectNodeOwnership));
     }
 
+    public boolean hasReplicationMaybeChanged(Unseekables<?> select, long 
sinceEpoch)
+    {
+        Epochs snapshot = epochs;
+        if (snapshot.minEpoch() > sinceEpoch)
+            return true;
+
+        return atLeast(select, sinceEpoch, Long.MAX_VALUE, ignore -> 
Ranges.EMPTY, HasChangedReplication.INSTANCE);
+    }
+
     public Topologies forEpochAtLeast(Unseekables<?> select, long epoch, 
SelectNodeOwnership selectNodeOwnership)
     {
         Epochs snapshot = this.epochs;
@@ -1488,6 +1500,51 @@ public class TopologyManager
         }
     }
 
+    static class ReplicationChangeTracker
+    {
+        int rf = -1;
+        boolean hasChanged;
+    }
+    static class HasChangedReplication implements 
TopologyManager.Collectors<ReplicationChangeTracker, Unseekables<?>, Boolean>
+    {
+        static final HasChangedReplication INSTANCE = new 
HasChangedReplication();
+
+        @Override
+        public ReplicationChangeTracker allocate(int size)
+        {
+            return new ReplicationChangeTracker();
+        }
+
+        @Override
+        public Boolean none()
+        {
+            return false;
+        }
+
+        @Override
+        public Boolean multi(ReplicationChangeTracker collector)
+        {
+            return collector.hasChanged;
+        }
+
+        @Override
+        public Boolean one(TopologyManager.EpochState epoch, Unseekables<?> 
select, boolean permitMissing)
+        {
+            return false;
+        }
+
+        @Override
+        public ReplicationChangeTracker update(ReplicationChangeTracker 
collector, TopologyManager.EpochState epoch, Unseekables<?> select, boolean 
permitMissing)
+        {
+            epoch.global.foldl(select, (shard, c, i1) -> {
+                if (c.rf < 0) c.rf = shard.rf;
+                else c.hasChanged |= c.rf != shard.rf;
+                return c;
+            }, collector);
+            return collector;
+        }
+    }
+
     public interface Collectors<C, K, T>
     {
         C allocate(int size);
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 618f121d..220d61f4 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Objects;
 import java.util.TreeMap;
 
 import javax.annotation.Nonnull;
@@ -147,12 +148,7 @@ public class InMemoryJournal implements Journal
         if (builder == null)
             return null;
 
-        Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore, 
durableBefore);
-        switch (cleanup)
-        {
-            case ERASE: return Command.Truncated.erased(txnId);
-            case EXPUNGE: return null;
-        }
+        builder.maybeCleanup(true, FULL, redundantBefore, durableBefore);
         return builder.construct(redundantBefore);
     }
 
@@ -527,7 +523,7 @@ public class InMemoryJournal implements Journal
                 Input input = isPartialCompaction ? PARTIAL : FULL;
                 ++counter;
                 Cleanup cleanup = builder.shouldCleanup(input, 
store.unsafeGetRedundantBefore(), store.durableBefore());
-                cleanup = builder.maybeCleanup(true, input, cleanup);
+                cleanup = builder.maybeCleanup(true, cleanup);
                 if (cleanup != NO)
                 {
                     if (cleanup == EXPUNGE)
@@ -584,11 +580,16 @@ public class InMemoryJournal implements Journal
                     }
                 }
 
+                Builder before = reconstruct(diffs, ALL);
+                before.maybeCleanup(true, FULL, 
store.unsafeGetRedundantBefore(), store.durableBefore());
                 diffs.size -= removeCount;
                 diffs.flushed.removeAll(subset.flushed);
                 diffs.files.removeAll(subset.files);
                 diffs.files.add(new DiffFile(sorted));
                 diffs.sorted = null;
+                Builder after = reconstruct(diffs, ALL);
+                after.maybeCleanup(true, FULL, 
store.unsafeGetRedundantBefore(), store.durableBefore());
+                
Invariants.require(Objects.equals(before.construct(store.unsafeGetRedundantBefore()),
 after.construct(store.unsafeGetRedundantBefore())));
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to