This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4c870dc9 Accord Journal / Determinism
4c870dc9 is described below

commit 4c870dc9b561a841ea9b923ff739953adcc00325
Author: Alex Petrov <[email protected]>
AuthorDate: Mon Jun 17 09:53:43 2024 +0200

    Accord Journal / Determinism
    
      * Store intermediate Command states in the log
      * Load Command states from the log
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-19757
---
 .../java/accord/impl/InMemoryCommandStore.java     |  13 -
 .../src/main/java/accord/local/Cleanup.java        |   1 -
 .../src/main/java/accord/local/Command.java        |  18 +-
 .../src/main/java/accord/local/Listeners.java      |   2 +-
 .../main/java/accord/local/SafeCommandStore.java   |   5 +
 .../main/java/accord/local/SerializerSupport.java  | 613 ---------------------
 .../src/main/java/accord/utils/Invariants.java     |   7 +
 .../accord/impl/basic/DelayedCommandStores.java    |  49 +-
 .../src/test/java/accord/impl/basic/Journal.java   | 560 +++++++++++--------
 .../java/accord/local/BootstrapLocalTxnTest.java   |   3 +-
 .../test/java/accord/local/CheckedCommands.java    |  35 ++
 .../java/accord/utils/ReducingRangeMapTest.java    |   4 -
 12 files changed, 401 insertions(+), 909 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 5ee3f8d5..4932b181 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -461,7 +461,6 @@ public abstract class InMemoryCommandStore extends CommandStore
 
     private <T> T executeInContext(InMemoryCommandStore commandStore, PreLoadContext preLoadContext, Function<? super SafeCommandStore, T> function, boolean isDirectCall)
     {
-
         SafeCommandStore safeStore = commandStore.beginOperation(preLoadContext);
         try
         {
@@ -623,18 +622,6 @@ public abstract class InMemoryCommandStore extends CommandStore
         }
     }
 
-    private static class TimestampAndStatus
-    {
-        public final Timestamp timestamp;
-        public final Status status;
-
-        public TimestampAndStatus(Timestamp timestamp, Status status)
-        {
-            this.timestamp = timestamp;
-            this.status = status;
-        }
-    }
-
     public static class GlobalTimestampsForKey extends GlobalState<TimestampsForKey>
     {
         private final Key key;
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java
index bb6b1d2e..5da56e3c 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -34,7 +34,6 @@ import static accord.local.SaveStatus.TruncatedApplyWithOutcome;
 import static accord.local.SaveStatus.Uninitialised;
 import static accord.local.Status.Applied;
 import static accord.local.Status.Durability.Majority;
-import static accord.local.Status.Durability.Universal;
 import static accord.local.Status.Durability.UniversalOrInvalidated;
 import static accord.local.Status.PreCommitted;
 import static accord.primitives.Txn.Kind.EphemeralRead;
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 96df4461..51bd9410 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -69,7 +69,7 @@ import static java.lang.String.format;
 
 public abstract class Command implements CommonAttributes
 {
-    interface Listener
+    public interface Listener
     {
         void onChange(SafeCommandStore safeStore, SafeCommand safeCommand);
 
@@ -275,6 +275,7 @@ public abstract class Command implements CommonAttributes
             this.listeners = common.durableListeners();
         }
 
+
         @Override
         public boolean equals(Object o)
         {
@@ -645,9 +646,15 @@ public abstract class Command implements CommonAttributes
         SaveStatus saveStatus = saveStatus();
         return saveStatus.hasBeen(Status.Committed) && !saveStatus.hasBeen(Invalidated);
     }
+
     public final boolean isStable()
     {
         SaveStatus saveStatus = saveStatus();
+        return isStable(saveStatus);
+    }
+
+    public static boolean isStable(SaveStatus saveStatus)
+    {
         return saveStatus.hasBeen(Status.Stable) && !saveStatus.hasBeen(Invalidated);
     }
 
@@ -661,11 +668,6 @@ public abstract class Command implements CommonAttributes
         return Invariants.cast(this, Executed.class);
     }
 
-    public final boolean isTruncated()
-    {
-        return status().hasBeen(Status.Truncated);
-    }
-
     public abstract Command updateAttributes(CommonAttributes attrs, Ballot promised);
 
     public final Command updateAttributes(CommonAttributes attrs)
@@ -1105,7 +1107,7 @@ public abstract class Command implements CommonAttributes
             return Objects.equals(acceptedOrCommitted(), that.acceptedOrCommitted());
         }
 
-        static Accepted accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted)
+        public static Accepted accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted)
         {
             return validate(new Accepted(common, status, promised, executeAt, accepted));
         }
@@ -1197,7 +1199,7 @@ public abstract class Command implements CommonAttributes
             return committed(command, common, command.promised(), command.saveStatus(), waitingOn);
         }
 
-        static Committed committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
+        public static Committed committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
         {
             return validate(new Committed(common, status, executeAt, promised, accepted, waitingOn));
         }
diff --git a/accord-core/src/main/java/accord/local/Listeners.java b/accord-core/src/main/java/accord/local/Listeners.java
index 13929713..b4059620 100644
--- a/accord-core/src/main/java/accord/local/Listeners.java
+++ b/accord-core/src/main/java/accord/local/Listeners.java
@@ -51,7 +51,7 @@ public class Listeners<L extends Command.Listener> extends DeterministicSet<L>
             super(listeners);
         }
 
-        Listeners<L> mutable()
+        public Listeners<L> mutable()
         {
             return new Listeners<>(this);
         }
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 139bf088..7aea6790 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -194,6 +194,11 @@ public abstract class SafeCommandStore
     protected abstract SafeCommandsForKey getInternalIfLoadedAndInitialised(Key key);
     public abstract boolean canExecuteWith(PreLoadContext context);
 
