This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a5446a3 Fix:  - StoreParticipants.touches behaviour for RX was 
erroneously modified; should touch all non-redundant ranges including those no 
longer owned  - Support improved partial compaction  - SetShardDurable should 
correctly set DurableBefore Majority/Universal based on the Durability 
parameter  - fix erroneous prunedBefore invariant  - unset superseding rather 
than set null, so can omit empty  - Don't save updates to ERASED  - Simplify 
CommandChange.getFlags  - fix handlin [...]
0a5446a3 is described below

commit 0a5446a365fe7eb1a15b6a2d0d72f9475c51bc47
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Mar 13 12:39:09 2025 +0000

    Fix:
     - StoreParticipants.touches behaviour for RX was erroneously modified; should touch all non-redundant ranges including those no longer owned
     - Support improved partial compaction
     - SetShardDurable should correctly set DurableBefore Majority/Universal based on the Durability parameter
     - fix erroneous prunedBefore invariant
     - unset superseding rather than set null, so can omit empty
     - Don't save updates to ERASED
     - Simplify CommandChange.getFlags
     - fix handling of Durability for Invalidated
     - Don't use ApplyAt for GC_BEFORE with partial input, as might be a saveStatus >= ApplyAtKnown but with executeAt < ApplyAtKnown
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20441
---
 accord-core/src/main/java/accord/api/Journal.java  |   3 +
 .../src/main/java/accord/impl/AbstractLoader.java  |  18 +-
 .../src/main/java/accord/impl/CommandChange.java   | 354 +++++++++++++++------
 .../java/accord/impl/InMemoryCommandStore.java     |   2 +-
 .../impl/progresslog/DefaultProgressLog.java       |   3 +-
 .../src/main/java/accord/local/Cleanup.java        | 114 ++++---
 .../src/main/java/accord/local/CommandStore.java   |   4 +-
 .../src/main/java/accord/local/Commands.java       |   7 +-
 .../main/java/accord/local/RedundantBefore.java    |  29 +-
 .../main/java/accord/local/StoreParticipants.java  |   5 +-
 .../main/java/accord/local/cfk/CommandsForKey.java |   2 +-
 .../main/java/accord/messages/SetShardDurable.java |   5 +-
 .../src/main/java/accord/primitives/Known.java     |   6 +
 .../src/main/java/accord/utils/Invariants.java     |  12 +
 accord-core/src/test/java/accord/Utils.java        |   2 +-
 .../src/test/java/accord/burn/BurnTestBase.java    |   4 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |  10 +-
 .../java/accord/impl/basic/InMemoryJournal.java    | 190 +++++++----
 .../java/accord/impl/basic/LoggingJournal.java     |   8 +
 .../test/java/accord/impl/mock/MockCluster.java    |   3 +-
 .../java/accord/local/ImmutableCommandTest.java    |   2 +-
 .../src/main/java/accord/maelstrom/Cluster.java    |   1 +
 22 files changed, 538 insertions(+), 246 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index 00e3ffb4..a1246f76 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import accord.local.Command;
 import accord.local.CommandStores;
 import accord.local.DurableBefore;
+import accord.local.Node;
 import accord.local.RedundantBefore;
 import accord.primitives.EpochSupplier;
 import accord.primitives.Ranges;
@@ -42,6 +43,8 @@ import org.agrona.collections.Int2ObjectHashMap;
  */
 public interface Journal
 {
+    Journal start(Node node);
+
     Command loadCommand(int store, TxnId txnId, RedundantBefore 
redundantBefore, DurableBefore durableBefore);
     Command.Minimal loadMinimal(int store, TxnId txnId, Load load, 
RedundantBefore redundantBefore, DurableBefore durableBefore);
 
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java 
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
index c55ac6c8..9020b96a 100644
--- a/accord-core/src/main/java/accord/impl/AbstractLoader.java
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -31,7 +31,6 @@ import accord.primitives.TxnId;
 
 import static accord.local.Cleanup.Input.FULL;
 import static accord.primitives.SaveStatus.PreApplied;
-import static accord.primitives.Status.Invalidated;
 import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
 import static accord.primitives.Txn.Kind.Write;
@@ -41,20 +40,9 @@ public abstract class AbstractLoader implements 
Journal.Loader
     protected Command loadInternal(Command command, SafeCommandStore safeStore)
     {
         TxnId txnId = command.txnId();
-        if (command.status() != Truncated && command.status() != Invalidated)
-        {
-            Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command, 
command.participants());
-            switch (cleanup)
-            {
-                case NO:
-                    break;
-                case INVALIDATE:
-                case TRUNCATE_WITH_OUTCOME:
-                case TRUNCATE:
-                case ERASE:
-                    command = Commands.purge(safeStore, command, cleanup);
-            }
-        }
+        Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command, 
command.participants());
+        if (cleanup != Cleanup.NO)
+            command = Commands.purge(safeStore, command, cleanup);
 
         return safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
     }
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java 
b/accord-core/src/main/java/accord/impl/CommandChange.java
index 51c56332..c45dd411 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -18,14 +18,15 @@
 
 package accord.impl;
 
+import java.util.Objects;
 import java.util.function.BiPredicate;
-import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.function.ToLongFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import accord.api.Agent;
 import accord.api.Result;
 import accord.local.Cleanup;
 import accord.local.Cleanup.Input;