+    public void load(Command loaded)
+    {
+        update(null, loaded);
+    }
+
     protected void update(Command prev, Command updated)
     {
         updateMaxConflicts(prev, updated);
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java b/accord-core/src/main/java/accord/local/SerializerSupport.java
deleted file mode 100644
index 7ad168a2..00000000
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ /dev/null
@@ -1,613 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package accord.local;
-
-import java.util.Set;
-import javax.annotation.Nullable;
-
-import com.google.common.collect.ImmutableSet;
-
-import accord.api.Agent;
-import accord.api.Result;
-import accord.api.VisibleForImplementation;
-import accord.local.Command.WaitingOn;
-import accord.local.CommandStores.RangesForEpoch;
-import accord.local.CommonAttributes.Mutable;
-import accord.messages.Accept;
-import accord.messages.Apply;
-import accord.messages.ApplyThenWaitUntilApplied;
-import accord.messages.BeginRecovery;
-import accord.messages.Commit;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.Propagate;
-import accord.primitives.Ballot;
-import accord.primitives.Deps;
-import accord.primitives.FullRangeRoute;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.primitives.Writes;
-import accord.utils.Invariants;
-
-import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
-import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
-import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
-import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
-import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
-import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
-import static accord.messages.MessageType.PRE_ACCEPT_REQ;
-import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
-import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
-import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
-import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
-import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
-import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
-import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
-import static accord.primitives.PartialTxn.merge;
-import static accord.utils.Invariants.checkState;
-import static accord.utils.Invariants.illegalState;
-
-@VisibleForImplementation
-public class SerializerSupport
-{
-    private static final Set<MessageType> PRE_ACCEPT_TYPES =
-    ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, 
PROPAGATE_PRE_ACCEPT_MSG);
-
-    private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES =
-    ImmutableSet.<MessageType>builder()
-                .addAll(PRE_ACCEPT_TYPES)
-                .add(COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ)
-                .build();
-
-    private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES =
-    ImmutableSet.<MessageType>builder()
-                .addAll(PRE_ACCEPT_COMMIT_TYPES)
-                .add(STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ, 
STABLE_MAXIMAL_REQ, PROPAGATE_STABLE_MSG)
-                .build();
-
-    private static final Set<MessageType> APPLY_TYPES =
-    ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG, 
APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
-
-    private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
-    ImmutableSet.<MessageType>builder()
-                .addAll(PRE_ACCEPT_STABLE_TYPES)
-                .addAll(APPLY_TYPES)
-                .add(PROPAGATE_OTHER_MSG)
-                .build();
-
-    private static Command localOnly(Agent agent, RangesForEpoch 
rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, 
@Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted, 
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
-    {
-        //TODO (expected): LocalOnly doesn't reflect normal command flow so 
relying on this special casing helps reconstruct work, but is a bit brittle; 
should find a more maintainable way.
-        TxnId txnId = attrs.txnId();
-        FullRangeRoute route = (FullRangeRoute) attrs.route();
-        //TODO (correctness): not 100% correct as the ranges was "valid" which 
can be mutated "after" the sync point... so might actually have less
-        Ranges participantRanges = route.participants().toRanges();
-
-        Txn emptyTxn = agent.emptyTxn(txnId.kind(), participantRanges);
-        Writes writes = emptyTxn.execute(txnId, txnId, null);
-        Result results = emptyTxn.result(txnId, txnId, null);
-        Ranges coordinateRanges = rangesForEpoch.coordinates(txnId);
-        attrs.partialTxn(emptyTxn.slice(coordinateRanges, true))
-             .partialDeps(Deps.NONE.slice(coordinateRanges));
-
-        switch (status.status)
-        {
-            case NotDefined:
-                return status == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(txnId)
-                                                          : 
Command.NotDefined.notDefined(attrs, promised);
-            case PreAccepted:
-                return Command.PreAccepted.preAccepted(attrs, executeAt, 
promised);
-            case AcceptedInvalidate:
-            case Accepted:
-            case PreCommitted:
-                return Command.Accepted.accepted(attrs, status, executeAt, 
promised, accepted);
-            case Committed:
-            case Stable:
-                return Command.Committed.committed(attrs, status, executeAt, 
promised, accepted, waitingOnProvider.provide(attrs.partialDeps()));
-            case PreApplied:
-            case Applied:
-                return Command.Executed.executed(attrs, status, executeAt, 
promised, accepted, waitingOnProvider.provide(attrs.partialDeps()), writes, 
results);
-            case Truncated:
-            case Invalidated:
-                switch (status)
-                {
-                    case Erased:
-                        return Command.Truncated.erased(txnId, 
attrs.durability(), attrs.route());
-                    case TruncatedApplyWithOutcome:
-                        return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, results, executesAtLeast);
-                    case TruncatedApply:
-                        return Command.Truncated.truncatedApply(attrs, status, 
executeAt, null, null, executesAtLeast);
-                    default:
-                        throw new AssertionError("Unexpected truncate status: 
" + status);
-                }
-            default:
-                throw new IllegalStateException();
-        }
-    }
-
-    /**
-     * Reconstructs Command from register values and protocol messages.
-     */
-    public static Command reconstruct(Agent agent, RangesForEpoch 
rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, 
@Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted, 
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
-    {
-        if (attrs.txnId().kind() == Txn.Kind.LocalOnly)
-            return localOnly(agent, rangesForEpoch, attrs, status, executeAt, 
executesAtLeast, promised, accepted, waitingOnProvider, messageProvider);
-        switch (status.status)
-        {
-            case NotDefined:
-                return status == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
-                                                          : 
Command.NotDefined.notDefined(attrs, promised);
-            case PreAccepted:
-                return preAccepted(rangesForEpoch, attrs, executeAt, promised, 
messageProvider);
-            case AcceptedInvalidate:
-            case Accepted:
-            case PreCommitted:
-                return accepted(rangesForEpoch, attrs, status, executeAt, 
promised, accepted, messageProvider);
-            case Committed:
-            case Stable:
-                return committed(rangesForEpoch, attrs, status, executeAt, 
promised, accepted, waitingOnProvider, messageProvider);
-            case PreApplied:
-            case Applied:
-                return executed(rangesForEpoch, attrs, status, executeAt, 
promised, accepted, waitingOnProvider, messageProvider);
-            case Truncated:
-            case Invalidated:
-                return truncated(attrs, status, executeAt, executesAtLeast, 
messageProvider);
-            default:
-                throw new IllegalStateException();
-        }
-    }
-
-    private static Command.PreAccepted preAccepted(RangesForEpoch 
rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised, 
MessageProvider messageProvider)
-    {
-        Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
-        checkState(!witnessed.isEmpty(), "PreAccepted message types not 
witnessed; witnessed is ", new LoggedMessageProvider(messageProvider));
-        attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch, 
witnessed, messageProvider));
-        return Command.PreAccepted.preAccepted(attrs, executeAt, promised);
-    }
-
-    private static Command.Accepted accepted(RangesForEpoch rangesForEpoch, 
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot 
accepted, MessageProvider messageProvider)
-    {
-        if (status.known.isDefinitionKnown())
-        {
-            Set<MessageType> witnessed = 
messageProvider.test(PRE_ACCEPT_TYPES);
-            checkState(!witnessed.isEmpty());
-            attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch, 
witnessed, messageProvider));
-        }
-
-        if (status.known.deps.hasProposedDeps())
-        {
-            Accept accept = messageProvider.accept(accepted);
-            attrs.partialDeps(slicePartialDeps(rangesForEpoch, accept));
-        }
-
-        return Command.Accepted.accepted(attrs, status, executeAt, promised, 
accepted);
-    }
-
-    private static Command.Committed committed(RangesForEpoch rangesForEpoch, 
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot 
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
-    {
-        attrs = extract(rangesForEpoch, status, accepted, messageProvider, 
(attrs0, txn, deps, i1, i2) -> attrs0.partialTxn(txn).partialDeps(deps), attrs);
-        return Command.Committed.committed(attrs, status, executeAt, promised, 
accepted, waitingOnProvider.provide(attrs.partialDeps()));
-    }
-
-    private static Command.Executed executed(RangesForEpoch rangesForEpoch, 
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot 
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
-    {
-        return extract(rangesForEpoch, status, accepted, messageProvider, 
(attrs0, txn, deps, writes, result) -> {
-            attrs0.partialTxn(txn)
-                 .partialDeps(deps);
-
-            return Command.Executed.executed(attrs, status, executeAt, 
promised, accepted, waitingOnProvider.provide(deps), writes, result);
-        }, attrs);
-    }
-
-    private static Command.Truncated truncated(Mutable attrs, SaveStatus 
status, Timestamp executeAt, @Nullable Timestamp executesAtLeast, 
MessageProvider messageProvider)
-    {
-        Writes writes = null;
-        Result result = null;
-
-        switch (status)
-        {
-            default:
-                throw illegalState("Unhandled SaveStatus: " + status);
-            case TruncatedApplyWithOutcome:
-            case TruncatedApplyWithDeps:
-                Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES);
-                checkState(!witnessed.isEmpty());
-                if (witnessed.contains(APPLY_MAXIMAL_REQ))
-                {
-                    Apply apply = messageProvider.applyMaximal();
-                    writes = apply.writes;
-                    result = apply.result;
-                }
-                else if (witnessed.contains(APPLY_MINIMAL_REQ))
-                {
-                    Apply apply = messageProvider.applyMinimal();
-                    writes = apply.writes;
-                    result = apply.result;
-                }
-                else if (witnessed.contains(PROPAGATE_APPLY_MSG))
-                {
-                    Propagate propagate = messageProvider.propagateApply();
-                    writes = propagate.writes;
-                    result = propagate.result;
-                }
-                else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ))
-                {
-                    ApplyThenWaitUntilApplied apply = 
messageProvider.applyThenWaitUntilApplied();
-                    writes = apply.writes;
-                    result = apply.result;
-                }
-                else
-                {
-                    throw new UnsupportedOperationException("Unhandled types: 
" + witnessed);
-                }
-            case TruncatedApply:
-                return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, result, executesAtLeast);
-            case ErasedOrInvalidated:
-                return Command.Truncated.erasedOrInvalidated(attrs.txnId(), 
attrs.durability(), attrs.route());
-            case Erased:
-                return Command.Truncated.erased(attrs.txnId(), 
attrs.durability(), attrs.route());
-            case Invalidated:
-                return Command.Truncated.invalidated(attrs.txnId(), 
attrs.durableListeners());
-        }
-    }
-
-    public static class TxnAndDeps
-    {
-        public static TxnAndDeps EMPTY = new TxnAndDeps(null, null);
-
-        public final PartialTxn txn;
-        public final PartialDeps deps;
-
-        TxnAndDeps(PartialTxn txn, PartialDeps deps)
-        {
-            this.txn = txn;
-            this.deps = deps;
-        }
-    }
-
-    interface WithContents<I, O>
-    {
-        O apply(I in, PartialTxn txn, PartialDeps deps, Writes writes, Result 
result);
-    }
-
-    public static TxnAndDeps extractTxnAndDeps(RangesForEpoch rangesForEpoch, 
SaveStatus status, Ballot accepted, MessageProvider messageProvider)
-    {
-        return extract(rangesForEpoch, status, accepted, messageProvider, (i1, 
txn, deps, i2, i3) -> new TxnAndDeps(txn, deps), null);
-    }
-
-    private static <I, O> O extract(RangesForEpoch rangesForEpoch, SaveStatus 
status, Ballot accepted, MessageProvider messageProvider, WithContents<I, O> 
withContents, I param)
-    {
-        // TODO (expected): first check if we have taken the normal path
-        Set<MessageType> witnessed;
-
-        // TODO (required): we must select the deps we would have used for 
initialiseWaitingOn
-        switch (status.status)
-        {
-            case PreAccepted:
-                witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
-                checkState(!witnessed.isEmpty(), "Unable to find PreAccept 
types; witnessed %s", new LoggedMessageProvider(messageProvider));
-                return withContents.apply(param, 
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider), 
null, null, null);
-
-            case AcceptedInvalidate:
-            case Accepted:
-            case PreCommitted:
-            {
-                PartialTxn txn = null;
-                PartialDeps deps = null;
-
-                if (status.known.isDefinitionKnown())
-                {
-                    witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
-                    checkState(!witnessed.isEmpty(), "Unable to find PreAccept 
types; witnessed %s", new LoggedMessageProvider(messageProvider));
-                    txn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, 
witnessed, messageProvider);
-                }
-
-                if (status.known.deps.hasProposedDeps())
-                {
-                    Accept accept = messageProvider.accept(accepted);
-                    deps = slicePartialDeps(rangesForEpoch, accept);
-                }
-                return withContents.apply(param, txn, deps, null, null);
-            }
-            case Committed:
-            {
-                witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES);
-                Commit commit;
-                if (witnessed.contains(COMMIT_MAXIMAL_REQ))
-                {
-                    commit = messageProvider.commitMaximal();
-                }
-                else
-                {
-                    
Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ), "Unable to find 
COMMIT_SLOW_PATH_REQ; witnessed %s", new 
LoggedMessageProvider(messageProvider));
-                    commit = messageProvider.commitSlowPath();
-                }
-
-                return sliceAndApply(rangesForEpoch, messageProvider, 
witnessed, commit, withContents, param, null, null);
-            }
-
-            case Stable:
-            {
-                // TODO (required): we should piece this back together in the 
precedence order of arrival
-                witnessed = messageProvider.test(PRE_ACCEPT_STABLE_TYPES);
-                Commit commit;
-                if (witnessed.contains(STABLE_MAXIMAL_REQ))
-                {
-                    commit = messageProvider.stableMaximal();
-                }
-                else if (witnessed.contains(PROPAGATE_STABLE_MSG))
-                {
-                    return sliceAndApply(rangesForEpoch, 
messageProvider.propagateStable(), withContents, param, null, null);
-                }
-                else if (witnessed.contains(STABLE_FAST_PATH_REQ))
-                {
-                    commit = messageProvider.stableFastPath();
-                }
-                else
-                {
-                    checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), 
"Unable to find STABLE_SLOW_PATH_REQ; txn_id=%s, witnessed %s", 
messageProvider.txnId(), new LoggedMessageProvider(messageProvider));
-                    if (witnessed.contains(COMMIT_MAXIMAL_REQ))
-                    {
-                        commit = messageProvider.commitMaximal();
-                    }
-                    else if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
-                    {
-                        commit = messageProvider.commitSlowPath();
-                    }
-                    else if (witnessed.contains(PRE_ACCEPT_REQ) || 
witnessed.contains(BEGIN_RECOVER_REQ) || 
witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
-                    {
-                        Commit slowPath = messageProvider.stableSlowPath();
-                        Ranges ranges = 
rangesForEpoch.allBetween(slowPath.txnId.epoch(), slowPath.executeAt.epoch());
-                        PartialTxn txn = 
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, 
messageProvider).slice(ranges, true);
-                        PartialDeps deps = slowPath.partialDeps.slice(ranges);
-                        return withContents.apply(param, txn, deps, null, 
null);
-                    }
-                    else
-                    {
-                        throw illegalState("Unable to find 
COMMIT_SLOW_PATH_REQ; witnessed %s", new 
LoggedMessageProvider(messageProvider));
-                    }
-                }
-
-                return sliceAndApply(rangesForEpoch, messageProvider, 
witnessed, commit, withContents, param, null, null);
-            }
-
-            case PreApplied:
-            case Applied:
-            {
-                witnessed = 
messageProvider.test(PRE_ACCEPT_COMMIT_APPLY_TYPES);
-                if (witnessed.contains(APPLY_MAXIMAL_REQ))
-                {
-                    Apply apply = messageProvider.applyMaximal();
-                    Ranges ranges = 
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
-                    return withContents.apply(param, apply.txn.slice(ranges, 
true), apply.deps.slice(ranges), apply.writes, apply.result);
-                }
-                else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ))
-                {
-                    ApplyThenWaitUntilApplied apply = 
messageProvider.applyThenWaitUntilApplied();
-                    Ranges ranges = 
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
-                    return withContents.apply(param, apply.txn.slice(ranges, 
true), apply.deps.slice(ranges), apply.writes, apply.result);
-                }
-                else if (witnessed.contains(APPLY_MINIMAL_REQ))
-                {
-                    Apply apply = messageProvider.applyMinimal();
-                    Commit commit;
-                    if (witnessed.contains(STABLE_MAXIMAL_REQ))
-                    {
-                        commit = messageProvider.stableMaximal();
-                    }
-                    else if (witnessed.contains(PROPAGATE_STABLE_MSG))
-                    {
-                        Propagate propagate = 
messageProvider.propagateStable();
-                        var ranges = propagate.committedExecuteAt == null ? 
rangesForEpoch.allAt(propagate.txnId) : 
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
-                        return withContents.apply(param, 
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), 
apply.writes, apply.result);
-                    }
-                    else if (witnessed.contains(PROPAGATE_APPLY_MSG))
-                    {
-                        Propagate propagate = messageProvider.propagateApply();
-                        var ranges = propagate.committedExecuteAt == null ? 
rangesForEpoch.allAt(propagate.txnId) : 
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
-                        return withContents.apply(param, 
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), 
apply.writes, apply.result);
-                    }
-                    else if (witnessed.contains(COMMIT_MAXIMAL_REQ))
-                    {
-                        commit = messageProvider.commitMaximal();
-                    }
-                    else if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
-                    {
-                        commit = messageProvider.commitSlowPath();
-                    }
-                    else if (witnessed.contains(STABLE_FAST_PATH_REQ))
-                    {
-                        commit = messageProvider.stableFastPath();
-                    }
-                    else if (witnessed.contains(PRE_ACCEPT_REQ) || 
witnessed.contains(BEGIN_RECOVER_REQ) || 
witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
-                    {
-                        PartialTxn txn = 
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
-                        Ranges ranges = 
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
-                        return withContents.apply(param, txn.slice(ranges, 
true), apply.deps.slice(ranges), apply.writes, apply.result);
-                    }
-                    else
-                    {
-                        throw illegalState("Invalid state: insufficient stable 
or commit messages found to reconstruct PreApplied or greater SaveStatus; 
witnessed " + witnessed);
-                    }
-
-                    return sliceAndApply(rangesForEpoch, messageProvider, 
witnessed, commit, withContents, param, apply.writes, apply.result);
-                }
-                else if (witnessed.contains(PROPAGATE_APPLY_MSG))
-                {
-                    Propagate propagate = messageProvider.propagateApply();
-                    Invariants.nonNull(propagate.partialTxn, "Unable to find 
partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider));
-                    Invariants.nonNull(propagate.stableDeps, "Unable to find 
stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider));
-                    return sliceAndApply(rangesForEpoch, propagate, 
withContents, param, propagate.writes, propagate.result);
-                }
-                else if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
-                {
-                    // once propgate runs locally it merges the local state 
with the remote state, which may make this go from PRE_ACCEPT to PRE_APPLIED!
-                    Propagate propagate = messageProvider.propagatePreAccept();
-                    Invariants.nonNull(propagate.partialTxn, "Unable to find 
partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider));
-                    Invariants.nonNull(propagate.stableDeps, "Unable to find 
stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider));
-
-                    var ranges = propagate.committedExecuteAt == null ? 
rangesForEpoch.allAt(propagate.txnId) : 
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
-                    return withContents.apply(param, 
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges), 
propagate.writes, propagate.result);
-                }
-                else if (witnessed.contains(PROPAGATE_OTHER_MSG))
-                {
-                    // the txn/deps may have been erased, won't always be 
here...
-                    Propagate propagate = messageProvider.propagateOther();
-                    var ranges = propagate.committedExecuteAt == null ? 
rangesForEpoch.allAt(propagate.txnId) : 
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
-                    PartialTxn txn = propagate.partialTxn == null ? null : 
propagate.partialTxn.slice(ranges, true);
-                    PartialDeps deps = propagate.stableDeps == null ? null : 
propagate.stableDeps.slice(ranges);
-                    return withContents.apply(param, txn, deps, 
propagate.writes, propagate.result);
-                }
-                else
-                {
-                    throw illegalState("Unable to find messages that lead to 
PreApplied state; txn_id=%s, witnessed %s", messageProvider.txnId(), new 
LoggedMessageProvider(messageProvider));
-                }
-            }
-
-            case NotDefined:
-            case Truncated:
-            case Invalidated:
-                return withContents.apply(param, null, null, null, null);
-
-            default:
-                throw new IllegalStateException();
-        }
-    }
-
-    private static PartialTxn txnFromPreAcceptOrBeginRecover(RangesForEpoch 
rangesForEpoch, Set<MessageType> witnessed, MessageProvider messageProvider)
-    {
-        if (witnessed.contains(PRE_ACCEPT_REQ))
-        {
-            PreAccept preAccept = messageProvider.preAccept();
-            return 
preAccept.partialTxn.slice(rangesForEpoch.allBetween(preAccept.txnId.epoch(), 
preAccept.maxEpoch), true);
-        }
-
-        if (witnessed.contains(BEGIN_RECOVER_REQ))
-        {
-            BeginRecovery beginRecovery = messageProvider.beginRecover();
-            return 
beginRecovery.partialTxn.slice(rangesForEpoch.allAt(beginRecovery.txnId.epoch()),
 true);
-        }
-
-        // TODO (expected): do we ever propagate only preaccept anymore?
-        if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
-        {
-            Propagate propagate = messageProvider.propagatePreAccept();
-            return 
propagate.partialTxn.slice(rangesForEpoch.allBetween(propagate.txnId.epoch(), 
propagate.toEpoch), true);
-        }
-
-        return null;
-    }
-
-    private static PartialDeps slicePartialDeps(RangesForEpoch rangesForEpoch, 
Accept accept)
-    {
-        return 
accept.partialDeps.slice(rangesForEpoch.allBetween(accept.txnId.epoch(), 
accept.executeAt.epoch()));
-    }
-
-    private static <I, O> O sliceAndApply(RangesForEpoch rangesForEpoch, 
Propagate propagate, WithContents<I, O> withContents, I param, Writes writes, 
Result result)
-    {
-        Ranges ranges = rangesForEpoch.allBetween(propagate.txnId.epoch(), 
propagate.committedExecuteAt.epoch());
-        PartialDeps partialDeps = propagate.stableDeps.slice(ranges);
-        PartialTxn partialTxn = propagate.partialTxn.slice(ranges, true);
-        return withContents.apply(param, partialTxn, partialDeps, writes, 
result);
-    }
-
-    private static <I, O> O sliceAndApply(RangesForEpoch rangesForEpoch, 
MessageProvider messageProvider, Set<MessageType> witnessed, Commit commit, 
WithContents<I, O> withContents, I param, Writes writes, Result result)
-    {
-        Ranges ranges = rangesForEpoch.allBetween(commit.txnId.epoch(), 
commit.executeAt.epoch());
-        PartialDeps partialDeps = commit.partialDeps.slice(ranges);
-        PartialTxn partialTxn = commit.partialTxn == null ? null : 
commit.partialTxn.slice(ranges, true);
-        switch (commit.kind)
-        {
-            default: throw new AssertionError("Unhandled Commit.Kind: " + 
commit.kind);
-            case CommitSlowPath:
-            case StableFastPath:
-            case StableSlowPath:
-                PartialTxn preAcceptedPartialTxn = 
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
-                if (partialTxn == null || partialTxn.keys().size() == 0) 
partialTxn = preAcceptedPartialTxn;
-                else partialTxn = merge(preAcceptedPartialTxn, partialTxn);
-                if (partialTxn == null && 
witnessed.contains(COMMIT_MAXIMAL_REQ))
-                    partialTxn = messageProvider.commitMaximal().partialTxn;
-            case StableWithTxnAndDeps:
-            case CommitWithTxn:
-        }
-        return withContents.apply(param, partialTxn, partialDeps, writes, 
result);
-    }
-
-
-    public interface WaitingOnProvider
-    {
-        WaitingOn provide(PartialDeps deps);
-    }
-
-    // TODO (required): randomised testing that we always restore the exact 
same state
-    public interface MessageProvider
-    {
-        TxnId txnId();
-        Set<MessageType> test(Set<MessageType> messages);
-        Set<MessageType> all();
-
-        PreAccept preAccept();
-
-        BeginRecovery beginRecover();
-
-        Propagate propagatePreAccept();
-
-        Accept accept(Ballot ballot);
-
-        Commit commitSlowPath();
-
-        Commit commitMaximal();
-
-        Commit stableFastPath();
-        Commit stableSlowPath();
-
-        Commit stableMaximal();
-
-        Propagate propagateStable();
-
-        Apply applyMinimal();
-
-        Apply applyMaximal();
-
-        Propagate propagateApply();
-
-        Propagate propagateOther();
-
-        ApplyThenWaitUntilApplied applyThenWaitUntilApplied();
-    }
-
-    private static class LoggedMessageProvider
-    {
-        private final MessageProvider messageProvider;
-
-        private LoggedMessageProvider(MessageProvider messageProvider)
-        {
-            this.messageProvider = messageProvider;
-        }
-
-        @Override
-        public String toString()
-        {
-            return messageProvider.all().toString();
-        }
-    }
-}
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java 
b/accord-core/src/main/java/accord/utils/Invariants.java
index 842ed5d3..ad9d8b35 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -19,6 +19,7 @@
 package accord.utils;
 
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
 import net.nicoulaj.compilecommand.annotations.Inline;
@@ -101,6 +102,12 @@ public class Invariants
             illegalState();
     }
 
+    public static void checkState(boolean condition, Supplier<String> msg)
+    {
+        if (!condition)
+            illegalState(msg.get()); // msg is evaluated only when the check fails
+    }
+
     public static void checkState(boolean condition, String msg)
     {
         if (!condition)
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 5ba32268..4e96d890 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.ProgressLog;
+import accord.api.Result;
 import accord.impl.InMemoryCommandStore;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.InMemorySafeCommand;
@@ -41,15 +42,14 @@ import accord.impl.InMemorySafeCommandsForKey;
 import accord.impl.InMemorySafeTimestampsForKey;
 import accord.impl.PrefixedIntHashKey;
 import accord.impl.basic.TaskExecutorService.Task;
+import accord.impl.mock.MockStore;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores;
-import accord.local.CommonAttributes;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
-import accord.local.SerializerSupport;
 import accord.local.ShardDistributor;
 import accord.primitives.Range;
 import accord.primitives.RoutableKey;
@@ -77,18 +77,12 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             return Pattern.compile(path);
         }
 
-        private static final List<Pattern> KNOWN_ISSUES = 
List.of(field(".partialTxn.keys."),
-                                                                  
field(".partialTxn.read.keys."),
-                                                                  
field(".partialTxn.read.userReadKeys."),
-                                                                  
field(".partialTxn.covering."),
-                                                                  
field(".partialDeps.keyDeps."),
-                                                                  
field(".partialDeps.rangeDeps."),
-                                                                  
field(".partialDeps.covering."),
-                                                                  
field(".additionalKeysOrRanges."),
+        private static final List<Pattern> KNOWN_ISSUES = List.of(
                                                                   // when a 
new epoch is detected and the execute ranges have more than the coordinating 
ranges,
                                                                   // and the 
coordinating ranges doesn't include the home key... we drop the query...
                                                                   // The logic 
to stitch messages together is not able to handle this as it doesn't know the 
original Topologies
-                                                                  
field(".partialTxn.query."),
+                                                                  // TODO 
(required): test int if this is still true
+//                                                                  
field(".partialTxn.query."),
                                                                   // 
cmd.mutable().build() != cmd.  This is due to Command.durability changing 
NotDurable to Local depending on the status
                                                                   
field(".durability."));
     }