@@ -90,18 +91,18 @@ public class CommandChange
     {
         PARTICIPANTS, // stored first so we can index it
         SAVE_STATUS,
-        PARTIAL_DEPS,
-        EXECUTE_AT,
-        EXECUTES_AT_LEAST,
-        MIN_UNIQUE_HLC,
         DURABILITY,
-        ACCEPTED,
+        EXECUTE_AT,
         PROMISED,
-        WAITING_ON,
+        ACCEPTED,
         PARTIAL_TXN,
+        PARTIAL_DEPS,
+        WAITING_ON,
+        MIN_UNIQUE_HLC,
+        EXECUTES_AT_LEAST,
         WRITES,
-        CLEANUP,
         RESULT,
+        CLEANUP,
         ;
 
         public static final Field[] FIELDS = values();
@@ -160,25 +161,27 @@ public class CommandChange
 
         protected TxnId txnId;
 
-        protected Timestamp executeAt;
-        protected Timestamp executesAtLeast;
-        protected long minUniqueHlc;
+        protected StoreParticipants participants;
         protected SaveStatus saveStatus;
         protected Durability durability;
+        protected Timestamp executeAt;
 
-        protected Ballot acceptedOrCommitted;
         protected Ballot promised;
+        protected Ballot acceptedOrCommitted;
 
-        protected StoreParticipants participants;
         protected PartialTxn partialTxn;
         protected PartialDeps partialDeps;
 
         protected CommandChange.WaitingOnProvider waitingOn;
+        protected long minUniqueHlc;
+        protected Timestamp executesAtLeast;
+
         protected Writes writes;
         protected Result result;
+
         protected Cleanup cleanup;
 
-        protected boolean nextCalled;
+        protected boolean hasUpdate;
         protected int count;
 
         public Builder(TxnId txnId, Load load)
@@ -226,17 +229,17 @@ public class CommandChange
         {
             switch (field)
             {
-                case EXECUTE_AT: return executeAt;
-                case EXECUTES_AT_LEAST: return executesAtLeast;
-                case MIN_UNIQUE_HLC: return minUniqueHlc;
+                case PARTICIPANTS: return participants;
                 case SAVE_STATUS: return saveStatus;
                 case DURABILITY: return durability;
-                case ACCEPTED: return acceptedOrCommitted;
+                case EXECUTE_AT: return executeAt;
                 case PROMISED: return promised;
-                case PARTICIPANTS: return participants;
+                case ACCEPTED: return acceptedOrCommitted;
                 case PARTIAL_TXN: return partialTxn;
                 case PARTIAL_DEPS: return partialDeps;
                 case WAITING_ON: return waitingOn;
+                case MIN_UNIQUE_HLC: return minUniqueHlc;
+                case EXECUTES_AT_LEAST: return executesAtLeast;
                 case WRITES: return writes;
                 case RESULT: return result;
                 default: throw new UnhandledEnum(field);
@@ -248,25 +251,26 @@ public class CommandChange
             flags = 0;
             txnId = null;
 
-            executeAt = null;
-            executesAtLeast = null;
-            minUniqueHlc = 0;
+            participants = null;
             saveStatus = null;
             durability = null;
+            executeAt = null;
 
-            acceptedOrCommitted = null;
             promised = null;
+            acceptedOrCommitted = null;
 
-            participants = null;
             partialTxn = null;
             partialDeps = null;
 
             waitingOn = null;
+            minUniqueHlc = 0;
+            executesAtLeast = null;
+
             writes = null;
             result = null;
             cleanup = null;
 
-            nextCalled = false;
+            hasUpdate = false;
             count = 0;
         }
 
@@ -287,7 +291,12 @@ public class CommandChange
 
         public boolean isEmpty()
         {
-            return !nextCalled;
+            return !hasUpdate;
+        }
+
+        public int flags()
+        {
+            return flags;
         }
 
         public int count()
@@ -295,31 +304,38 @@ public class CommandChange
             return count;
         }
 
-        public Cleanup shouldCleanup(Input input, Agent agent, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
+        public Cleanup shouldCleanup(Input input, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
         {
-            if (!nextCalled)
+            if (!hasUpdate)
                 return NO;
 
             Durability durability = this.durability;
             if (durability == null) durability = NotDurable;
-            Cleanup cleanup = Cleanup.shouldCleanup(input, agent, txnId, 
executeAt, saveStatus, durability, participants, redundantBefore, 
durableBefore);
+            StoreParticipants participants = this.participants;
+            // TODO (expected): we need to filter participants to correctly 
compute doesStillExecute in Cleanup.shouldCleanup;
+            //  would be better to break this dependency, or otherwise encode 
it better.
+            //  In particular it would be nice to avoid doing this twice for 
each command on load, as we also do this in SafeCommandStore.
+            //  Perhaps we can special-case loading, and simply update the 
participants here so we can avoid doing it again on access
+            if (input == Input.FULL && participants != null)
+                participants = participants.filter(LOAD, redundantBefore, 
txnId, saveStatus != null && saveStatus.known.isExecuteAtKnown() ? executeAt : 
null);
+            Cleanup cleanup = Cleanup.shouldCleanup(input, txnId, executeAt, 
saveStatus, durability, participants, redundantBefore, durableBefore);
             if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
                 cleanup = this.cleanup;
             return cleanup;
         }
 
-        public Cleanup maybeCleanup(Input input, Agent agent, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
+        public Cleanup maybeCleanup(boolean clearFields, Input input, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
         {
-            Cleanup cleanup = shouldCleanup(input, agent, redundantBefore, 
durableBefore);
-            return maybeCleanup(input, cleanup);
+            Cleanup cleanup = shouldCleanup(input, redundantBefore, 
durableBefore);
+            return maybeCleanup(clearFields, input, cleanup);
         }
 
-        public Cleanup maybeCleanup(Input input, Cleanup cleanup)
+        public Cleanup maybeCleanup(boolean clearFields, Input input, Cleanup 
cleanup)
         {
             if (cleanup == NO || cleanup == EXPUNGE)
                 return cleanup;
 
-            SaveStatus newSaveStatus = cleanup.appliesIfNot;
+            SaveStatus newSaveStatus = cleanup.newStatus;
             if (saveStatus == null || saveStatus.compareTo(newSaveStatus) < 0)
             {
                 if (input == Input.FULL)
@@ -329,23 +345,38 @@ public class CommandChange
                         newSaveStatus = SaveStatus.TruncatedUnapplied;
                     saveStatus = newSaveStatus;
                 }
-                forceSetNulls(eraseKnownFieldsMask[newSaveStatus.ordinal()]);
+                forceSetNulls(clearFields, 
eraseKnownFieldsMask[newSaveStatus.ordinal()]);
             }
 
+            this.cleanup = cleanup;
             return cleanup;
         }
 
-        protected void setNulls(int mask)
+        protected void setNulls(boolean clearFields, int newFlags)
         {
-            mask &= ~(flags >>> 16); // limit ourselves to those fields that 
have not already been set (high 16 bits are those already-set fields)
-            forceSetNulls(mask);
+            newFlags &= ~(flags >>> 16); // limit ourselves to those fields 
that have not already been set (high 16 bits are those already-set fields)
+            forceSetNulls(clearFields, newFlags);
         }
 
-        protected void forceSetNulls(int mask)
+        protected boolean forceSetNulls(boolean clearFields, int newFlags)
         {
-            mask &= ~nullMask(flags); // limit ourselves to those fields that 
are not already null
-            mask = nullMask(mask); // limit ourselves to those fields that are 
now being set to null
-            int iterable = toIterableSetFields(mask);
+            newFlags &= ~nulls(flags); // limit ourselves to those fields that 
are not already null
+            newFlags = nulls(newFlags); // limit ourselves to those fields 
that are now being set to null
+            if (newFlags == 0)
+                return false;
+
+            if (clearFields)
+                clearFields(newFlags);
+
+            flags |= newFlags;
+            return true;
+        }
+
+        // clears any field with a CHANGED flag NOT limited only to NULL
+        private void clearFields(int newFlags)
+        {
+            newFlags &= notNulls(flags); // limit ourselves to those fields 
that are not already null
+            int iterable = toIterableSetFields(newFlags);
             for (Field next = nextSetField(iterable); next != null; iterable = 
unsetIterable(next, iterable), next = nextSetField(iterable))
             {
                 switch (next)
@@ -367,12 +398,112 @@ public class CommandChange
                     case RESULT:            result = null;                     
      break;
                 }
             }
-            flags |= mask;
         }
 
-        static int nullMask(int mask)
+        // only populate regular fields that are not already set, but apply 
any Cleanup if it is stronger than any already present
+        public boolean fillInMissingOrCleanup(boolean clearFields, Builder add)
+        {
+            hasUpdate = true;
+            count++;
+
+            int addFlags = notAlreadySet(not(CLEANUP, add.flags), flags);
+            if (addFlags == 0)
+                return addCleanup(clearFields, add.cleanup);
+
+            setNulls(false, addFlags);
+            int iterable = toIterableSetFields(notNulls(addFlags));
+            for (Field next = nextSetField(iterable) ; next != null; next = 
nextSetField(iterable = unsetIterable(next, iterable)))
+            {
+                switch (next)
+                {
+                    default: throw new UnhandledEnum(next);
+                    case PARTICIPANTS:      participants = add.participants;   
            break;
+                    case SAVE_STATUS:       saveStatus = add.saveStatus;       
            break;
+                    case DURABILITY:        durability = add.durability;       
            break;
+                    case EXECUTE_AT:        executeAt = add.executeAt;         
            break;
+                    case PROMISED:          promised = add.promised;           
            break;
+                    case ACCEPTED:          acceptedOrCommitted = 
add.acceptedOrCommitted; break;
+                    case PARTIAL_TXN:       partialTxn = add.partialTxn;       
            break;
+                    case PARTIAL_DEPS:      partialDeps = add.partialDeps;     
            break;
+                    case WAITING_ON:        waitingOn = add.waitingOn;         
            break;
+                    case MIN_UNIQUE_HLC:    minUniqueHlc = add.minUniqueHlc;   
            break;
+                    case EXECUTES_AT_LEAST: executesAtLeast = 
add.executesAtLeast;         break;
+                    case WRITES:            writes = add.writes;               
            break;
+                    case RESULT:            result = add.result;               
            break;
+                }
+            }
+            flags |= addFlags;
+            addCleanup(clearFields, add.cleanup);
+            return true;
+        }
+
+        // returns true if we made a material update to the Builder;
+        // that is, if we cleared a non-null field or if we are already 
mask-only
+        public boolean clearSuperseded(boolean clearFields, Builder 
superseding)
+        {
+            int unset = flags & setFieldsMask(superseding.flags);
+            if (notNulls(unset) == 0 && notNulls(flags) != 0)
+                return false;
+
+            if (clearFields)
+                clearFields(unset);
+            flags ^= unset;
+            return true;
+        }
+
+        public boolean addCleanup(boolean clearFields, Cleanup addCleanup)
+        {
+            if (addCleanup == null || addCleanup == NO)
+                return false;
+
+            if (cleanup != null && addCleanup.compareTo(cleanup) <= 0)
+                return false;
+
+            cleanup = addCleanup;
+            if (!cleanup.appliesTo(saveStatus))
+                return false;
+            return forceSetNulls(clearFields, 
eraseKnownFieldsMask[cleanup.newStatus.ordinal()]);
+        }
+
+        public boolean cleanup(boolean clearFields, Cleanup apply)
         {
-            return (mask & 0xffff) | (mask << 16);
+            int unsetFields = eraseKnownFieldsMask[apply.newStatus.ordinal()];
+            unsetFields &= flags;
+            if (unsetFields == 0)
+                return false;
+
+            if (clearFields)
+                clearFields(unsetFields);
+            flags ^= unsetFields;
+            return true;
+        }
+
+        protected static int nulls(int flags)
+        {
+            return (flags & 0xffff) | (flags << 16);
+        }
+
+        protected static int notNulls(int flags)
+        {
+            return flags & (~flags << 16);
+        }
+
+        protected static int not(Field field, int flags)
+        {
+            return flags & ~(0x10001 << field.ordinal());
+        }
+
+        protected static int notAlreadySet(int newFlags, int oldFlags)
+        {
+            return newFlags & ~setFieldsMask(oldFlags);
+        }
+
+        // result has both null and changed flag bits set for any changed 
field;
+        // can be used to limit another flags to those that were set bu these 
flags, or if inverted to those not set by these flags
+        protected static int setFieldsMask(int flags)
+        {
+            int mask = flags & 0xffff0000;
+            return mask | (mask >>> 16);
         }
 
         public Command.Minimal asMinimal()
@@ -388,7 +519,7 @@ public class CommandChange
         // TODO (expected): we shouldn't need to filter participants here, we 
will do it anyway before using in SafeCommandStore
         public Command construct(RedundantBefore redundantBefore)
         {
-            if (!nextCalled)
+            if (!hasUpdate)
                 return null;
 
             Invariants.require(txnId != null);
@@ -453,17 +584,17 @@ public class CommandChange
         {
             return "Builder {" +
                    "txnId=" + txnId
-                   + (isChanged(EXECUTE_AT, flags)        ? ", executeAt=" + 
executeAt : "")
-                   + (isChanged(EXECUTES_AT_LEAST, flags) ? ", 
executesAtLeast=" + executesAtLeast : "")
-                   + (isChanged(MIN_UNIQUE_HLC, flags)    ? ", minUniqueHlc=" 
+ minUniqueHlc : "")
+                   + (isChanged(PARTICIPANTS, flags)      ? ", participants=" 
+ participants : "")
                    + (isChanged(SAVE_STATUS, flags)       ? ", saveStatus=" + 
saveStatus : "")
                    + (isChanged(DURABILITY, flags)        ? ", durability=" + 
durability : "")
-                   + (isChanged(ACCEPTED, flags)          ? ", 
acceptedOrCommitted=" + acceptedOrCommitted : "")
+                   + (isChanged(EXECUTE_AT, flags)        ? ", executeAt=" + 
executeAt : "")
                    + (isChanged(PROMISED, flags)          ? ", promised=" + 
promised : "")
-                   + (isChanged(PARTICIPANTS, flags)      ? ", participants=" 
+ participants : "")
-                   + (isChanged(PARTIAL_DEPS, flags)      ? ", partialTxn=" + 
partialTxn : "")
+                   + (isChanged(ACCEPTED, flags)          ? ", 
acceptedOrCommitted=" + acceptedOrCommitted : "")
+                   + (isChanged(PARTIAL_TXN, flags)      ? ", partialTxn=" + 
partialTxn : "")
                    + (isChanged(PARTIAL_DEPS, flags)      ? ", partialDeps=" + 
partialDeps : "")
                    + (isChanged(WAITING_ON, flags)        ? ", waitingOn=" + 
waitingOn : "")
+                   + (isChanged(MIN_UNIQUE_HLC, flags)    ? ", minUniqueHlc=" 
+ minUniqueHlc : "")
+                   + (isChanged(EXECUTES_AT_LEAST, flags) ? ", 
executesAtLeast=" + executesAtLeast : "")
                    + (isChanged(WRITES, flags)            ? ", writes=" + 
writes : "")
                    + (isChanged(RESULT, flags)            ? ", result=" + 
result : "")
                    + (isChanged(CLEANUP, flags)           ? ", cleanup=" + 
cleanup : "") +
@@ -514,73 +645,86 @@ public class CommandChange
      */
 
     @VisibleForTesting
-    public static int getFlags(Command before, Command after)
+    public static int getFlags(@Nullable Command before, @Nonnull Command 
after)
     {
         int flags = 0;
-        if (before == null && after == null)
-            return flags;
-
-        // TODO (expected): derive this from precomputed bit masking on Known, 
only testing equality of objects we can't infer directly
-        flags = collectFlags(before, after, Command::executeAt, 
Timestamp::equalsStrict, true, EXECUTE_AT, flags);
-        flags = collectFlags(before, after, Command::executesAtLeast, true, 
EXECUTES_AT_LEAST, flags);
-        flags = collectFlags(before, after, CommandChange::getMinUniqueHlc, 
MIN_UNIQUE_HLC, flags);
-
-        flags = collectFlags(before, after, Command::saveStatus, false, 
SAVE_STATUS, flags);
-        flags = collectFlags(before, after, Command::durability, false, 
DURABILITY, flags);
-
-        flags = collectFlags(before, after, Command::acceptedOrCommitted, 
false, ACCEPTED, flags);
-        flags = collectFlags(before, after, Command::promised, false, 
PROMISED, flags);
-
-        flags = collectFlags(before, after, Command::participants, true, 
PARTICIPANTS, flags);
-        flags = collectFlags(before, after, Command::partialTxn, false, 
PARTIAL_TXN, flags);
-        flags = collectFlags(before, after, Command::partialDeps, false, 
PARTIAL_DEPS, flags);
-        flags = collectFlags(before, after, Command::waitingOn, 
WaitingOn::equalBitSets, true, WAITING_ON, flags);
-
-        flags = collectFlags(before, after, Command::writes, false, WRITES, 
flags);
-        flags = collectFlags(before, after, Command::result, false, RESULT, 
flags);
+        SaveStatus saveStatus = after.saveStatus();
+        if (before == null)
+        {
+            flags |= addIdentityFlags(null, after.participants(), 
PARTICIPANTS);
+            flags |= addIdentityFlags(null, saveStatus, SAVE_STATUS);
+            flags |= addIdentityFlags(null, after.durability(), DURABILITY);
+            flags |= addIdentityFlags(null, after.executeAt(), EXECUTE_AT);
+            flags |= addIdentityFlags(null, after.promised(), PROMISED);
+            flags |= addIdentityFlags(null, after.acceptedOrCommitted(), 
ACCEPTED);
+            flags |= addIdentityFlags(null, after.partialTxn(), PARTIAL_TXN);
+            flags |= addIdentityFlags(null, after.partialDeps(), PARTIAL_DEPS);
+            if (after.waitingOn() != null)
+            {
+                flags |= setChanged(WAITING_ON, 0);
+                flags |= addIdentityFlags(0, getMinUniqueHlc(after), 
MIN_UNIQUE_HLC);
+            }
+            flags |= addIdentityFlags(null, after.executesAtLeast(), 
EXECUTES_AT_LEAST);
+            flags |= addIdentityFlags(null, after.writes(), WRITES);
+            flags |= addIdentityFlags(null, after.result(), RESULT);
+        }
+        else
+        {
+            flags |= addEqualityFlags(before.participants(), 
after.participants(), PARTICIPANTS);
+            flags |= addIdentityFlags(before.saveStatus(), saveStatus, 
SAVE_STATUS);
+            flags |= addIdentityFlags(before.durability(), after.durability(), 
DURABILITY);
+            flags |= addFlags(before.executeAt(), after.executeAt(), 
Timestamp::equalsStrict, EXECUTE_AT);
+            flags |= addEqualityFlags(before.promised(), after.promised(), 
PROMISED);
+            flags |= addEqualityFlags(before.acceptedOrCommitted(), 
after.acceptedOrCommitted(), ACCEPTED);
+            flags |= addIdentityFlags(before.partialTxn(), after.partialTxn(), 
PARTIAL_TXN);
+            flags |= addIdentityFlags(before.partialDeps(), 
after.partialDeps(), PARTIAL_DEPS);
+            if (before.waitingOn() != after.waitingOn())
+            {
+                flags |= setChanged(WAITING_ON);
+                flags |= addIdentityFlags(getMinUniqueHlc(before), 
getMinUniqueHlc(after), MIN_UNIQUE_HLC);
+            }
+            flags |= addEqualityFlags(before.executesAtLeast(), 
after.executesAtLeast(), EXECUTES_AT_LEAST);
+            flags |= addIdentityFlags(before.writes(), after.writes(), WRITES);
+            flags |= addIdentityFlags(before.result(), after.result(), RESULT);
+        }
 
         // make sure we have enough information to decide whether to expunge 
timestamps (for unique ApplyAt HLC guarantees)
-        if (after.saveStatus().known.is(ApplyAtKnown) && (before == null || 
!before.saveStatus().known.is(ApplyAtKnown)))
+        if (saveStatus.known.is(ApplyAtKnown) && (before == null || 
!before.saveStatus().known.is(ApplyAtKnown)))
         {
             flags = setChanged(EXECUTE_AT, flags);
             flags = setChanged(PARTICIPANTS, flags);
             flags = setChanged(SAVE_STATUS, flags);
         }
 
-        flags |= eraseKnownFieldsMask[after.saveStatus().ordinal()];
-
+        flags |= eraseKnownFieldsMask[saveStatus.ordinal()];
         return flags;
     }
 
-    private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, 
VAL> convert, boolean allowClassMismatch, Field field, int flags)
+    private static int addIdentityFlags(Object l, Object r, Field field)
     {
-        return collectFlags(lo, ro, convert, Object::equals, 
allowClassMismatch, field, flags);
+        if (l == r) return 0;
+        if (r == null) return setIsNullAndChanged(field);
+        return setChanged(field);
     }
 
-    private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, 
VAL> convert, BiPredicate<VAL, VAL> equals, boolean allowClassMismatch, Field 
field, int flags)
+    private static <T> int addEqualityFlags(T l, T r, Field field)
     {
-        VAL l = null;
-        VAL r = null;
-        if (lo != null) l = convert.apply(lo);
-        if (ro != null) r = convert.apply(ro);
-
-        if (l == r) return flags; // no change
-        if (r == null) return setIsNullAndChanged(field, flags);
-        if (l == null) return setChanged(field, flags);
-        Invariants.require(allowClassMismatch || l.getClass() == r.getClass(), 
"%s != %s", l.getClass(), r.getClass());
-        if (equals.test(l, r)) return flags; // no change
-        return setChanged(field, flags);
+        return addFlags(l, r, Objects::equals, field);
     }
 
-    private static <OBJ> int collectFlags(OBJ lo, OBJ ro, ToLongFunction<OBJ> 
convert, Field field, int flags)
+    private static <T> int addFlags(T l, T r, BiPredicate<T, T> equality, 
Field field)
     {
-        long l = 0, r = 0;
-        if (lo != null) l = convert.applyAsLong(lo);
-        if (ro != null) r = convert.applyAsLong(ro);
+        if (l == r) return 0;
+        if (r == null) return setIsNullAndChanged(field);
+        if (l == null) return setChanged(field);
+        if (equality.test(l, r)) return 0;
+        return setChanged(field);
+    }
 
-        return l == r ? flags:
-                r == 0 ? setIsNullAndChanged(field, flags)
-                       : setChanged(field, flags);
+    private static int addIdentityFlags(long l, long r, Field field)
+    {
+        if (l == r) return 0;
+        return setChanged(field);
     }
 
     public static boolean anyFieldChanged(int flags)
@@ -599,6 +743,16 @@ public class CommandChange
         return oldFlags | (0x10000 << field.ordinal());
     }
 
+    public static int setChanged(Field field)
+    {
+        return 0x10000 << field.ordinal();
+    }
+
+    public static int setIsNullAndChanged(Field field)
+    {
+        return 0x10001 << field.ordinal();
+    }
+
     @VisibleForTesting
     public static boolean isChanged(Field field, int oldFlags)
     {
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 9d03b7de..64103a3d 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -289,7 +289,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             Invariants.require(globalCommand != null && 
!globalCommand.isEmpty());
             Command command = globalCommand.value();
             StoreParticipants participants = 
command.participants().filter(LOAD, safeStore, txnId, 
command.executeAtIfKnown());
-            Cleanup cleanup = Cleanup.shouldCleanup(FULL, agent, txnId, 
command.executeAtIfKnown(), command.saveStatus(), command.durability(), 
participants, unsafeGetRedundantBefore(), durableBefore());
+            Cleanup cleanup = Cleanup.shouldCleanup(FULL, txnId, 
command.executeAtIfKnown(), command.saveStatus(), command.durability(), 
participants, unsafeGetRedundantBefore(), durableBefore());
             Invariants.require(command.hasBeen(Applied)
                                || cleanup.compareTo(Cleanup.TRUNCATE) >= 0
                                || (durableBefore().min(txnId) == NotDurable &&
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java 
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index d7da2ce7..d95f4af5 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -306,8 +306,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
     @Override
     public void clearBefore(SafeCommandStore safeStore, TxnId 
clearWaitingBefore, TxnId clearAllBefore)
     {
-        if (clearAllBefore.compareTo(clearWaitingBefore) >= 0)
-            clearWaitingBefore = clearAllBefore;
+        Invariants.require(clearAllBefore.compareTo(clearWaitingBefore) <= 0);
 
         int index = 0;
         while (index < BTree.size(stateMap))
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java 
b/accord-core/src/main/java/accord/local/Cleanup.java
index 75454675..a1ee2477 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -21,7 +21,6 @@ package accord.local;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import accord.api.Agent;
 import accord.primitives.FullRoute;
 import accord.primitives.Participants;
 import accord.primitives.Route;
@@ -33,6 +32,7 @@ import accord.utils.Invariants;
 import accord.utils.UnhandledEnum;
 
 import static accord.api.ProtocolModifiers.Toggles.requiresUniqueHlcs;
+import static accord.local.Cleanup.Input.FULL;
 import static accord.local.Cleanup.Input.PARTIAL;
 import static accord.local.RedundantStatus.Property.GC_BEFORE;
 import static accord.local.RedundantStatus.Property.LOCALLY_APPLIED;
@@ -48,6 +48,7 @@ import static accord.primitives.SaveStatus.TruncatedApply;
 import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
 import static accord.primitives.SaveStatus.Uninitialised;
 import static accord.primitives.SaveStatus.Vestigial;
+import static accord.primitives.Status.Durability.NotDurable;
 import static accord.primitives.Status.Durability.UniversalOrInvalidated;
 import static accord.primitives.Status.PreCommitted;
 import static accord.primitives.Status.Truncated;
@@ -73,21 +74,38 @@ public enum Cleanup
 
     private static final Cleanup[] VALUES = values();
 
-    public final SaveStatus appliesIfNot;
+    public final SaveStatus newStatus;
 
-    Cleanup(SaveStatus appliesIfNot)
+    Cleanup(SaveStatus newStatus)
     {
-        this.appliesIfNot = appliesIfNot;
+        this.newStatus = newStatus;
+    }
+
+    public final Cleanup atLeast(Cleanup that)
+    {
+        return compareTo(that) >= 0 ? this : that;
+    }
+
+    public final boolean appliesTo(SaveStatus saveStatus)
+    {
+        if (saveStatus == null)
+            return this != NO;
+
+        switch (this)
+        {
+            case EXPUNGE: return true;
+            case ERASE: return saveStatus != Erased;
+            default: return saveStatus.compareTo(newStatus) < 0;
+        }
     }
 
     public final Cleanup filter(SaveStatus saveStatus)
     {
-        return saveStatus != null && saveStatus.compareTo(appliesIfNot) >= 0 
&& this != EXPUNGE ? NO : this;
+        return appliesTo(saveStatus) ? this : NO;
     }
 
     public enum Input { PARTIAL, FULL }
 
-    // TODO (required): simulate compaction of log records in burn test
     public static Cleanup shouldCleanup(Input input, SafeCommandStore 
safeStore, Command command)
     {
         return shouldCleanup(input, safeStore, command, 
command.participants());
@@ -95,19 +113,19 @@ public enum Cleanup
 
     public static Cleanup shouldCleanup(Input input, SafeCommandStore 
safeStore, Command command, @Nonnull StoreParticipants participants)
     {
-        return shouldCleanup(input, safeStore.agent(), command.txnId(), 
command.executeAt(), command.saveStatus(), command.durability(), participants,
+        return shouldCleanup(input, command.txnId(), command.executeAt(), 
command.saveStatus(), command.durability(), participants,
                              safeStore.redundantBefore(), 
safeStore.durableBefore());
     }
 
-    public static Cleanup shouldCleanup(Input input, Agent agent, Command 
command, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    public static Cleanup shouldCleanup(Input input, Command command, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        return shouldCleanup(input, agent, command.txnId(), 
command.executeAt(), command.saveStatus(), command.durability(), 
command.participants(),
+        return shouldCleanup(input, command.txnId(), command.executeAt(), 
command.saveStatus(), command.durability(), command.participants(),
                              redundantBefore, durableBefore);
     }
 
-    public static Cleanup shouldCleanup(Input input, Agent agent, TxnId txnId, 
Timestamp executeAt, SaveStatus status, Durability durability, 
StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore 
durableBefore)
+    public static Cleanup shouldCleanup(Input input, TxnId txnId, Timestamp 
executeAt, SaveStatus status, Durability durability, StoreParticipants 
participants, RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        Cleanup cleanup = shouldCleanupInternal(input, agent, txnId, 
executeAt, status, durability, participants, redundantBefore, durableBefore);
+        Cleanup cleanup = shouldCleanupInternal(input, txnId, executeAt, 
status, durability, participants, redundantBefore, durableBefore);
         return cleanup.filter(status);
     }
 
@@ -137,47 +155,57 @@ public enum Cleanup
      * we pessimistically assume the whole cluster may need to see its outcome
      * TODO (expected): we should be able to rely on replicas to infer 
Invalidated from an Erased record
      */
-    private static Cleanup shouldCleanupInternal(Input input, Agent agent, 
TxnId txnId, @Nullable Timestamp executeAt, @Nullable SaveStatus saveStatus, 
Durability durability, @Nullable StoreParticipants participants, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
+    private static Cleanup shouldCleanupInternal(Input input, TxnId txnId, 
@Nullable Timestamp executeAt, @Nullable SaveStatus saveStatus, Durability 
durability, @Nullable StoreParticipants participants, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
     {
         if (txnId.kind() == EphemeralRead)
             return NO;
 
         if (expunge(txnId, executeAt, saveStatus, participants, 
redundantBefore, durableBefore))
-            return expunge();
+            return expunge(txnId);
 
         if (saveStatus == null || participants == null)
             return NO;
 
         if (participants.hasFullRoute())
-            return cleanupWithFullRoute(input, agent, participants, txnId, 
executeAt, saveStatus, durability, redundantBefore, durableBefore);
+            return cleanupWithFullRoute(input, participants, txnId, executeAt, 
saveStatus, durability, redundantBefore, durableBefore);
         return cleanupWithoutFullRoute(input, txnId, saveStatus, participants, 
redundantBefore, durableBefore);
     }
 
-    private static Cleanup cleanupWithFullRoute(Input input, Agent agent, 
StoreParticipants participants, TxnId txnId, Timestamp executeAt, SaveStatus 
saveStatus, Durability durability, RedundantBefore redundantBefore, 
DurableBefore durableBefore)
+    private static Cleanup cleanupWithFullRoute(Input input, StoreParticipants 
participants, TxnId txnId, Timestamp executeAt, SaveStatus saveStatus, 
Durability durability, RedundantBefore redundantBefore, DurableBefore 
durableBefore)
     {
         // We first check if the command is redundant locally, i.e. whether it 
has been applied to all non-faulty replicas of the local shard
         // If not, we don't want to truncate its state else we may make 
catching up for these other replicas much harder
         FullRoute<?> route = Route.castToFullRoute(participants.route());
-        RedundantStatus redundant = redundantBefore.status(txnId, 
saveStatus.known.is(ApplyAtKnown) ? executeAt : null, route);
-        Invariants.require(redundant.none(NOT_OWNED),"Command " + txnId + " 
that is being loaded is not owned by this shard on route " + route);
+        // we must not use executeAt here if input == PARTIAL because with 
partial compaction we might be merging the combination of some old executeAt 
with a later partial status
+        RedundantStatus redundant = redundantBefore.status(txnId, input == 
FULL && saveStatus.known.is(ApplyAtKnown) ? executeAt : null, route);
+        Invariants.require(redundant.none(NOT_OWNED),"Command %s that is being 
loaded is not owned by this shard on route %s", txnId, route);
 
         if (redundant.none(LOCALLY_REDUNDANT))
             return NO;
 
-        Cleanup ifUndecided = cleanupIfUndecidedWithFullRoute(input, txnId, 
saveStatus, redundant);
-        if (ifUndecided != null)
-            return ifUndecided;
-
+        Cleanup min = cleanupIfUndecidedWithFullRoute(input, txnId, 
saveStatus, redundant);
         if (!redundant.all(TRUNCATE_BEFORE))
         {
             // TODO (expected): see if we can improve our invariants so we can 
remove this special-case
             if (input != PARTIAL && redundant.all(SHARD_APPLIED) && 
redundant.all(LOCALLY_DEFUNCT) && !redundant.any(LOCALLY_APPLIED))
-                return truncate(txnId);
-            return NO;
+                return truncate(txnId, min);
+            return min;
         }
 
         Invariants.paranoid(redundant.all(SHARD_APPLIED));
-        Durability test = Durability.max(durability, durableBefore.min(txnId, 
participants.route()));
+
+        if (saveStatus.compareTo(Vestigial) >= 0)
+        {
+            // we can't use durability from an Invalidated record to decide if 
we can erase/expunge,
+            // as we don't know that this has been persisted at all shards.
+            // Similarly, vestigial/erased records don't know their provenance, 
and may derive from an Invalidated record
+            durability = NotDurable;
+        }
+
+        Durability test;
+        if (durability.compareTo(UniversalOrInvalidated) >= 0) test = 
durability;
+        else test = Durability.max(durability, durableBefore.min(txnId, 
participants.route()));
+
         switch (test)
         {
             default: throw new UnhandledEnum(durability);
@@ -187,17 +215,17 @@ public enum Cleanup
                 // TODO (required): consider how we guarantee not to break 
recovery of other shards if a majority on this shard are PRE_BOOTSTRAP
                 //   (if the condition is false and we fall through to 
removing Outcome)
                 if (participants.doesStillExecute())
-                    return truncateWithOutcome(txnId);
+                    return min.atLeast(truncateWithOutcome(txnId, min));
 
             case MajorityOrInvalidated:
             case Majority:
-                return truncate(txnId);
+                return min.atLeast(truncate(txnId, min));
 
             case UniversalOrInvalidated:
             case Universal:
                 if (redundant.all(GC_BEFORE))
-                    return erase();
-                return truncate(txnId);
+                    return erase(txnId, min);
+                return truncate(txnId, min);
         }
     }
 
@@ -226,10 +254,7 @@ public enum Cleanup
 
     private static Cleanup cleanupIfUndecidedWithFullRoute(Input input, TxnId 
txnId, SaveStatus saveStatus, RedundantStatus redundantStatus)
     {
-        if (saveStatus.hasBeen(PreCommitted))
-            return null;
-
-        if (input == PARTIAL)
+        if (input == PARTIAL || saveStatus.hasBeen(PreCommitted))
             return NO;
 
         return cleanupUndecided(txnId, redundantStatus);
@@ -258,6 +283,8 @@ public enum Cleanup
 
         if (!requiresUniqueHlcs() || !txnId.is(Write)) return true;
         if (saveStatus == null || !saveStatus.known.is(ApplyAtKnown)) return 
true;
+        // note, it is safe to use ApplyAtKnown even with PARTIAL input here, 
because we are only discarding information,
+        // and we can safely discard any stale executeAt
         if (executeAt == null) return true;
         if (minGcBefore.is(HLC_BOUND) && executeAt.uniqueHlc() < 
minGcBefore.hlc()) return true;
         if (participants == null)
@@ -276,18 +303,29 @@ public enum Cleanup
     {
         return INVALIDATE;
     }
-    private static Cleanup truncateWithOutcome(TxnId txnId)
+
+    private static Cleanup truncateWithOutcome(TxnId txnId, Cleanup atLeast)
     {
-        return TRUNCATE_WITH_OUTCOME;
+        return atLeast.compareTo(TRUNCATE_WITH_OUTCOME) > 0 ? atLeast : 
TRUNCATE_WITH_OUTCOME;
     }
-    private static Cleanup truncate(TxnId txnId)
+
+    private static Cleanup truncate(TxnId txnId, Cleanup atLeast)
     {
-        return TRUNCATE;
+        return atLeast.compareTo(TRUNCATE) > 0 ? atLeast : TRUNCATE;
     }
+
     private static Cleanup vestigial(TxnId txnId)
     {
         return VESTIGIAL;
     }
-    private static Cleanup erase() { return ERASE; }
-    private static Cleanup expunge() { return EXPUNGE; }
+
+    private static Cleanup erase(TxnId txnId, Cleanup atLeast)
+    {
+        return atLeast != EXPUNGE ? ERASE : EXPUNGE;
+    }
+
+    private static Cleanup expunge(TxnId txnId)
+    {
+        return EXPUNGE;
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index f5ee9835..36647fc1 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -701,8 +701,8 @@ public abstract class CommandStore implements AgentExecutor
     protected void updatedRedundantBefore(SafeCommandStore safeStore, TxnId 
syncId, Ranges ranges)
     {
         TxnId clearWaitingBefore = 
redundantBefore.minShardAndLocallyAppliedBefore();
-        TxnId clearAnyBefore = durableBefore().min.majorityBefore;
-        progressLog.clearBefore(safeStore, clearWaitingBefore, clearAnyBefore);
+        TxnId clearAllBefore = TxnId.min(clearWaitingBefore, 
durableBefore().min.majorityBefore);
+        progressLog.clearBefore(safeStore, clearWaitingBefore, clearAllBefore);
         listeners.clearBefore(this, clearWaitingBefore);
     }
 
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index 8817a787..a1abe351 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -925,7 +925,7 @@ public class Commands
             case TRUNCATE_WITH_OUTCOME:
                 Invariants.requireArgument(command.known().is(Apply));
                 Invariants.requireArgument(command.known().is(ApplyAtKnown));
-                result = truncated(command, newParticipants, 
cleanup.appliesIfNot);
+                result = truncated(command, newParticipants, 
cleanup.newStatus);
                 break;
 
             case TRUNCATE:
@@ -938,7 +938,8 @@ public class Commands
                 break;
 
             case ERASE:
-                Invariants.require(command.saveStatus().compareTo(Erased) < 0);
+                Invariants.require(command.saveStatus() != Erased);
+
             case EXPUNGE:
                 result = erased(command, newParticipants);
                 break;
@@ -983,7 +984,7 @@ public class Commands
             return true;
         }
 
-        Invariants.require(cleanup == EXPUNGE || 
command.saveStatus().compareTo(cleanup.appliesIfNot) < 0);
+        Invariants.require(cleanup.appliesTo(command.saveStatus()));
         purge(safeStore, safeCommand, command, cleanupParticipants, cleanup, 
true);
         return true;
     }
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 492dddc2..7efe5509 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -557,7 +557,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
 
             // TODO (required): audit each of these methods
             if (bounds.is(txnId, SHARD_APPLIED)
-                && ((bounds.endEpoch <= txnId.epoch() && 
(!txnId.awaitsPreviouslyOwned() || bounds.isLocallyRetired()))
+                && ((bounds.endEpoch <= txnId.epoch() && 
bounds.isLocallyRetired())
                     || (executeAtIfKnown != null && executeAtIfKnown.epoch() < 
bounds.startEpoch)
                     || bounds.is(txnId, PRE_BOOTSTRAP_OR_STALE)))
                 return execute.without(Ranges.of(bounds.range));
@@ -579,17 +579,28 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
             return ownedOrNotRedundant;
         }
 
-        static Participants<?> withoutShardAppliedBefore(Bounds bounds, 
@Nonnull Participants<?> notShardApplied, TxnId txnId)
+        static Participants<?> withoutShardApplied(Bounds bounds, @Nonnull 
Participants<?> notShardApplied, TxnId txnId)
         {
             if (bounds == null)
                 return notShardApplied;
 
-            if (txnId.compareTo(bounds.maxBound(SHARD_APPLIED)) <= 0)
+            if (bounds.is(txnId, SHARD_APPLIED))
                 return notShardApplied.without(Ranges.of(bounds.range));
 
             return notShardApplied;
         }
 
+        static Participants<?> withoutShardAppliedLocallySynced(Bounds bounds, 
@Nonnull Participants<?> notShardAppliedLocallyDefunct, TxnId txnId)
+        {
+            if (bounds == null)
+                return notShardAppliedLocallyDefunct;
+
+            if (bounds.is(txnId, SHARD_APPLIED, LOCALLY_SYNCED))
+                return 
notShardAppliedLocallyDefunct.without(Ranges.of(bounds.range));
+
+            return notShardAppliedLocallyDefunct;
+        }
+
         static Ranges withoutBeforeGc(Bounds entry, @Nonnull Ranges 
notGarbage, TxnId txnId, @Nullable Timestamp executeAt)
         {
             if (entry == null || (executeAt == null ? entry.outOfBounds(txnId) 
: entry.outOfBounds(txnId, executeAt)))
@@ -1077,7 +1088,17 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
     {
         if (participants.isEmpty() || maxShardAppliedBefore.compareTo(txnId) < 
0)
             return participants;
-        return foldl(participants, Bounds::withoutShardAppliedBefore, 
participants, txnId);
+        return foldl(participants, Bounds::withoutShardApplied, participants, 
txnId);
+    }
+
+    /**
+     * Subtract any ranges for which txnId is both SHARD_APPLIED and 
LOCALLY_SYNCED
+     */
+    public Participants<?> withoutShardAppliedLocallySynced(TxnId txnId, 
Participants<?> participants)
+    {
+        if (participants.isEmpty() || maxShardAppliedBefore.compareTo(txnId) < 
0)
+            return participants;
+        return foldl(participants, Bounds::withoutShardAppliedLocallySynced, 
participants, txnId);
     }
 
     public static class Builder extends AbstractIntervalBuilder<RoutingKey, 
Bounds, RedundantBefore>
diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java 
b/accord-core/src/main/java/accord/local/StoreParticipants.java
index 67373793..50ea88cb 100644
--- a/accord-core/src/main/java/accord/local/StoreParticipants.java
+++ b/accord-core/src/main/java/accord/local/StoreParticipants.java
@@ -321,7 +321,6 @@ public class StoreParticipants
      * Do not invoke this method on a participants we will use to query esp. 
e.g. for calculateDeps.
      * TODO (desired): create separate Query object that cannot be filtered or 
used to update a Command
      */
-    static int count = 0;
     public StoreParticipants filter(Filter filter, RedundantBefore 
redundantBefore, TxnId txnId, @Nullable Timestamp executeAtIfKnown)
     {
         if (filter == QUERY)
@@ -374,7 +373,9 @@ public class StoreParticipants
             Participants<?> extraTouches = touches.without(owns);
             if (!extraTouches.isEmpty())
             {
-                Participants<?> filteredExtra = 
redundantBefore.withoutShardApplied(txnId, extraTouches);
+                Participants<?> filteredExtra = txnId.is(ExclusiveSyncPoint)
+                                                ? 
redundantBefore.withoutShardAppliedLocallySynced(txnId, extraTouches)
+                                                : 
redundantBefore.withoutShardApplied(txnId, extraTouches);
                 if (extraTouches != filteredExtra)
                     touches = owns.with((Participants)filteredExtra);
             }
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index d6b45dfb..c30a140f 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -2002,7 +2002,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
 
         TxnInfo[] newById = removeRedundantById(byId, bounds, newBounds);
         int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(), 
redundantBefore(newBounds));
-        Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 : 
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
+        Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 || 
byId[prunedBeforeById].compareTo(newBounds.gcBefore) < 0 : 
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
         Object[] newLoadingPruned = 
Pruning.removeRedundantLoadingPruned(loadingPruned, redundantBefore(newBounds));
 
         long maxUniqueHlc = this.maxUniqueHlc;
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java 
b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index eea0bb66..1ead882f 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -24,6 +24,7 @@ import accord.primitives.AbstractRanges;
 import accord.primitives.Status.Durability;
 import accord.primitives.SyncPoint;
 import accord.primitives.TxnId;
+import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
 import accord.utils.async.Cancellable;
 
@@ -42,6 +43,7 @@ public class SetShardDurable extends 
AbstractRequest<SimpleReply>
         super(exclusiveSyncPoint.syncId);
         this.exclusiveSyncPoint = exclusiveSyncPoint;
         this.durability = durability;
+        
Invariants.require(durability.compareTo(Durability.MajorityOrInvalidated) >= 0);
     }
 
     private TxnId syncIdWithFlags()
@@ -52,8 +54,9 @@ public class SetShardDurable extends 
AbstractRequest<SimpleReply>
     @Override
     public Cancellable submit()
     {
+        
Invariants.require(durability.compareTo(Durability.MajorityOrInvalidated) >= 0);
         TxnId syncIdWithFlags = syncIdWithFlags();
-        node.markDurable(exclusiveSyncPoint.route.toRanges(), syncIdWithFlags, 
syncIdWithFlags)
+        node.markDurable(exclusiveSyncPoint.route.toRanges(), syncIdWithFlags, 
durability.compareTo(Durability.UniversalOrInvalidated) >= 0 ? syncIdWithFlags 
: TxnId.NONE)
         .addCallback((success, fail) -> {
             if (fail != null) node.reply(replyTo, replyContext, null, fail);
             else node.mapReduceConsumeLocal(this, exclusiveSyncPoint.route, 
waitForEpoch(), waitForEpoch(), this);
diff --git a/accord-core/src/main/java/accord/primitives/Known.java 
b/accord-core/src/main/java/accord/primitives/Known.java
index 4bea0ae4..4e8b7fd6 100644
--- a/accord-core/src/main/java/accord/primitives/Known.java
+++ b/accord-core/src/main/java/accord/primitives/Known.java
@@ -30,6 +30,7 @@ import accord.utils.UnhandledEnum;
 
 import static accord.primitives.Known.KnownExecuteAt.ApplyAtKnown;
 import static accord.primitives.Known.KnownExecuteAt.IS_EXECUTE_AT_KNOWN;
+import static accord.primitives.Known.Outcome.WasApply;
 import static accord.primitives.Known.PrivilegedVote.VotePreAccept;
 import static accord.primitives.Known.PrivilegedVote.NoVote;
 import static accord.primitives.Known.Definition.DefinitionErased;
@@ -535,6 +536,11 @@ public class Known
         return Outcome.VALUES[outcomeOrdinal()];
     }
 
+    public boolean isOrWasApply()
+    {
+        return is(Outcome.Apply) || is(WasApply);
+    }
+
     public boolean is(Outcome outcome)
     {
         return outcomeOrdinal() == outcome.ordinal();
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java 
b/accord-core/src/main/java/accord/utils/Invariants.java
index 0851c720..fe6cae97 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -380,6 +380,18 @@ public class Invariants
             illegalArgument(format(fmt, p1, p2));
     }
 
+    public static void requireArgument(boolean condition, String fmt, 
@Nullable Object p1, @Nullable Object p2, @Nullable Object p3)
+    {
+        if (!condition)
+            illegalArgument(format(fmt, p1, p2, p3));
+    }
+
+    public static void requireArgument(boolean condition, String fmt, 
@Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object 
p4)
+    {
+        if (!condition)
+            illegalArgument(format(fmt, p1, p2, p3, p4));
+    }
+
     public static void requireArgument(boolean condition, String fmt, 
Object... args)
     {
         if (!condition)
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index c7de395f..a1be0008 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -201,7 +201,7 @@ public class Utils
                              InMemoryCommandStores.Synchronized::new,
                              new CoordinationAdapter.DefaultFactory(),
                              DurableBefore.NOOP_PERSISTER,
-                             new InMemoryJournal(nodeId, agent, randomSource));
+                             new InMemoryJournal(nodeId, randomSource));
         awaitUninterruptibly(node.unsafeStart());
         return node;
     }
diff --git a/accord-core/src/test/java/accord/burn/BurnTestBase.java 
b/accord-core/src/test/java/accord/burn/BurnTestBase.java
index 984f782d..35759349 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestBase.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestBase.java
@@ -52,7 +52,6 @@ import java.util.zip.CRC32;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.api.Agent;
 import accord.api.Journal;
 import accord.api.Key;
 import accord.api.ProtocolModifiers.Toggles;
@@ -98,7 +97,6 @@ import accord.utils.DefaultRandom;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
-import accord.utils.TriFunction;
 import accord.utils.UnhandledEnum;
 import accord.utils.Utils;
 import accord.utils.async.AsyncExecutor;
@@ -369,7 +367,7 @@ public class BurnTestBase
         f2.get();
     }
 
-    public static void burn(RandomSource random, TopologyFactory 
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int 
prefixCount, int operations, int concurrency, PendingQueue pendingQueue, 
TriFunction<Id, Agent, RandomSource, Journal> journalFactory)
+    public static void burn(RandomSource random, TopologyFactory 
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int 
prefixCount, int operations, int concurrency, PendingQueue pendingQueue, 
BiFunction<Node.Id, RandomSource, Journal> journalFactory)
     {
         List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
         Thread.currentThread().setUncaughtExceptionHandler((th, fail) -> {
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 1cd03eeb..6a6a8f9e 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -52,7 +52,6 @@ import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.api.Agent;
 import accord.api.Journal;
 import accord.api.MessageSink;
 import accord.api.RoutingKey;
@@ -616,7 +615,7 @@ public class Cluster
                                               Supplier<RandomSource> 
randomSupplier,
                                               Supplier<TimeService> 
timeServiceSupplier,
                                               TopologyFactory topologyFactory, 
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal,
-                                              Consumer<Map<Id, Node>> 
readySignal, TriFunction<Node.Id, Agent, RandomSource, Journal> journalFactory)
+                                              Consumer<Map<Id, Node>> 
readySignal, BiFunction<Node.Id, RandomSource, Journal> journalFactory)
     {
         Topology topology = topologyFactory.toTopology(nodes);
         Map<Id, Node> nodeMap = new LinkedHashMap<>();
@@ -677,7 +676,7 @@ public class Cluster
                 BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges) 
-> configRandomizer.onStale(id, sinceAtLeast, ranges);
                 AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id, 
onStale, timeouts);
                 executorMap.put(id, nodeExecutor);
-                Journal journal = journalFactory.apply(id, 
nodeExecutor.agent(), random);
+                Journal journal = journalFactory.apply(id, random);
                 journalMap.put(id, journal);
                 BurnTestConfigurationService configService = new 
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, 
nodeMap::get, topologyUpdates);
                 DelayedCommandStores.CacheLoading cacheLoading = new 
RandomLoader(random).newLoader(journal);
@@ -687,6 +686,7 @@ public class Cluster
                                      randomSupplier.get(), scheduler, 
SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, 
DefaultTimeouts::new,
                                      DefaultProgressLogs::new, 
DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, 
cacheLoading), new CoordinationAdapter.DefaultFactory(),
                                      DurableBefore.NOOP_PERSISTER, journal);
+                journal.start(node);
                 DurabilityService durability = node.durability();
                 // TODO (desired): randomise
                 durability.shards().setShardCycleTime(30, SECONDS);
@@ -922,7 +922,7 @@ public class Cluster
                 {
                     if (afterCommand.is(Status.Invalidated))
                         
Invariants.require(beforeCommand.hasBeen(Status.Truncated) || 
(!beforeCommand.hasBeen(Status.PreCommitted)
-                               && Cleanup.shouldCleanup(FULL, s.agent(), 
e.getKey(), beforeCommand.executeAtIfKnown(), beforeCommand.saveStatus(), 
beforeCommand.durability(), beforeCommand.participants(), 
store.unsafeGetRedundantBefore(), store.durableBefore()).compareTo(INVALIDATE) 
>= 0));
+                               && Cleanup.shouldCleanup(FULL, e.getKey(), 
beforeCommand.executeAtIfKnown(), beforeCommand.saveStatus(), 
beforeCommand.durability(), afterCommand.participants(), 
store.unsafeGetRedundantBefore(), store.durableBefore()).compareTo(INVALIDATE) 
>= 0));
                     continue;
                 }
                 if (beforeCommand.hasBeen(Status.Truncated))
@@ -954,7 +954,7 @@ public class Cluster
                         if (beforeCommand.saveStatus() == SaveStatus.Erased)
                             continue;
 
-                        if (Cleanup.shouldCleanup(FULL, s.agent(), 
beforeCommand, store.unsafeGetRedundantBefore(), store.durableBefore()) == 
EXPUNGE)
+                        if (Cleanup.shouldCleanup(FULL, beforeCommand, 
store.unsafeGetRedundantBefore(), store.durableBefore()) == EXPUNGE)
                             continue;
 
                         if 
(store.unsafeGetRedundantBefore().min(beforeCommand.participants().owns(), 
RedundantBefore.Bounds::shardAndLocallyRedundantBefore).compareTo(txnId) > 0)
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 00f00094..32303e91 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import javax.annotation.Nonnull;
+
 import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,7 +103,7 @@ import static accord.local.Cleanup.NO;
 import static accord.local.Cleanup.TRUNCATE;
 import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
 import static accord.primitives.SaveStatus.Erased;
-import static accord.primitives.Status.Invalidated;
+import static accord.primitives.SaveStatus.Uninitialised;
 import static accord.primitives.Status.Truncated;
 import static accord.utils.Invariants.illegalState;
 
@@ -112,17 +114,22 @@ public class InMemoryJournal implements Journal
     private final List<TopologyUpdate> topologyUpdates = new ArrayList<>();
     private final Int2ObjectHashMap<FieldUpdates> fieldStates = new 
Int2ObjectHashMap<>();
 
-    private final Node.Id id;
-    private final Agent agent;
+    private Node node;
+    private Agent agent;
     private final RandomSource random;
 
-    public InMemoryJournal(Node.Id id, Agent agent, RandomSource random)
+    public InMemoryJournal(Node.Id id, RandomSource random)
     {
-        this.id = id;
-        this.agent = agent;
         this.random = random;
     }
 
+    public Journal start(Node node)
+    {
+        this.node = node;
+        this.agent = node.agent();
+        return this;
+    }
+
     @Override
     public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
@@ -139,7 +146,7 @@ public class InMemoryJournal implements Journal
         if (builder == null)
             return null;
 
-        Cleanup cleanup = builder.maybeCleanup(FULL, agent, redundantBefore, 
durableBefore);
+        Cleanup cleanup = builder.maybeCleanup(true, FULL, redundantBefore, 
durableBefore);
         if (cleanup == EXPUNGE)
             return null;
         return builder.construct(redundantBefore);
@@ -152,7 +159,7 @@ public class InMemoryJournal implements Journal
         if (builder == null || builder.isEmpty())
             return null;
 
-        Cleanup cleanup = builder.shouldCleanup(FULL, agent, redundantBefore, 
durableBefore);
+        Cleanup cleanup = builder.shouldCleanup(FULL, redundantBefore, 
durableBefore);
         switch (cleanup)
         {
             case VESTIGIAL:
@@ -184,6 +191,8 @@ public class InMemoryJournal implements Journal
         for (int i = saved.size() - 1; i >= 0; i--)
         {
             Diff diff = saved.get(i);
+            if (diff == null)
+                continue;
             if (builder == null)
                 builder = new Builder(diff.txnId, load);
             builder.apply(diff);
@@ -331,83 +340,91 @@ public class InMemoryJournal implements Journal
             for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet())
             {
                 List<Diff> diffs = e2.getValue();
-                int from;
+
                 if (diffs.isEmpty()) continue;
                 List<Diff> subset = diffs;
-                if (isPartialCompaction)
+                if (diffs.size() > 1 && isPartialCompaction)
                 {
-                    subset = new ArrayList<>();
-                    int tmp1 = random.nextInt(diffs.size());
-                    int tmp2 = random.nextInt(diffs.size());
-                    from = Math.min(tmp1, tmp2);
-                    int max = Math.max(tmp1, tmp2);
-                    for (int i = from; i < max; i++)
-                        subset.add(diffs.get(i));
+                    int removeCount = 1 + random.nextInt(diffs.size() - 1);
+                    int count = diffs.size();
+                    subset = new ArrayList<>(diffs);
+                    while (removeCount-- > 0)
+                    {
+                        int removeIndex = random.nextInt(diffs.size());
+                        if (subset.get(removeIndex) == null)
+                            continue;
+                        subset.set(removeIndex, null);
+                        --count;
+                    }
+
+                    if (count == 0)
+                        continue;
                 }
 
-                InMemoryJournal.Builder builder = reconstruct(subset, ALL);
-                if (builder == null || builder.saveStatus() == null)
-                    continue; // Partial compaction, no save status present
+                Builder[] builders = new Builder[diffs.size()];
+                for (int i = 0 ; i < subset.size() ; ++i)
+                {
+                    if (subset.get(i) == null) continue;
+                    Builder builder = new Builder(e2.getKey(), ALL);
+                    builder.apply(subset.get(i));
+                    builders[i] = builder;
+                }
 
-                if (builder.saveStatus().status == Truncated || 
builder.saveStatus().status == Invalidated)
-                    continue; // Already truncated
+                Builder builder = new Builder(e2.getKey(), ALL);
+                for (int i = builders.length - 1; i >= 0 ; --i)
+                {
+                    Builder current = builders[i];
+                    if (current == null)
+                        continue;
+                    current.clearSuperseded(false, builder);
+                    builder.fillInMissingOrCleanup(false, current);
+                }
 
                 Input input = isPartialCompaction ? PARTIAL : FULL;
                 ++counter;
-                Cleanup cleanup = builder.shouldCleanup(input, 
store.agent(),store.unsafeGetRedundantBefore(), store.durableBefore());
-                cleanup = builder.maybeCleanup(input, cleanup);
+                Cleanup cleanup = builder.shouldCleanup(input, 
store.unsafeGetRedundantBefore(), store.durableBefore());
+                cleanup = builder.maybeCleanup(true, input, cleanup);
                 if (cleanup != NO)
                 {
                     if (cleanup == EXPUNGE)
                     {
-                        if (input == FULL) e2.setValue(new PurgedList());
+                        if (input == FULL || subset == diffs) e2.setValue(new 
PurgedList());
                         else diffs.removeAll(subset);
+                        continue;
                     }
                     else
                     {
-                        int flags = 
CommandChange.eraseKnownFieldsMask(cleanup.appliesIfNot);
-                        EnumMap<Field, Object> values = new 
EnumMap<>(Field.class);
-                        int iterator = toIterableSetFields(~flags) & 0x3FFF; 
// limit ourselves to 14 bits
-                        for (Field field = nextSetField(iterator); field != 
null; iterator = unsetIterable(field, iterator), field = nextSetField(iterator))
-                        {
-                            if (field == CLEANUP || field == SAVE_STATUS)
-                                continue;
-                            Object v = builder.get(field);
-                            if (v != null)
-                            {
-                                flags = setChanged(field, flags);
-                                values.put(field, v);
-                            }
-                        }
-                        values.put(SAVE_STATUS, cleanup.appliesIfNot);
-                        values.put(CLEANUP, cleanup);
-                        flags = setChanged(SAVE_STATUS, flags);
-                        flags = setChanged(CLEANUP, flags);
-
-                        Diff diff = new Diff(builder.txnId(), flags, values);
-                        // During partial compaction, we can only append_to 
the existing list, removing items that got compacted away
                         if (isPartialCompaction)
                         {
-                            int i = 0, j = 0;
-                            while (i < diffs.size() && j < subset.size())
+                            boolean saveCleanup = true;
+                            for (int i = builders.length - 1; i >= 0 ; --i)
                             {
-                                if (subset.get(j) == diffs.get(i))
-                                    ++j;
-                                ++i;
+                                if (builders[i] != null)
+                                {
+                                    if (saveCleanup) 
builders[i].addCleanup(false, cleanup);
+                                    else builders[i].cleanup(false, cleanup);
+                                    saveCleanup = false;
+                                }
                             }
-
-                            List<Diff> newDiffs = new 
ArrayList<>(diffs.subList(0, i));
-                            newDiffs.add(diff);
-                            newDiffs.addAll(diffs.subList(i, diffs.size()));
-                            e2.setValue(newDiffs);
                         }
                         // During full compaction, we erase all previous 
records, replacing them with new image
                         else
                         {
+                            Diff diff = builder.toDiff();
                             e2.setValue(cleanup == ERASE ? new 
ErasedList(diff) : new TruncatedList(diff));
+                            continue;
                         }
                     }
                 }
+
+                for (int i = 0 ; i < builders.length ; ++i)
+                {
+                    if (builders[i] != null)
+                    {
+                        Diff diff = builders[i].toDiff();
+                        diffs.set(i, diff.flags == 0 ? null : diff);
+                    }
+                }
             }
         }
     }
@@ -440,11 +457,11 @@ public class InMemoryJournal implements Journal
 
     private static class ErasedList extends AbstractList<Diff>
     {
-        final Diff erased;
+        private Diff erased;
 
         ErasedList(Diff erased)
         {
-            Invariants.requireArgument(erased.changes.get(SAVE_STATUS) == 
Erased);
+            Invariants.requireArgument(erased.changes.get(SAVE_STATUS) == 
Erased || erased.changes.get(CLEANUP) == ERASE);
             this.erased = erased;
         }
 
@@ -466,10 +483,21 @@ public class InMemoryJournal implements Journal
         public boolean add(Diff diff)
         {
             // TODO (expected): we shouldn't really be saving updates (such as 
durability updates) to Erased commands
-            if (diff.changes.get(SAVE_STATUS) == Erased || 
diff.changes.get(SAVE_STATUS) == null)
+            if (diff.changes.get(SAVE_STATUS) == Erased || 
diff.changes.get(SAVE_STATUS) == null || diff.changes.get(CLEANUP) == ERASE)
                 return false;
             throw illegalState();
         }
+
+        @Override
+        public Diff set(int index, Diff diff)
+        {
+            if (diff.changes.get(SAVE_STATUS) == Erased || 
diff.changes.get(CLEANUP) == ERASE)
+            {
+                erased = diff;
+                return erased;
+            }
+            return super.set(index, diff);
+        }
     }
 
     static class TruncatedList extends ArrayList<Diff>
@@ -504,13 +532,19 @@ public class InMemoryJournal implements Journal
         }
     }
 
-    private static Diff toDiff(CommandUpdate update)
+    private static Diff toDiff(@Nonnull CommandUpdate update)
     {
-        if (update == null
-            || update.before == update.after
-            || update.after == null
-            || update.after.saveStatus() == SaveStatus.Uninitialised)
-            return null;
+        if (update.before == null)
+        {
+            if (update.after.saveStatus() == Uninitialised)
+                return null;
+        }
+        else
+        {
+            Invariants.require(update.after.saveStatus() != Uninitialised);
+            if (update.before.saveStatus() == Erased)
+                return null;
+        }
 
         int flags = validateFlags(getFlags(update.before, update.after));
         if (!anyFieldChanged(flags))
@@ -650,11 +684,35 @@ public class InMemoryJournal implements Journal
             return executeAt;
         }
 
+        Diff toDiff()
+        {
+            int flags = this.flags;
+            EnumMap<Field, Object> values = new EnumMap<>(Field.class);
+
+            int iterator = toIterableSetFields(notNulls(flags)); // limit 
ourselves to 14 bits
+            for (Field field = nextSetField(iterator); field != null; iterator 
= unsetIterable(field, iterator), field = nextSetField(iterator))
+            {
+                if (field == CLEANUP)
+                    continue;
+                Object v = Invariants.nonNull(get(field));
+                values.put(field, v);
+            }
+
+            if (cleanup != null)
+            {
+                flags |= CommandChange.eraseKnownFieldsMask(cleanup.newStatus);
+                values.put(CLEANUP, cleanup);
+                flags = setChanged(CLEANUP, flags);
+            }
+
+            return new Diff(txnId(), flags, values);
+        }
+
         private void apply(Diff diff)
         {
             Invariants.require(diff.txnId != null);
             Invariants.require(diff.flags != 0);
-            nextCalled = true;
+            hasUpdate = true;
             count++;
 
             int iterable = toIterableSetFields(diff.flags);
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 1469bc47..3df406ec 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -31,6 +31,7 @@ import accord.api.Journal;
 import accord.local.Command;
 import accord.local.CommandStores;
 import accord.local.DurableBefore;
+import accord.local.Node;
 import accord.local.RedundantBefore;
 import accord.primitives.EpochSupplier;
 import accord.primitives.Ranges;
@@ -73,6 +74,13 @@ public class LoggingJournal implements Journal
         }
     }
 
+    @Override
+
+    public Journal start(Node node)
+    {
+        return this;
+    }
+
     @Override
     public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java 
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 28d7dfac..53b0117a 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -133,7 +133,7 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
         MessageSink messageSink = messageSinkFactory.apply(id, this);
         MockConfigurationService configurationService = new 
MockConfigurationService(messageSink, onFetchTopology, topology);
         Agent agent = new TestAgent();
-        Journal journal = new InMemoryJournal(id, agent, random.fork());
+        Journal journal = new InMemoryJournal(id, random.fork());
         ThreadPoolScheduler scheduler = new ThreadPoolScheduler();
         Node node = new Node(id,
                              messageSink,
@@ -153,6 +153,7 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
                              new CoordinationAdapter.DefaultFactory(),
                              DurableBefore.NOOP_PERSISTER,
                              journal);
+        journal.start(node);
         awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(topology, false, true);
         return node;
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java 
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index ec7b63a5..34cd2463 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -112,7 +112,7 @@ public class ImmutableCommandTest
                              InMemoryCommandStores.Synchronized::new,
                              new CoordinationAdapter.DefaultFactory(),
                              DurableBefore.NOOP_PERSISTER,
-                             new InMemoryJournal(id, agent, random.fork()));
+                             new InMemoryJournal(id, random.fork()));
         awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(storeSupport.local.get(), false, true);
         return node;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 01161363..ffc0d3ef 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -388,6 +388,7 @@ public class Cluster implements Scheduler
 
     public static class NoOpJournal implements Journal
     {
+        @Override public Journal start(Node node) { return null; }
         @Override public Command loadCommand(int store, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new 
IllegalStateException("Not impelemented"); }
         @Override public Command.Minimal loadMinimal(int store, TxnId txnId, 
Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) { 
throw new IllegalStateException("Not impelemented"); }
         @Override public void saveCommand(int store, CommandUpdate value, 
Runnable onFlush)  { throw new IllegalStateException("Not impelemented"); }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to