@@ -172,28 +166,12 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             // "loading" the command doesn't make sense as we don't "store" 
the command...
             if (current.txnId().kind() == Txn.Kind.EphemeralRead)
                 return;
-            Command.WaitingOn waitingOn = null;
-            if (current.isStable() && !current.isTruncated())
-                waitingOn = current.asCommitted().waitingOn;
-            SerializerSupport.MessageProvider messages = 
journal.makeMessageProvider(current.txnId());
-            Command.WaitingOn finalWaitingOn = waitingOn;
-            CommonAttributes.Mutable mutable = current.mutable();
-            mutable.partialDeps(null).removePartialTxn();
 
-            Command reconstructed;
-            try
-            {
-                reconstructed = SerializerSupport.reconstruct(agent(), 
unsafeRangesForEpoch(), mutable, current.saveStatus(), current.executeAt(), 
current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, 
current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, 
messages);
-            }
-            catch (IllegalStateException t)
-            {
-                //TODO (correctness): journal doesn't guarantee we pick the 
same records we used to state transition
-                // Journal stores a list of messages it saw in some order it 
defines, but when reconstructing a command we don't actually know what messages 
were used, this could
-                // lead to a case where deps mismatch, so ignoring this for now
-                if (t.getMessage() != null && t.getMessage().startsWith("Deps 
do not match; expected"))
-                    return;
-                throw t;
-            }
+            Result result = current.result();
+            if (result == null)
+                result = MockStore.RESULT;
+            // Journal will not have result persisted. This part is here for 
test purposes and ensuring that we have strict object equality.
+            Command reconstructed = journal.reconstruct(id, current.txnId(), 
result);
             List<Difference<?>> diff = 
ReflectionUtils.recursiveEquals(current, reconstructed);
             List<String> filteredDiff = diff.stream().filter(d -> 
!DelayedCommandStores.hasKnownIssue(d.path)).map(Object::toString).collect(Collectors.toList());
             Invariants.checkState(filteredDiff.isEmpty(), "Commands did not 
match: expected %s, given %s, node %s, store %d, diff %s", current, 
reconstructed, time, id(), new LazyToString(() -> String.join("\n", 
filteredDiff)));
@@ -303,10 +281,11 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             commands.entrySet().forEach(e -> {
                 InMemorySafeCommand safe = e.getValue();
                 if (!safe.isModified()) return;
+
+                Command before = safe.original();
+                Command after = safe.current();
+                commandStore.journal.onExecute(commandStore.id(), before, 
after, context.primaryTxnId().equals(after.txnId()));
                 commandStore.validateRead(safe.current());
-                Command original = safe.original();
-                if (original != null)
-                    commandStore.validateRead(original);
             });
         }
     }
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java 
b/accord-core/src/test/java/accord/impl/basic/Journal.java
index 4c5f63e5..9258c0e0 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -20,102 +20,49 @@ package accord.impl.basic;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
+import accord.api.Result;
 import accord.impl.MessageListener;
+import accord.local.Command;
+import accord.local.CommonAttributes;
+import accord.local.Listeners;
 import accord.local.Node;
-import accord.local.SerializerSupport;
-import accord.messages.AbstractEpochRequest;
-import accord.messages.Accept;
-import accord.messages.Apply;
-import accord.messages.ApplyThenWaitUntilApplied;
-import accord.messages.BeginRecovery;
-import accord.messages.Commit;
+import accord.local.SaveStatus;
+import accord.local.Status;
 import accord.messages.LocalRequest;
 import accord.messages.Message;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.Propagate;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
-import accord.messages.TxnRequest;
 import accord.primitives.Ballot;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.primitives.Writes;
 import accord.utils.Invariants;
 import org.agrona.collections.Long2ObjectHashMap;
 import org.agrona.collections.LongArrayList;
 
-import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ;
-import static accord.messages.MessageType.ACCEPT_REQ;
-import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
-import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
-import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
-import static accord.messages.MessageType.BEGIN_INVALIDATE_REQ;
-import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
-import static accord.messages.MessageType.COMMIT_INVALIDATE_REQ;
-import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
-import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
-import static accord.messages.MessageType.INFORM_DURABLE_REQ;
-import static accord.messages.MessageType.INFORM_OF_TXN_REQ;
-import static accord.messages.MessageType.PRE_ACCEPT_REQ;
-import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
-import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
-import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
-import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
-import static accord.messages.MessageType.SET_GLOBALLY_DURABLE_REQ;
-import static accord.messages.MessageType.SET_SHARD_DURABLE_REQ;
-import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
-import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
-import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
+import static accord.utils.Invariants.illegalState;
+
 
 public class Journal implements LocalRequest.Handler, Runnable
 {
-    private static final TxnIdProvider EPOCH = msg -> 
((AbstractEpochRequest<?>) msg).txnId;
-    private static final TxnIdProvider TXN   = msg -> ((TxnRequest<?>) 
msg).txnId;
-    private static final TxnIdProvider LOCAL = msg -> ((LocalRequest<?>) 
msg).primaryTxnId();
-    private static final TxnIdProvider INVL  = msg -> ((Commit.Invalidate) 
msg).primaryTxnId();
-    private static final Map<MessageType, TxnIdProvider> typeToProvider = 
ImmutableMap.<MessageType, TxnIdProvider>builder()
-                                                                               
       .put(PRE_ACCEPT_REQ, TXN)
-                                                                               
       .put(ACCEPT_REQ, TXN)
-                                                                               
       .put(ACCEPT_INVALIDATE_REQ, EPOCH)
-                                                                               
       .put(COMMIT_SLOW_PATH_REQ, TXN)
-                                                                               
       .put(COMMIT_MAXIMAL_REQ, TXN)
-                                                                               
       .put(STABLE_FAST_PATH_REQ, TXN)
-                                                                               
       .put(STABLE_SLOW_PATH_REQ, TXN)
-                                                                               
       .put(STABLE_MAXIMAL_REQ, TXN)
-                                                                               
       .put(COMMIT_INVALIDATE_REQ, INVL)
-                                                                               
       .put(APPLY_MINIMAL_REQ, TXN)
-                                                                               
       .put(APPLY_MAXIMAL_REQ, TXN)
-                                                                               
       .put(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, EPOCH)
-                                                                               
       .put(BEGIN_RECOVER_REQ, TXN)
-                                                                               
       .put(BEGIN_INVALIDATE_REQ, EPOCH)
-                                                                               
       .put(INFORM_OF_TXN_REQ, EPOCH)
-                                                                               
       .put(INFORM_DURABLE_REQ, TXN)
-                                                                               
       .put(SET_SHARD_DURABLE_REQ, EPOCH)
-                                                                               
       .put(SET_GLOBALLY_DURABLE_REQ, EPOCH)
-                                                                               
       .put(PROPAGATE_PRE_ACCEPT_MSG, LOCAL)
-                                                                               
       .put(PROPAGATE_STABLE_MSG, LOCAL)
-                                                                               
       .put(PROPAGATE_APPLY_MSG, LOCAL)
-                                                                               
       .put(PROPAGATE_OTHER_MSG, LOCAL)
-                                                                               
       .build();
-
     private final Queue<RequestContext> unframedRequests = new ArrayDeque<>();
     private final LongArrayList waitForEpochs = new LongArrayList();
     private final Long2ObjectHashMap<ArrayList<RequestContext>> 
delayedRequests = new Long2ObjectHashMap<>();
-    private final Map<TxnId, Map<MessageType, Message>> writes = new 
HashMap<>();
     private final MessageListener messageListener;
+    private final Map<Key, List<Diff>> diffs = new HashMap<>();
     private Node node;
 
     public Journal(MessageListener messageListener)
@@ -158,66 +105,6 @@ public class Journal implements LocalRequest.Handler, 
Runnable
         node.receive(request, from, replyContext);
     }
 
-    private void save(Message request)
-    {
-        MessageType type = request.type();
-        TxnIdProvider provider = typeToProvider.get(type);
-        Invariants.nonNull(provider, "Unknown type %s: %s", type, request);
-        TxnId txnId = provider.txnId(request);
-        writes.computeIfAbsent(txnId, ignore -> new Testing()).put(type, 
request);
-    }
-
-    public SerializerSupport.MessageProvider makeMessageProvider(TxnId txnId)
-    {
-        return new MessageProvider(txnId, writes.getOrDefault(txnId, 
Map.of()));
-    }
-
-    private static class Testing extends LinkedHashMap<MessageType, Message>
-    {
-        public Map<MessageType, List<Message>> history()
-        {
-            LinkedHashMap<MessageType, List<Message>> history = new 
LinkedHashMap<>();
-            for (MessageType k : keySet())
-            {
-                Object current = super.get(k);
-                history.put(k, current instanceof List ? (List<Message>) 
current : Collections.singletonList((Message) current));
-            }
-            return history;
-        }
-
-        @Override
-        public Message get(Object key)
-        {
-            Object current = super.get(key);
-            if (current == null || current instanceof Message)
-                return (Message) current;
-            List<Message> messages = (List<Message>) current;
-            return messages.get(messages.size() - 1);
-        }
-
-        @Override
-        public Message put(MessageType key, Message value)
-        {
-            Object current = super.get(key);
-            if (current == null)
-                return super.put(key, value);
-            else if (current instanceof List)
-            {
-                List<Message> list = (List<Message>) current;
-                list.add(value);
-                return list.get(list.size() - 2);
-            }
-            else
-            {
-                List<Message> messages = new ArrayList<>();
-                messages.add((Message) current);
-                messages.add(value);
-                super.put(key, value);
-                return (Message) current;
-            }
-        }
-    }
-
     @Override
     public void run()
     {
@@ -248,7 +135,7 @@ public class Journal implements LocalRequest.Handler, 
Runnable
             requests.addAll(delayed);
         }
         waitForEpochs.removeIfLong(epoch -> 
!delayedRequests.containsKey(epoch));
-        
+
         // for anything queued, put into the pending epochs or schedule
         RequestContext request;
         while (null != (request = unframedRequests.poll()))
@@ -266,19 +153,163 @@ public class Journal implements LocalRequest.Handler, 
Runnable
                 requests.add(request);
             }
         }
-        
+
         // schedule
         if (requests != null)
         {
-            requests.forEach(r -> save(r.message)); // save in batches to 
simulate journal more...
             requests.forEach(Runnable::run);
         }
     }
 
-    @FunctionalInterface
-    interface TxnIdProvider
+    public Command reconstruct(int commandStoreId, TxnId txnId, Result result)
+    {
+        Key key = new Key(txnId, commandStoreId);
+        List<Diff> diffs = this.diffs.get(key);
+        if (diffs == null || diffs.isEmpty())
+            return null; // no diffs recorded under this (txnId, store) key
+
+        Timestamp executeAt = null;
+        Timestamp executesAtLeast = null;
+        SaveStatus saveStatus = null;
+        Status.Durability durability = null;
+
+        Ballot acceptedOrCommitted = Ballot.ZERO;
+        Ballot promised = Ballot.ZERO;
+
+        Route<?> route = null;
+        PartialTxn partialTxn = null;
+        PartialDeps partialDeps = null;
+        Seekables<?, ?> additionalKeysOrRanges = null;
+
+        Command.WaitingOn waitingOn = null;
+        Writes writes = null;
+        Listeners.Immutable<Command.DurableAndIdempotentListener> listeners = 
null;
+
+        for (Diff diff : diffs) // fold in list order: a later non-null field wins
+        {
+            if (diff.txnId != null)
+                txnId = diff.txnId;
+            if (diff.executeAt != null)
+                executeAt = diff.executeAt;
+            if (diff.executesAtLeast != null)
+                executesAtLeast = diff.executesAtLeast;
+            if (diff.saveStatus != null)
+                saveStatus = diff.saveStatus;
+            if (diff.durability != null)
+                durability = diff.durability;
+
+            if (diff.acceptedOrCommitted != null)
+                acceptedOrCommitted = diff.acceptedOrCommitted;
+            if (diff.promised != null)
+                promised = diff.promised;
+
+            if (diff.route != null)
+                route = diff.route;
+            if (diff.partialTxn != null)
+                partialTxn = diff.partialTxn;
+            if (diff.partialDeps != null)
+                partialDeps = diff.partialDeps;
+            if (diff.additionalKeysOrRanges != null)
+                additionalKeysOrRanges = diff.additionalKeysOrRanges;
+
+            if (diff.waitingOn != null)
+                waitingOn = diff.waitingOn;
+            if (diff.writes != null)
+                writes = diff.writes;
+            if (diff.listeners != null)
+                listeners = diff.listeners;
+        }
+
+        Invariants.checkState(saveStatus != null, // before first dereference
+                              "Save status is null after applying %s", diffs);
+        if (!txnId.kind().awaitsOnlyDeps())
+            executesAtLeast = null;
+
+        switch (saveStatus.known.outcome)
+        {
+            case Erased:
+            case WasApply:
+                writes = null;
+                result = null;
+                break;
+        }
+
+        CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
+        if (partialTxn != null)
+            attrs.partialTxn(partialTxn);
+        if (durability != null)
+            attrs.durability(durability);
+        if (route != null)
+            attrs.route(route);
+
+        // TODO (desired): we can simplify this logic if, instead of diffing, 
we will infer the diff from the status
+        if (partialDeps != null &&
+            (saveStatus.known.deps != Status.KnownDeps.NoDeps &&
+             saveStatus.known.deps != Status.KnownDeps.DepsErased &&
+             saveStatus.known.deps != Status.KnownDeps.DepsUnknown))
+            attrs.partialDeps(partialDeps);
+        if (additionalKeysOrRanges != null)
+            attrs.additionalKeysOrRanges(additionalKeysOrRanges);
+        if (listeners != null)
+            attrs.setListeners(listeners);
+
+        switch (saveStatus.status)
+        {
+            case NotDefined:
+                return saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
+                                                              : 
Command.NotDefined.notDefined(attrs, promised);
+            case PreAccepted:
+                return Command.PreAccepted.preAccepted(attrs, executeAt, 
promised);
+            case AcceptedInvalidate:
+            case Accepted:
+            case PreCommitted:
+                return Command.Accepted.accepted(attrs, saveStatus, executeAt, 
promised, acceptedOrCommitted);
+            case Committed:
+            case Stable:
+                return Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
+            case PreApplied:
+            case Applied:
+                return Command.Executed.executed(attrs, saveStatus, executeAt, 
promised, acceptedOrCommitted, waitingOn, writes, result);
+            case Invalidated:
+            case Truncated:
+                return truncated(attrs, saveStatus, executeAt, 
executesAtLeast, writes, result);
+            default:
+                throw new IllegalStateException("Do not know " + 
saveStatus.status + " " + saveStatus);
+        }
+    }
+
+    private static Command.Truncated truncated(CommonAttributes.Mutable attrs, 
SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes 
writes, Result result)
     {
-        TxnId txnId(Message message);
+        switch (status)
+        {
+            default: // any SaveStatus without a case below is rejected
+                throw illegalState("Unhandled SaveStatus: " + status);
+            case TruncatedApplyWithOutcome:
+            case TruncatedApplyWithDeps:
+            case TruncatedApply: // only these pass executeAt, writes and result on
+                return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, result, executesAtLeast);
+            case ErasedOrInvalidated:
+                return Command.Truncated.erasedOrInvalidated(attrs.txnId(), 
attrs.durability(), attrs.route());
+            case Erased:
+                return Command.Truncated.erased(attrs.txnId(), 
attrs.durability(), attrs.route());
+            case Invalidated: // built from the txnId and durable listeners alone
+                return Command.Truncated.invalidated(attrs.txnId(), 
attrs.durableListeners());
+        }
+    }
+
+    public void onExecute(int commandStoreId, Command before, Command after, 
boolean isPrimary)
+    {
+        if (before == null && after == null)
+            return;
+        Diff diff = diff(before, after);
+        if (diff != null && !isPrimary) // never dereference a null diff
+            diff = diff.asNonprimary();
+
+        if (diff != null)
+        {
+            Key key = new Key(after.txnId(), commandStoreId);
+            diffs.computeIfAbsent(key, (k_) -> new ArrayList<>()).add(diff);
+        }
     }
 
     private static class RequestContext implements Runnable
@@ -301,137 +332,202 @@ public class Journal implements LocalRequest.Handler, 
Runnable
         }
     }
 
-    public static class MessageProvider implements 
SerializerSupport.MessageProvider
+    /**
+     * Immutable holder for the {@link Command} fields recorded for one state transition.
+     * <p>
+     * Every field is nullable: a null field means no new value was recorded for it
+     * (see {@code ifNotEqual}, which produces the values passed to the constructor).
+     */
+    private static class Diff
     {
         public final TxnId txnId;
-        private final Map<MessageType, Message> writes;
 
-        public MessageProvider(TxnId txnId, Map<MessageType, Message> writes)
-        {
-            this.txnId = txnId;
-            this.writes = writes;
-        }
+        public final Timestamp executeAt;
+        public final Timestamp executesAtLeast;
+        public final SaveStatus saveStatus;
+        public final Status.Durability durability;
 
-        @Override
-        public TxnId txnId()
-        {
-            return txnId;
-        }
+        public final Ballot acceptedOrCommitted;
+        public final Ballot promised;
 
-        @Override
-        public Set<MessageType> test(Set<MessageType> messages)
-        {
-            return Sets.intersection(writes.keySet(), messages);
-        }
+        public final Route<?> route;
+        public final PartialTxn partialTxn;
+        public final PartialDeps partialDeps;
 
-        @Override
-        public Set<MessageType> all()
-        {
-            return writes.keySet();
-        }
+        public final Writes writes;
+        public final Command.WaitingOn waitingOn;
+        public final Seekables<?, ?> additionalKeysOrRanges;
+        public final Listeners.Immutable<Command.DurableAndIdempotentListener> 
listeners;
 
-        public Map<MessageType, Message> allMessages()
-        {
-            var all = all();
-            Map<MessageType, Message> map = 
Maps.newHashMapWithExpectedSize(all.size());
-            for (MessageType messageType : all)
-                map.put(messageType, get(messageType));
-            return map;
-        }
+        /**
+         * Plain field-by-field constructor; any argument may be null.
+         * Note that {@code waitingOn} precedes {@code writes} in the parameter list,
+         * which is the reverse of the field declaration order above.
+         */
+        public Diff(TxnId txnId,
+                    Timestamp executeAt,
+                    Timestamp executesAtLeast,
+                    SaveStatus saveStatus,
+                    Status.Durability durability,
 
-        public  <T extends Message> T get(MessageType type)
-        {
-            return (T) writes.get(type);
-        }
+                    Ballot acceptedOrCommitted,
+                    Ballot promised,
 
-        @Override
-        public PreAccept preAccept()
-        {
-            return get(PRE_ACCEPT_REQ);
-        }
+                    Route<?> route,
+                    PartialTxn partialTxn,
+                    PartialDeps partialDeps,
+                    Command.WaitingOn waitingOn,
 
-        @Override
-        public BeginRecovery beginRecover()
+                    Writes writes,
+                    Seekables<?, ?> additionalKeysOrRanges,
+                    Listeners.Immutable<Command.DurableAndIdempotentListener> 
listeners)
         {
-            return get(BEGIN_RECOVER_REQ);
-        }
+            this.txnId = txnId;
+            this.executeAt = executeAt;
+            this.executesAtLeast = executesAtLeast;
+            this.saveStatus = saveStatus;
+            this.durability = durability;
 
-        @Override
-        public Propagate propagatePreAccept()
-        {
-            return get(PROPAGATE_PRE_ACCEPT_MSG);
-        }
+            this.acceptedOrCommitted = acceptedOrCommitted;
+            this.promised = promised;
 
-        @Override
-        public Accept accept(Ballot ballot)
-        {
-            return get(ACCEPT_REQ);
-        }
+            this.route = route;
+            this.partialTxn = partialTxn;
+            this.partialDeps = partialDeps;
 
-        @Override
-        public Commit commitSlowPath()
-        {
-            return get(COMMIT_SLOW_PATH_REQ);
+            this.writes = writes;
+            this.waitingOn = waitingOn;
+            this.additionalKeysOrRanges = additionalKeysOrRanges;
+            this.listeners = listeners;
         }
 
-        @Override
-        public Commit commitMaximal()
+        // We allow only save status, waitingOn, and listeners to be updated 
by non-primary transactions
+        // Returns a copy that keeps exactly those three fields and nulls every other one.
+        public Diff asNonprimary()
         {
-            return get(COMMIT_MAXIMAL_REQ);
+            return new Diff(null, null, null, saveStatus, null, null, null, 
null, null, null, waitingOn, null, null, listeners);
         }
 
-        @Override
-        public Commit stableFastPath()
+        /** Returns true only when every one of the fourteen fields is null. */
+        public boolean allNulls()
         {
-            return get(STABLE_FAST_PATH_REQ);
+            if (txnId != null) return false;
+            if (executeAt != null) return false;
+            if (executesAtLeast != null) return false;
+            if (saveStatus != null) return false;
+            if (durability != null) return false;
+            if (acceptedOrCommitted != null) return false;
+            if (promised != null) return false;
+            if (route != null) return false;
+            if (partialTxn != null) return false;
+            if (partialDeps != null) return false;
+            if (writes != null) return false;
+            if (waitingOn != null) return false;
+            if (additionalKeysOrRanges != null) return false;
+            if (listeners != null) return false;
+            return true;
         }
 
         @Override
-        public Commit stableSlowPath()
-        {
-            return get(STABLE_SLOW_PATH_REQ);
+        // Renders only the non-null fields as "name = value " pairs inside "SavedDiff{...}".
+        public String toString()
+        {
+            StringBuilder builder = new StringBuilder("SavedDiff{");
+            if (txnId != null)
+                builder.append("txnId = ").append(txnId).append(" ");
+            if (executeAt != null)
+                builder.append("executeAt = ").append(executeAt).append(" ");
+            if (executesAtLeast != null)
+                builder.append("executesAtLeast = 
").append(executesAtLeast).append(" ");
+            if (saveStatus != null)
+                builder.append("saveStatus = ").append(saveStatus).append(" ");
+            if (durability != null)
+                builder.append("durability = ").append(durability).append(" ");
+            if (acceptedOrCommitted != null)
+                builder.append("acceptedOrCommitted = 
").append(acceptedOrCommitted).append(" ");
+            if (promised != null)
+                builder.append("promised = ").append(promised).append(" ");
+            if (route != null)
+                builder.append("route = ").append(route).append(" ");
+            if (partialTxn != null)
+                builder.append("partialTxn = ").append(partialTxn).append(" ");
+            if (partialDeps != null)
+                builder.append("partialDeps = ").append(partialDeps).append(" 
");
+            if (writes != null)
+                builder.append("writes = ").append(writes).append(" ");
+            if (waitingOn != null)
+                builder.append("waitingOn = ").append(waitingOn).append(" ");
+            if (additionalKeysOrRanges != null)
+                builder.append("additionalKeysOrRanges = 
").append(additionalKeysOrRanges).append(" ");
+            if (listeners != null)
+                builder.append("listeners = ").append(listeners).append(" ");
+            builder.append("}");
+            return builder.toString();
         }
+    }
 
-        @Override
-        public Commit stableMaximal()
-        {
-            return get(STABLE_MAXIMAL_REQ);
-        }
+    /**
+     * Computes the field-level difference between two command states.
+     * <p>
+     * Each field is extracted from both commands and passed through {@code ifNotEqual}; the
+     * results are handed to the {@link Diff} constructor positionally, so the argument order
+     * here must match the constructor's parameter order.
+     *
+     * @param before earlier state, may be null
+     * @param after  later state, may be null
+     * @return the diff, or null when the commands are equal or no field yielded a value
+     */
+    static Diff diff(Command before, Command after)
+    {
+        if (Objects.equals(before, after))
+            return null;
+
+        // the trailing boolean is ifNotEqual's allowClassMismatch flag
+        Diff diff = new Diff(ifNotEqual(before, after, Command::txnId, false),
+                             ifNotEqual(before, after, Command::executeAt, 
true),
+                             ifNotEqual(before, after, 
Command::executesAtLeast, true),
+                             ifNotEqual(before, after, Command::saveStatus, 
false),
+
+                             ifNotEqual(before, after, Command::durability, 
false),
+                             ifNotEqual(before, after, 
Command::acceptedOrCommitted, false),
+                             ifNotEqual(before, after, Command::promised, 
false),
+
+                             ifNotEqual(before, after, Command::route, true),
+                             ifNotEqual(before, after, Command::partialTxn, 
false),
+                             ifNotEqual(before, after, Command::partialDeps, 
false),
+                             ifNotEqual(before, after, Journal::getWaitingOn, 
true),
+                             ifNotEqual(before, after, Command::writes, false),
+                             ifNotEqual(before, after, 
Command::additionalKeysOrRanges, false),
+                             ifNotEqual(before, after, 
Command::durableListeners, false));
+        // the commands differed, but not in any of the tracked fields
+        if (diff.allNulls())
+            return null;
+
+        return diff;
+    }
 
-        @Override
-        public Propagate propagateStable()
-        {
-            return get(PROPAGATE_STABLE_MSG);
-        }
+    /**
+     * Returns the command's {@code waitingOn} when it is a {@link Command.Committed},
+     * and null for every other command type.
+     */
+    static Command.WaitingOn getWaitingOn(Command command)
+    {
+        if (command instanceof Command.Committed)
+            return command.asCommitted().waitingOn();
 
-        @Override
-        public Apply applyMinimal()
-        {
-            return get(APPLY_MINIMAL_REQ);
-        }
+        return null;
+    }
 
-        @Override
-        public Apply applyMaximal()
-        {
-            return get(APPLY_MAXIMAL_REQ);
-        }
+    /**
+     * Extracts a value from both objects and returns the right-hand value only if it differs
+     * from the left-hand one.
+     * <p>
+     * A null object contributes a null value. Because null is also the "unchanged" result, a
+     * value that changes from non-null to null is reported exactly like no change at all.
+     *
+     * @param lo                 left (earlier) object, may be null
+     * @param ro                 right (later) object, may be null
+     * @param convert            accessor applied to each non-null object
+     * @param allowClassMismatch when false, an assertion requires both values to share a class
+     * @return the right-hand value when it differs, otherwise null
+     */
+    private static <OBJ, VAL> VAL ifNotEqual(OBJ lo, OBJ ro, Function<OBJ, 
VAL> convert, boolean allowClassMismatch)
+    {
+        VAL l = null;
+        VAL r = null;
+        if (lo != null) l = convert.apply(lo);
+        if (ro != null) r = convert.apply(ro);
 
-        @Override
-        public Propagate propagateApply()
+        // same reference (or both null): nothing to record
+        if (l == r)
+            return null;
+        // exactly one side is null: report the right side, which may itself be null
+        if (l == null || r == null)
+            return r;
+        assert allowClassMismatch || l.getClass() == r.getClass() : 
String.format("%s != %s", l.getClass(), r.getClass());
+
+        if (l.equals(r))
+            return null;
+
+        return r;
+    }
+
+    /**
+     * Map key identifying the diffs of one transaction within one command store.
+     * Equality and hash code are based on both the txnId and the command store id.
+     */
+    public static class Key
+    {
+        final TxnId timestamp;
+        final int commandStoreId;
+
+        public Key(TxnId timestamp, int commandStoreId)
         {
-            return get(PROPAGATE_APPLY_MSG);
+            this.timestamp = timestamp;
+            this.commandStoreId = commandStoreId;
         }
 
-        @Override
-        public Propagate propagateOther()
+        // @Override lets the compiler verify the signature really overrides Object.equals
+        @Override
+        public boolean equals(Object o)
         {
-            return get(PROPAGATE_OTHER_MSG);
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Key key = (Key) o;
+            return commandStoreId == key.commandStoreId && 
Objects.equals(timestamp, key.timestamp);
         }
 
-        @Override
-        public ApplyThenWaitUntilApplied applyThenWaitUntilApplied()
+        // must stay consistent with equals: hashes the same two fields
+        @Override
+        public int hashCode()
         {
-            return get(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
+            return Objects.hash(timestamp, commandStoreId);
         }
     }
-}
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java 
b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
index ab38eb7f..22b06545 100644
--- a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
+++ b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
@@ -112,11 +112,10 @@ class BootstrapLocalTxnTest
                              }
                              return target;
                          }))
-                         // validate cmd
+                         // validateRead is called implicitly _on command 
completion_
                          .flatMap(target -> 
store.execute(contextFor(localSyncId), safe -> {
                              SafeCommand cmd = safe.get(localSyncId, 
route.homeKey());
                              Command current = cmd.current();
-                             validate.accept(current);
                              
Assertions.assertThat(current.saveStatus()).isEqualTo(target == Cleanup.NO ? 
SaveStatus.Applied : target.appliesIfNot);
                          }))
                          .begin(on.agent());
diff --git a/accord-core/src/test/java/accord/local/CheckedCommands.java 
b/accord-core/src/test/java/accord/local/CheckedCommands.java
index 64b89e0e..7e93ba3c 100644
--- a/accord-core/src/test/java/accord/local/CheckedCommands.java
+++ b/accord-core/src/test/java/accord/local/CheckedCommands.java
@@ -33,34 +33,69 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 
+import java.util.function.BiConsumer;
+
 import static accord.utils.Invariants.illegalState;
 
 public class CheckedCommands
 {
+    // Wrappers around Commands.preaccept/accept/commit/apply that throw (via illegalState) unless
+    // the outcome is Success. Each operation has two overloads: the original signature, which
+    // delegates with a no-op consumer, and one taking a BiConsumer that receives the command
+    // state captured before and after the operation, and is invoked only on success.
     public static void preaccept(SafeCommandStore safeStore, TxnId txnId, 
PartialTxn partialTxn, FullRoute<?> route, @Nullable RoutingKey progressKey)
+    {
+        preaccept(safeStore, txnId, partialTxn, route, progressKey, (l, r) -> 
{});
+    }
+
+    public static void preaccept(SafeCommandStore safeStore, TxnId txnId, 
PartialTxn partialTxn, FullRoute<?> route, @Nullable RoutingKey progressKey, 
BiConsumer<Command, Command> consumer)
     {
         SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
+        Command before = safeCommand.current();
         Commands.AcceptOutcome result = Commands.preaccept(safeStore, 
safeCommand, txnId, txnId.epoch(), partialTxn, route, progressKey);
+        Command after = safeCommand.current();
         if (result != Commands.AcceptOutcome.Success) throw 
illegalState("Command mutation rejected: " + result);
+        consumer.accept(before, after);
     }
 
     public static void accept(SafeCommandStore safeStore, TxnId txnId, Ballot 
ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey 
progressKey, Timestamp executeAt, PartialDeps partialDeps)
     {
+        accept(safeStore, txnId, ballot, route, keys, progressKey, executeAt, 
partialDeps, (l, r) -> {});
+    }
+
+    public static void accept(SafeCommandStore safeStore, TxnId txnId, Ballot 
ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey 
progressKey, Timestamp executeAt, PartialDeps partialDeps, BiConsumer<Command, 
Command> consumer)
+    {
+        // Commands.accept takes the store and txnId rather than the SafeCommand;
+        // the SafeCommand is fetched here only to snapshot the before/after states
+        SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
+        Command before = safeCommand.current();
         Commands.AcceptOutcome result = Commands.accept(safeStore, txnId, 
ballot, route, keys, progressKey, executeAt, partialDeps);
+        Command after = safeCommand.current();
         if (result != Commands.AcceptOutcome.Success) throw 
illegalState("Command mutation rejected: " + result);
+        consumer.accept(before, after);
     }
 
     public static void commit(SafeCommandStore safeStore, SaveStatus 
saveStatus, Ballot ballot, TxnId txnId, Route<?> route, @Nullable RoutingKey 
progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps 
partialDeps)
+    {
+        commit(safeStore, saveStatus, ballot, txnId, route, progressKey, 
partialTxn, executeAt, partialDeps, (l, r) -> {});
+    }
+
+    public static void commit(SafeCommandStore safeStore, SaveStatus 
saveStatus, Ballot ballot, TxnId txnId, Route<?> route, @Nullable RoutingKey 
progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps 
partialDeps, BiConsumer<Command, Command> consumer)
     {
         SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
+        Command before = safeCommand.current();
         Commands.CommitOutcome result = Commands.commit(safeStore, 
safeCommand, saveStatus, ballot, txnId, route, progressKey, partialTxn, 
executeAt, partialDeps);
+        Command after = safeCommand.current();
         if (result != Commands.CommitOutcome.Success) throw 
illegalState("Command mutation rejected: " + result);
+        consumer.accept(before, after);
     }
 
     public static void apply(SafeCommandStore safeStore, TxnId txnId, Route<?> 
route, @Nullable RoutingKey progressKey, Timestamp executeAt, @Nullable 
PartialDeps partialDeps, @Nullable PartialTxn partialTxn, Writes writes, Result 
result)
+    {
+        apply(safeStore, txnId, route, progressKey, executeAt, partialDeps, 
partialTxn, writes, result, (l, r) -> {});
+    }
+
+    public static void apply(SafeCommandStore safeStore, TxnId txnId, Route<?> 
route, @Nullable RoutingKey progressKey, Timestamp executeAt, @Nullable 
PartialDeps partialDeps, @Nullable PartialTxn partialTxn, Writes writes, Result 
result, BiConsumer<Command, Command> consumer)
     {
         SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
+        Command before = safeCommand.current();
         Commands.ApplyOutcome outcome = Commands.apply(safeStore, safeCommand, 
txnId, route, progressKey, executeAt, partialDeps, partialTxn, writes, result);
+        Command after = safeCommand.current();
         if (outcome != Commands.ApplyOutcome.Success) throw 
illegalState("Command mutation rejected: " + outcome);
+        consumer.accept(before, after);
     }
 }
diff --git a/accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java 
b/accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java
index ca5cf4a9..68ae553b 100644
--- a/accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java
+++ b/accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java
@@ -35,8 +35,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import accord.api.RoutingKey;
 import accord.impl.IntKey;
@@ -53,7 +51,6 @@ import static java.lang.Integer.MIN_VALUE;
 // TODO (desired): test start inclusive ranges
 public class ReducingRangeMapTest
 {
-    static final Logger logger = 
LoggerFactory.getLogger(ReducingRangeMapTest.class);
     static final ReducingRangeMap<Timestamp> EMPTY = new ReducingRangeMap<>();
     static final RoutingKey MINIMUM_EXCL = new IntKey.Routing(MIN_VALUE);
     static final RoutingKey MAXIMUM_EXCL = new IntKey.Routing(MAX_VALUE);
@@ -214,7 +211,6 @@ public class ReducingRangeMapTest
         try
         {
             Random random = new Random(seed);
-            logger.info(id);
             List<RandomWithCanonical> merge = new ArrayList<>();
             while (numberOfMerges-- > 0)
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to