This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 79fc1ebf CEP-15: Minimize transaction state kept in system tables (#62)
79fc1ebf is described below

commit 79fc1ebf7db6aa5e616dbef1bc61b616fea3c2c6
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Mon Sep 25 16:07:44 2023 +0100

    CEP-15: Minimize transaction state kept in system tables (#62)
    
    * CEP-15: Minimize transaction state kept in system tables
    
    patch by Aleksey Yeschenko; reviewed by Ariel Weisberg for CASSANDRA-18573
---
 .../src/main/java/accord/api/ProgressLog.java      |   3 +-
 accord-core/src/main/java/accord/api/Read.java     |   1 +
 accord-core/src/main/java/accord/api/Update.java   |   1 +
 .../src/main/java/accord/coordinate/FetchData.java | 255 +------------
 .../java/accord/coordinate/RecoverWithRoute.java   |   8 +-
 .../src/main/java/accord/local/Command.java        | 124 ++++++-
 accord-core/src/main/java/accord/local/Node.java   |  11 +-
 .../main/java/accord/local/SerializerSupport.java  | 413 +++++++++++++++++++++
 accord-core/src/main/java/accord/local/Status.java |   9 +-
 .../src/main/java/accord/messages/Apply.java       |  37 +-
 .../src/main/java/accord/messages/Commit.java      |  23 +-
 .../main/java/accord/messages/InformDurable.java   |  11 -
 .../{api/Read.java => messages/LocalMessage.java}  |  24 +-
 .../src/main/java/accord/messages/MessageType.java | 105 ++++--
 .../src/main/java/accord/messages/Propagate.java   | 371 ++++++++++++++++++
 .../src/main/java/accord/primitives/Deps.java      |   2 +-
 .../main/java/accord/primitives/PartialDeps.java   |   9 +-
 .../main/java/accord/primitives/PartialTxn.java    |  18 +-
 .../src/main/java/accord/primitives/Writes.java    |   7 +-
 accord-core/src/test/java/accord/Utils.java        |   2 +
 .../src/test/java/accord/impl/basic/Cluster.java   |   3 +-
 .../test/java/accord/impl/mock/MockCluster.java    |   2 +
 .../java/accord/local/ImmutableCommandTest.java    |   2 +-
 .../src/main/java/accord/maelstrom/Cluster.java    |   3 +-
 .../src/main/java/accord/maelstrom/Main.java       |   3 +-
 25 files changed, 1079 insertions(+), 368 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ProgressLog.java 
b/accord-core/src/main/java/accord/api/ProgressLog.java
index 4ede480f..47244c17 100644
--- a/accord-core/src/main/java/accord/api/ProgressLog.java
+++ b/accord-core/src/main/java/accord/api/ProgressLog.java
@@ -142,8 +142,7 @@ public interface ProgressLog
     void executed(Command command, ProgressShard shard);
 
     /**
-     * The transaction's outcome has been durably recorded (but not 
necessarily applied) at a quorum of all shards,
-     * including at least those node's ids that are provided.
+     * The transaction's outcome has been durably recorded (but not 
necessarily applied) at a quorum of all shards.
      *
      * If this replica has not witnessed the outcome of the transaction, it 
should poll a majority of each shard
      * for its outcome.
diff --git a/accord-core/src/main/java/accord/api/Read.java 
b/accord-core/src/main/java/accord/api/Read.java
index b9eaf2a1..7def168c 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -32,4 +32,5 @@ public interface Read
     AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore 
commandStore, Timestamp executeAt, DataStore store);
     Read slice(Ranges ranges);
     Read merge(Read other);
+    default boolean isEqualOrFuller(Read other) { return true; }
 }
diff --git a/accord-core/src/main/java/accord/api/Update.java 
b/accord-core/src/main/java/accord/api/Update.java
index 8a56efcc..fa8493e5 100644
--- a/accord-core/src/main/java/accord/api/Update.java
+++ b/accord-core/src/main/java/accord/api/Update.java
@@ -36,4 +36,5 @@ public interface Update
     Write apply(Timestamp executeAt, @Nullable Data data);
     Update slice(Ranges ranges);
     Update merge(Update other);
+    default boolean isEqualOrFuller(Update other) { return true; }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java 
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 774df8ab..0188a0a7 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -15,37 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package accord.coordinate;
 
 import java.util.function.BiConsumer;
 
-import accord.api.RoutingKey;
-import accord.local.Command;
-import accord.local.Commands;
 import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommand;
-import accord.local.SafeCommandStore;
 import accord.local.Status;
 import accord.local.Status.Known;
-import accord.local.Status.Phase;
 import accord.messages.CheckStatus;
 import accord.messages.CheckStatus.CheckStatusOkFull;
-import accord.messages.CheckStatus.WithQuorum;
+import accord.messages.Propagate;
 import accord.primitives.*;
 import accord.utils.Invariants;
-import accord.utils.MapReduceConsume;
 
 import javax.annotation.Nullable;
 
 import static 
accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
-import static accord.local.PreLoadContext.contextFor;
-import static accord.local.Status.NotDefined;
-import static accord.local.Status.Phase.Cleanup;
-import static accord.local.Status.PreApplied;
 import static accord.messages.CheckStatus.WithQuorum.NoQuorum;
-import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Route.castToRoute;
 import static accord.primitives.Route.isRoute;
 
@@ -230,244 +216,7 @@ public class FetchData extends CheckShards<Route<?>>
             if (success == ReadCoordinator.Success.Success)
                 Invariants.checkState(isSufficient(merged), "Status %s is not 
sufficient", merged);
 
-            OnDone.propagate(node, txnId, sourceEpoch, success.withQuorum, 
route(), target, (CheckStatusOkFull) merged, callback);
-        }
-    }
-
-    static class OnDone implements MapReduceConsume<SafeCommandStore, Void>, 
EpochSupplier
-    {
-        final Node node;
-        final TxnId txnId;
-        final Route<?> route;
-        final RoutingKey progressKey;
-        final CheckStatusOkFull full;
-        // this is a WHOLE NODE measure, so if commit epoch has more ranges we 
do not count as committed if we can only commit in coordination epoch
-        final Known achieved;
-        final PartialTxn partialTxn;
-        final PartialDeps partialDeps;
-        final long toEpoch;
-        final BiConsumer<Known, Throwable> callback;
-
-        OnDone(Node node, TxnId txnId, Route<?> route, RoutingKey progressKey, 
CheckStatusOkFull full, Known achieved, PartialTxn partialTxn, PartialDeps 
partialDeps, long toEpoch, BiConsumer<Known, Throwable> callback)
-        {
-            this.node = node;
-            this.txnId = txnId;
-            this.route = route;
-            this.progressKey = progressKey;
-            this.full = full;
-            this.achieved = achieved;
-            this.partialTxn = partialTxn;
-            this.partialDeps = partialDeps;
-            this.toEpoch = toEpoch;
-            this.callback = callback;
-        }
-
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        public static void propagate(Node node, TxnId txnId, long sourceEpoch, 
WithQuorum withQuorum, Route route, @Nullable Known target, CheckStatusOkFull 
full, BiConsumer<Known, Throwable> callback)
-        {
-            if (full.saveStatus.status == NotDefined && 
full.invalidIfNotAtLeast == NotDefined)
-            {
-                callback.accept(Known.Nothing, null);
-                return;
-            }
-
-            Invariants.checkState(sourceEpoch == txnId.epoch() || 
(full.executeAt != null && sourceEpoch == full.executeAt.epoch()));
-            Route<?> maxRoute = Route.merge(route, full.route);
-
-            // TODO (required): permit individual shards that are behind to 
catch up by themselves
-            long toEpoch = sourceEpoch;
-            Ranges sliceRanges = 
node.topology().localRangesForEpochs(txnId.epoch(), toEpoch);
-            if (!maxRoute.covers(sliceRanges))
-            {
-                callback.accept(Known.Nothing, null);
-                return;
-            }
-
-            RoutingKey progressKey = node.trySelectProgressKey(txnId, 
maxRoute);
-
-            Ranges covering = maxRoute.sliceCovering(sliceRanges, Minimal);
-            Participants<?> participatingKeys = 
maxRoute.participants().slice(covering, Minimal);
-            Known achieved = full.sufficientFor(participatingKeys, withQuorum);
-            if (achieved.executeAt.hasDecidedExecuteAt() && 
full.executeAt.epoch() > toEpoch)
-            {
-                Ranges acceptRanges;
-                if (!node.topology().hasEpoch(full.executeAt.epoch()) ||
-                    (!maxRoute.covers(acceptRanges = 
node.topology().localRangesForEpochs(txnId.epoch(), full.executeAt.epoch()))))
-                {
-                    // we don't know what the execution epoch requires, so we 
cannot be sure we can replicate it locally
-                    // we *could* wait until we have the local epoch before 
running this
-                    Status.Outcome outcome = 
achieved.outcome.propagatesBetweenShards() ? achieved.outcome : 
Status.Outcome.Unknown;
-                    achieved = new Known(achieved.definition, 
achieved.executeAt, Status.KnownDeps.DepsUnknown, outcome);
-                }
-                else
-                {
-                    // TODO (expected): this should only be the two precise 
epochs, not the full range of epochs
-                    sliceRanges = acceptRanges;
-                    covering = maxRoute.sliceCovering(sliceRanges, Minimal);
-                    participatingKeys = 
maxRoute.participants().slice(covering, Minimal);
-                    Known knownForExecution = 
full.sufficientFor(participatingKeys, withQuorum);
-                    if ((target != null && 
target.isSatisfiedBy(knownForExecution)) || 
knownForExecution.isSatisfiedBy(achieved))
-                    {
-                        achieved = knownForExecution;
-                        toEpoch = full.executeAt.epoch();
-                    }
-                    else
-                    {
-                        Invariants.checkState(sourceEpoch == txnId.epoch(), 
"%d != %d", sourceEpoch, txnId.epoch());
-                        achieved = new Known(achieved.definition, 
achieved.executeAt, knownForExecution.deps, knownForExecution.outcome);
-                    }
-                }
-            }
-
-            PartialTxn partialTxn = null;
-            if (achieved.definition.isKnown())
-                partialTxn = full.partialTxn.slice(sliceRanges, 
true).reconstitutePartial(covering);
-
-            PartialDeps partialDeps = null;
-            if (achieved.deps.hasDecidedDeps())
-                partialDeps = 
full.committedDeps.slice(sliceRanges).reconstitutePartial(covering);
-
-            new OnDone(node, txnId, maxRoute, progressKey, full, achieved, 
partialTxn, partialDeps, toEpoch, callback).start();
-        }
-
-        void start()
-        {
-            Seekables<?, ?> keys = Keys.EMPTY;
-            if (achieved.definition.isKnown())
-                keys = partialTxn.keys();
-            else if (achieved.deps.hasProposedOrDecidedDeps())
-                keys = partialDeps.keyDeps.keys();
-
-            PreLoadContext loadContext = contextFor(txnId, keys);
-            node.mapReduceConsumeLocal(loadContext, route, txnId.epoch(), 
toEpoch, this);
-        }
-
-        @Override
-        public Void apply(SafeCommandStore safeStore)
-        {
-            SafeCommand safeCommand = safeStore.get(txnId, this, route);
-            Command command = safeCommand.current();
-            if (command.saveStatus().phase.compareTo(Phase.Persist) >= 0)
-                return null;
-
-            Status propagate = achieved.propagate();
-            if (command.hasBeen(propagate))
-            {
-                if (full.maxSaveStatus.phase == Cleanup && 
full.durability.isDurableOrInvalidated() && Infer.safeToCleanup(safeStore, 
command, route, full.executeAt))
-                    Commands.setTruncatedApply(safeStore, safeCommand);
-                return null;
-            }
-
-            switch (propagate)
-            {
-                default: throw new IllegalStateException("Unexpected status: " 
+ propagate);
-                case Accepted:
-                case AcceptedInvalidate:
-                    // we never "propagate" accepted statuses as these are 
essentially votes,
-                    // and contribute nothing to our local state machine
-                    throw new IllegalStateException("Invalid states to 
propagate: " + achieved.propagate());
-
-                case Truncated:
-                    // if our peers have truncated this command, then either:
-                    // 1) we have already applied it locally; 2) the command 
doesn't apply locally; 3) we are stale; or 4) the command is invalidated
-                    if (command.hasBeen(PreApplied) || 
command.saveStatus().isUninitialised())
-                        break;
-
-                    if (Infer.safeToCleanup(safeStore, command, route, 
full.executeAt))
-                    {
-                        Commands.setErased(safeStore, safeCommand);
-                        break;
-                    }
-
-                    // TODO (required): check if we are stale
-                    // otherwise we are either stale, or the command didn't 
reach consensus
-
-                case Invalidated:
-                    Commands.commitInvalidate(safeStore, safeCommand, route);
-                    break;
-
-                case Applied:
-                case PreApplied:
-                    Invariants.checkState(full.executeAt != null);
-                    if (toEpoch >= full.executeAt.epoch())
-                    {
-                        confirm(Commands.apply(safeStore, safeCommand, txnId, 
route, progressKey, full.executeAt, partialDeps, partialTxn, full.writes, 
full.result));
-                        break;
-                    }
-
-                case Committed:
-                case ReadyToExecute:
-                    confirm(Commands.commit(safeStore, safeCommand, txnId, 
route, progressKey, partialTxn, full.executeAt, partialDeps));
-                    break;
-
-                case PreCommitted:
-                    Commands.precommit(safeStore, safeCommand, txnId, 
full.executeAt, route);
-                    if (!achieved.definition.isKnown())
-                        break;
-
-                case PreAccepted:
-                    // only preaccept if we coordinate the transaction
-                    if 
(safeStore.ranges().coordinates(txnId).intersects(route) && 
Route.isFullRoute(route))
-                        Commands.preaccept(safeStore, safeCommand, txnId, 
txnId.epoch(), partialTxn, Route.castToFullRoute(route), progressKey);
-                    break;
-
-                case NotDefined:
-                    break;
-            }
-
-            RoutingKey homeKey = full.homeKey;
-            if (!full.durability.isDurable() || homeKey == null)
-                return null;
-
-            if (!safeStore.ranges().coordinates(txnId).contains(homeKey))
-                return null;
-
-            Timestamp executeAt = 
full.saveStatus.known.executeAt.hasDecidedExecuteAt() ? full.executeAt : null;
-            Commands.setDurability(safeStore, safeCommand, full.durability, 
route, executeAt);
-            return null;
-        }
-
-        @Override
-        public Void reduce(Void o1, Void o2)
-        {
-            return null;
-        }
-
-        @Override
-        public void accept(Void result, Throwable failure)
-        {
-            callback.accept(failure  == null ? achieved : null, failure);
-        }
-
-        @Override
-        public long epoch()
-        {
-            return toEpoch;
+            Propagate.propagate(node, txnId, sourceEpoch, success.withQuorum, 
route(), target, (CheckStatusOkFull) merged, callback);
         }
     }
-
-    private static void confirm(Commands.CommitOutcome outcome)
-    {
-        switch (outcome)
-        {
-            default: throw new IllegalStateException("Unknown outcome: " + 
outcome);
-            case Redundant:
-            case Success:
-                return;
-            case Insufficient: throw new IllegalStateException("Should have 
enough information");
-        }
-    }
-
-    private static void confirm(Commands.ApplyOutcome outcome)
-    {
-        switch (outcome)
-        {
-            default: throw new IllegalStateException("Unknown outcome: " + 
outcome);
-            case Redundant:
-            case Success:
-                return;
-            case Insufficient: throw new IllegalStateException("Should have 
enough information");
-        }
-    }
-
 }
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java 
b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 6f4559e5..fdcff134 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -20,7 +20,6 @@ package accord.coordinate;
 
 import java.util.function.BiConsumer;
 
-import accord.coordinate.FetchData.OnDone;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.Status;
@@ -29,6 +28,7 @@ import accord.messages.CheckStatus;
 import accord.messages.CheckStatus.CheckStatusOk;
 import accord.messages.CheckStatus.CheckStatusOkFull;
 import accord.messages.CheckStatus.IncludeInfo;
+import accord.messages.Propagate;
 import accord.primitives.Ballot;
 import accord.primitives.Deps;
 import accord.primitives.FullRoute;
@@ -177,7 +177,7 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                         }
                     }
 
-                    OnDone.propagate(node, txnId, sourceEpoch, 
success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ? 
full.toProgressToken() : null, f));
+                    Propagate.propagate(node, txnId, sourceEpoch, 
success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ? 
full.toProgressToken() : null, f));
                     break;
                 }
 
@@ -201,11 +201,11 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                 if (witnessedByInvalidation != null && 
witnessedByInvalidation.hasBeen(Status.PreCommitted))
                     throw new IllegalStateException("We previously 
invalidated, finding a status that should be recoverable");
 
-                OnDone.propagate(node, txnId, sourceEpoch, success.withQuorum, 
route, null, full, (s, f) -> callback.accept(f == null ? INVALIDATED : null, 
f));
+                Propagate.propagate(node, txnId, sourceEpoch, 
success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ? 
INVALIDATED : null, f));
                 break;
 
             case Erased:
-                OnDone.propagate(node, txnId, sourceEpoch, success.withQuorum, 
route, null, full, (s, f) -> callback.accept(f == null ? TRUNCATED : null, f));
+                Propagate.propagate(node, txnId, sourceEpoch, 
success.withQuorum, route, null, full, (s, f) -> callback.accept(f == null ? 
TRUNCATED : null, f));
                 break;
         }
     }
diff --git a/accord-core/src/main/java/accord/local/Command.java 
b/accord-core/src/main/java/accord/local/Command.java
index 194fe2da..18d0d605 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -212,6 +212,12 @@ public abstract class Command implements CommonAttributes
         }
     }
 
+    /**
+     * @return true if this command is equivalent to {@code other}; it could 
also be fuller - have some of its
+     *      registers be not sliced, whereas {@code other} may have them sliced
+     */
+    public abstract boolean isEqualOrFuller(Command other);
+
     private abstract static class AbstractCommand extends Command
     {
         private final TxnId txnId;
@@ -247,12 +253,25 @@ public abstract class Command implements CommonAttributes
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             Command command = (Command) o;
-            return txnId.equals(command.txnId())
-                    && status == command.saveStatus()
-                    && durability == command.durability()
-                    && Objects.equals(route, command.route())
-                    && Objects.equals(promised, command.promised())
-                    && listeners.equals(command.durableListeners());
+            return txnId().equals(command.txnId())
+                    && saveStatus() == command.saveStatus()
+                    && durability() == command.durability()
+                    && Objects.equals(route(), command.route())
+                    && Objects.equals(promised(), command.promised())
+                    && Objects.equals(durableListeners(), 
command.durableListeners());
+        }
+
+        @Override
+        public boolean isEqualOrFuller(Command command)
+        {
+            if (this == command) return true;
+            if (command == null || getClass() != command.getClass()) return 
false;
+            return txnId().equals(command.txnId())
+                && saveStatus() == command.saveStatus()
+                && durability() == command.durability()
+                && Objects.equals(route(), command.route())
+                && Objects.equals(promised(), command.promised())
+                && durableListeners().containsAll(command.durableListeners());
         }
 
         @Override
@@ -288,9 +307,7 @@ public abstract class Command implements CommonAttributes
         @Override
         public Listeners.Immutable durableListeners()
         {
-            if (listeners == null)
-                return EMPTY;
-            return listeners;
+            return listeners == null ? EMPTY : listeners;
         }
 
         @Override
@@ -600,6 +617,7 @@ public abstract class Command implements CommonAttributes
         @Nullable final Timestamp executeAt;
         @Nullable final Writes writes;
         @Nullable final Result result;
+
         public Truncated(CommonAttributes commonAttributes, SaveStatus 
saveStatus, @Nullable Timestamp executeAt, @Nullable Writes writes, @Nullable 
Result result)
         {
             super(commonAttributes, saveStatus, Ballot.MAX);
@@ -616,9 +634,38 @@ public abstract class Command implements CommonAttributes
             this.result = result;
         }
 
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            if (!super.equals(o)) return false;
+            Truncated that = (Truncated) o;
+            return Objects.equals(executeAt, that.executeAt)
+                && Objects.equals(writes, that.writes)
+                && Objects.equals(result, that.result);
+        }
+
+        @Override
+        public boolean isEqualOrFuller(Command c)
+        {
+            if (this == c) return true;
+            if (c == null || getClass() != c.getClass()) return false;
+            if (!super.isEqualOrFuller(c)) return false;
+            Truncated that = (Truncated) c;
+            return Objects.equals(executeAt(), that.executeAt())
+                && Objects.equals(writes(), that.writes())
+                && Objects.equals(result(), that.result());
+        }
+
         public static Truncated erased(Command command)
         {
-            return new Truncated(command.txnId(), SaveStatus.Erased, 
command.durability(), command.route(), null, EMPTY, null, null);
+            return erased(command.txnId(), command.durability(), 
command.route());
+        }
+
+        public static Truncated erased(TxnId txnId, Status.Durability 
durability, Route<?> route)
+        {
+            return new Truncated(txnId, SaveStatus.Erased, durability, route, 
null, EMPTY, null, null);
         }
 
         public static Truncated truncatedApply(Command command)
@@ -633,7 +680,6 @@ public abstract class Command implements CommonAttributes
         }
 
         public static Truncated truncatedApply(CommonAttributes common, 
SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result)
-
         {
             Invariants.checkArgument(executeAt != null);
             Invariants.checkArgument(saveStatus == SaveStatus.TruncatedApply 
|| saveStatus == SaveStatus.TruncatedApplyWithDeps || saveStatus == 
SaveStatus.TruncatedApplyWithOutcome);
@@ -729,10 +775,24 @@ public abstract class Command implements CommonAttributes
                     && Objects.equals(partialDeps, that.partialDeps);
         }
 
+        @Override
+        public boolean isEqualOrFuller(Command c)
+        {
+            if (this == c) return true;
+            if (c == null || getClass() != c.getClass()) return false;
+            if (!super.isEqualOrFuller(c)) return false;
+            PreAccepted that = (PreAccepted) c;
+            if (!executeAt().equals(that.executeAt()) || 
!partialTxn().isEqualOrFuller(that.partialTxn()))
+                return false;
+            return (partialDeps() == null && that.partialDeps() == null)
+                || (partialDeps() != null && that.partialDeps() != null && 
partialDeps().isEqualOrFuller(that.partialDeps()));
+        }
+
         public static PreAccepted preAccepted(CommonAttributes common, 
Timestamp executeAt, Ballot promised)
         {
             return new PreAccepted(common, SaveStatus.PreAccepted, executeAt, 
promised);
         }
+
         public static PreAccepted preAccepted(PreAccepted command, 
CommonAttributes common, Ballot promised)
         {
             checkPromised(command, promised);
@@ -792,16 +852,28 @@ public abstract class Command implements CommonAttributes
             return Objects.equals(accepted, that.accepted);
         }
 
+        @Override
+        public boolean isEqualOrFuller(Command c)
+        {
+            if (this == c) return true;
+            if (c == null || getClass() != c.getClass()) return false;
+            if (!super.isEqualOrFuller(c)) return false;
+            Accepted that = (Accepted) c;
+            return Objects.equals(accepted(), that.accepted());
+        }
+
         static Accepted accepted(CommonAttributes common, SaveStatus status, 
Timestamp executeAt, Ballot promised, Ballot accepted)
         {
             return new Accepted(common, status, executeAt, promised, accepted);
         }
+
         static Accepted accepted(Accepted command, CommonAttributes common, 
SaveStatus status, Ballot promised)
         {
             checkPromised(command, promised);
             checkSameClass(command, Accepted.class, "Cannot update");
             return new Accepted(common, status, command.executeAt(), promised, 
command.accepted());
         }
+
         static Accepted accepted(Accepted command, CommonAttributes common, 
Ballot promised)
         {
             return accepted(command, common, command.saveStatus(), promised);
@@ -842,6 +914,17 @@ public abstract class Command implements CommonAttributes
             return Objects.equals(waitingOn, committed.waitingOn);
         }
 
+        @Override
+        public boolean isEqualOrFuller(Command c)
+        {
+            if (this == c) return true;
+            if (c == null || getClass() != c.getClass()) return false;
+            if (!super.isEqualOrFuller(c)) return false;
+            Committed committed = (Committed) c;
+            return (waitingOn() == null && committed.waitingOn() == null)
+                || (waitingOn() != null && committed.waitingOn() != null && 
waitingOn().isEqualOrFuller(committed.waitingOn()));
+        }
+
         private static Committed committed(Committed command, CommonAttributes 
common, Ballot promised, SaveStatus status, WaitingOn waitingOn)
         {
             checkPromised(command, promised);
@@ -922,7 +1005,18 @@ public abstract class Command implements CommonAttributes
             if (!super.equals(o)) return false;
             Executed executed = (Executed) o;
             return Objects.equals(writes, executed.writes)
-                    && Objects.equals(result, executed.result);
+                && Objects.equals(result, executed.result);
+        }
+
+        @Override
+        public boolean isEqualOrFuller(Command c)
+        {
+            if (this == c) return true;
+            if (c == null || getClass() != c.getClass()) return false;
+            if (!super.isEqualOrFuller(c)) return false;
+            Executed executed = (Executed) c;
+            return Objects.equals(writes(), executed.writes())
+                && Objects.equals(result(), executed.result());
         }
 
         public static Executed executed(Executed command, CommonAttributes 
common, SaveStatus status, Ballot promised, WaitingOn waitingOn)
@@ -1107,6 +1201,12 @@ public abstract class Command implements CommonAttributes
                 && 
this.appliedOrInvalidated.equals(other.appliedOrInvalidated);
         }
 
+        public boolean isEqualOrFuller(WaitingOn other)
+        {
+            return 
computeWaitingOnCommit().containsAll(other.computeWaitingOnCommit())
+                && 
computeWaitingOnApply().containsAll(other.computeWaitingOnApply());
+        }
+
         public static class Update
         {
             final Deps deps;
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 276f631e..8e4237f1 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -54,6 +54,7 @@ import accord.coordinate.MaybeRecover;
 import accord.coordinate.Outcome;
 import accord.coordinate.RecoverWithRoute;
 import accord.messages.Callback;
+import accord.messages.LocalMessage;
 import accord.messages.Reply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
@@ -134,6 +135,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
 
     private final Id id;
     private final MessageSink messageSink;
+    private final LocalMessage.Handler localMessageHandler;
     private final ConfigurationService configService;
     private final TopologyManager topology;
     private final CommandStores commandStores;
@@ -150,12 +152,14 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
     // TODO (expected, liveness): monitor the contents of this collection for 
stalled coordination, and excise them
     private final Map<TxnId, AsyncResult<? extends Outcome>> coordinating = 
new ConcurrentHashMap<>();
 
-    public Node(Id id, MessageSink messageSink, ConfigurationService 
configService, LongSupplier nowSupplier, ToLongFunction<TimeUnit> nowTimeUnit,
+    public Node(Id id, MessageSink messageSink, LocalMessage.Handler 
localMessageHandler,
+                ConfigurationService configService, LongSupplier nowSupplier, 
ToLongFunction<TimeUnit> nowTimeUnit,
                 Supplier<DataStore> dataSupplier, ShardDistributor 
shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, 
TopologySorter.Supplier topologySorter,
                 Function<Node, ProgressLog.Factory> progressLogFactory, 
CommandStores.Factory factory)
     {
         this.id = id;
         this.messageSink = messageSink;
+        this.localMessageHandler = localMessageHandler;
         this.configService = configService;
         this.topology = new TopologyManager(topologySorter, id);
         this.nowSupplier = nowSupplier;
@@ -483,6 +487,11 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
         messageSink.send(to, send);
     }
 
+    public void localMessage(LocalMessage message)
+    {
+        localMessageHandler.handle(message, this); // delegate to the LocalMessage.Handler passed to the constructor, handing it this node
+    }
+
     public void reply(Id replyingToNode, ReplyContext replyContext, Reply 
send, Throwable failure)
     {
         if (failure != null)
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java 
b/accord-core/src/main/java/accord/local/SerializerSupport.java
new file mode 100644
index 00000000..a42888bd
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.local;
+
+import accord.api.Result;
+import accord.api.VisibleForImplementation;
+import accord.local.Command.WaitingOn;
+import accord.local.CommonAttributes.Mutable;
+import accord.messages.*;
+import accord.primitives.Ballot;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Timestamp;
+import accord.primitives.Writes;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
+import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
+import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
+import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
+import static accord.messages.MessageType.COMMIT_MINIMAL_REQ;
+import static accord.messages.MessageType.PRE_ACCEPT_REQ;
+import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
+import static accord.messages.MessageType.PROPAGATE_COMMIT_MSG;
+import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+import static accord.primitives.PartialTxn.merge;
+import static accord.utils.Invariants.checkState;
+
+import static java.util.EnumSet.of;
+
+@VisibleForImplementation
+public class SerializerSupport
+{
+    /**
+     * Reconstructs a {@link Command} from its persisted register values and the protocol
+     * messages exposed by {@code messageProvider}.
+     *
+     * The method dispatches on {@code status.status} to a per-status helper; those helpers
+     * populate {@code attrs} (transaction definition and/or dependencies) before building the
+     * command, so {@code attrs} is mutated by this call.
+     *
+     * @param attrs             mutable common attributes, completed in place by the helpers
+     * @param status            the persisted save status selecting which command flavour to build
+     * @param executeAt         execution timestamp passed through to the command factories
+     * @param promised          promised ballot passed through to the command factories
+     * @param accepted          accepted ballot; also used to look up the Accept message where needed
+     * @param waitingOnProvider supplies the {@link WaitingOn} state for committed/executed commands
+     * @param messageProvider   source of the previously witnessed protocol messages
+     * @return the reconstructed command
+     * @throws IllegalStateException if {@code status.status} is not one of the handled values
+     */
+    public static Command reconstruct(Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
+    {
+        switch (status.status)
+        {
+            case NotDefined:
+                return Command.NotDefined.notDefined(attrs, promised);
+            case PreAccepted:
+                return preAccepted(attrs, executeAt, promised, messageProvider);
+            case AcceptedInvalidate:
+            case Accepted:
+            case PreCommitted:
+                return accepted(attrs, status, executeAt, promised, accepted, messageProvider);
+            case Committed:
+            case ReadyToExecute:
+                return committed(attrs, status, executeAt, promised, accepted, waitingOnProvider, messageProvider);
+            case PreApplied:
+            case Applied:
+                return executed(attrs, status, executeAt, promised, accepted, waitingOnProvider, messageProvider);
+            case Truncated:
+            case Invalidated:
+                return truncated(attrs, status, executeAt, messageProvider);
+            default:
+                // name the offending value, matching the message style used by truncated()
+                throw new IllegalStateException("Unhandled Status: " + status.status);
+        }
+    }
+
+    /** Message types that may carry the transaction definition of a pre-accepted command. */
+    private static final EnumSet<MessageType> PRE_ACCEPT_TYPES =
+        EnumSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG);
+
+    /**
+     * Builds a PreAccepted command, taking the transaction definition from whichever of the
+     * {@link #PRE_ACCEPT_TYPES} messages the provider reports; the invariant check fails if
+     * the provider reports none of them.
+     */
+    private static Command.PreAccepted preAccepted(Mutable attrs, Timestamp executeAt, Ballot promised, MessageProvider messageProvider)
+    {
+        Set<MessageType> available = messageProvider.test(PRE_ACCEPT_TYPES);
+        checkState(!available.isEmpty());
+
+        PartialTxn definition = txnFromPreAcceptOrBeginRecover(available, messageProvider);
+        attrs.partialTxn(definition);
+
+        return Command.PreAccepted.preAccepted(attrs, executeAt, promised);
+    }
+
+    /**
+     * Builds an Accepted-family command. The transaction definition is restored only when the
+     * status says it is known, and the proposed dependencies only when the status says they
+     * were proposed; in that case they come from the Accept message for the {@code accepted} ballot.
+     */
+    private static Command.Accepted accepted(Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, MessageProvider messageProvider)
+    {
+        if (status.known.isDefinitionKnown())
+        {
+            Set<MessageType> available = messageProvider.test(PRE_ACCEPT_TYPES);
+            checkState(!available.isEmpty());
+            PartialTxn definition = txnFromPreAcceptOrBeginRecover(available, messageProvider);
+            attrs.partialTxn(definition);
+        }
+
+        if (status.known.deps.hasProposedDeps())
+        {
+            PartialDeps proposed = messageProvider.accept(accepted).partialDeps;
+            attrs.partialDeps(proposed);
+        }
+
+        return Command.Accepted.accepted(attrs, status, executeAt, promised, accepted);
+    }
+
+    /** Message types that may contribute the definition or dependencies of a committed command. */
+    private static final EnumSet<MessageType> PRE_ACCEPT_COMMIT_TYPES =
+        EnumSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG,
+                   COMMIT_MINIMAL_REQ, COMMIT_MAXIMAL_REQ, PROPAGATE_COMMIT_MSG);
+
+    /**
+     * Builds a Committed command. Source precedence is: maximal Commit, then propagated Commit,
+     * then minimal Commit (whose partial txn is merged with the pre-accept/recovery definition).
+     * The invariant check fails if none of the three commit sources is reported.
+     */
+    private static Command.Committed committed(Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
+    {
+        Set<MessageType> available = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES);
+
+        final PartialTxn txn;
+        final PartialDeps deps;
+
+        if (available.contains(COMMIT_MAXIMAL_REQ))
+        {
+            Commit maximal = messageProvider.commitMaximal();
+            txn = maximal.partialTxn;
+            deps = maximal.partialDeps;
+        }
+        else if (available.contains(PROPAGATE_COMMIT_MSG))
+        {
+            Propagate propagated = messageProvider.propagateCommit();
+            txn = propagated.partialTxn;
+            deps = propagated.partialDeps;
+        }
+        else
+        {
+            checkState(available.contains(COMMIT_MINIMAL_REQ));
+            Commit minimal = messageProvider.commitMinimal();
+            PartialTxn definition = txnFromPreAcceptOrBeginRecover(available, messageProvider);
+            txn = merge(definition, minimal.partialTxn);
+            deps = minimal.partialDeps;
+        }
+
+        attrs.partialTxn(txn)
+             .partialDeps(deps);
+
+        WaitingOn waitingOn = waitingOnProvider.provide(deps);
+        return Command.Committed.committed(attrs, status, executeAt, promised, accepted, waitingOn);
+    }
+
+    /** Every message type that may contribute to reconstructing an executed command. */
+    private static final EnumSet<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
+        EnumSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ, PROPAGATE_PRE_ACCEPT_MSG,
+                   COMMIT_MINIMAL_REQ, COMMIT_MAXIMAL_REQ, PROPAGATE_COMMIT_MSG,
+                   APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG);
+
+    /**
+     * Builds an Executed command. A maximal Apply or a propagated Apply supplies everything
+     * (txn, deps, writes, result). Otherwise a minimal Apply must be present: it supplies
+     * writes and result, while txn and deps are taken from the best available Commit source,
+     * falling back to the Apply's own deps when no Commit was reported.
+     */
+    private static Command.Executed executed(Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
+    {
+        Set<MessageType> available = messageProvider.test(PRE_ACCEPT_COMMIT_APPLY_TYPES);
+
+        final PartialTxn txn;
+        final PartialDeps deps;
+        final Writes writes;
+        final Result result;
+
+        if (available.contains(APPLY_MAXIMAL_REQ))
+        {
+            Apply maximal = messageProvider.applyMaximal();
+            txn = maximal.txn;
+            deps = maximal.deps;
+            writes = maximal.writes;
+            result = maximal.result;
+        }
+        else if (available.contains(PROPAGATE_APPLY_MSG))
+        {
+            Propagate propagated = messageProvider.propagateApply();
+            txn = propagated.partialTxn;
+            deps = propagated.partialDeps;
+            writes = propagated.writes;
+            result = propagated.result;
+        }
+        else
+        {
+            checkState(available.contains(APPLY_MINIMAL_REQ));
+
+            Apply minimal = messageProvider.applyMinimal();
+            writes = minimal.writes;
+            result = minimal.result;
+
+            /*
+             * NOTE: when a Commit has been witnessed the deps are extracted from it;
+             * Apply has an expected TO-DO to stop including deps in that case.
+             */
+            if (available.contains(COMMIT_MAXIMAL_REQ))
+            {
+                Commit maximalCommit = messageProvider.commitMaximal();
+                txn = maximalCommit.partialTxn;
+                deps = maximalCommit.partialDeps;
+            }
+            else if (available.contains(PROPAGATE_COMMIT_MSG))
+            {
+                Propagate propagatedCommit = messageProvider.propagateCommit();
+                txn = propagatedCommit.partialTxn;
+                deps = propagatedCommit.partialDeps;
+            }
+            else if (available.contains(COMMIT_MINIMAL_REQ))
+            {
+                Commit minimalCommit = messageProvider.commitMinimal();
+                PartialTxn definition = txnFromPreAcceptOrBeginRecover(available, messageProvider);
+                txn = merge(minimal.txn, merge(minimalCommit.partialTxn, definition));
+                deps = minimalCommit.partialDeps;
+            }
+            else
+            {
+                PartialTxn definition = txnFromPreAcceptOrBeginRecover(available, messageProvider);
+                txn = merge(minimal.txn, definition);
+                deps = minimal.deps;
+            }
+        }
+
+        attrs.partialTxn(txn)
+             .partialDeps(deps);
+
+        WaitingOn waitingOn = waitingOnProvider.provide(deps);
+        return Command.Executed.executed(attrs, status, executeAt, promised, accepted, waitingOn, writes, result);
+    }
+
+    /** Message types that may carry the writes/result of an applied command. */
+    private static final EnumSet<MessageType> APPLY_TYPES =
+            of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG);
+
+    /**
+     * Builds a Truncated-family command for the given save status.
+     *
+     * For {@code TruncatedApplyWithOutcome} and {@code TruncatedApplyWithDeps} the writes and
+     * result are restored from exactly one Apply source, with precedence maximal Apply, then
+     * propagated Apply, then minimal Apply. Only the chosen message is fetched from the
+     * provider. {@code TruncatedApply} keeps both as {@code null}.
+     *
+     * @throws IllegalStateException if {@code status} is not a handled truncated save status
+     */
+    private static Command.Truncated truncated(Mutable attrs, SaveStatus status, Timestamp executeAt, MessageProvider messageProvider)
+    {
+        Writes writes = null;
+        Result result = null;
+
+        switch (status)
+        {
+            default:
+                throw new IllegalStateException("Unhandled SaveStatus: " + status);
+            case TruncatedApplyWithOutcome:
+            case TruncatedApplyWithDeps:
+                Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES);
+                checkState(!witnessed.isEmpty());
+                // Single precedence chain: previously the minimal Apply was always loaded first
+                // and then overwritten whenever a maximal or propagated Apply was also present.
+                if (witnessed.contains(APPLY_MAXIMAL_REQ))
+                {
+                    Apply apply = messageProvider.applyMaximal();
+                    writes = apply.writes;
+                    result = apply.result;
+                }
+                else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+                {
+                    Propagate propagate = messageProvider.propagateApply();
+                    writes = propagate.writes;
+                    result = propagate.result;
+                }
+                else if (witnessed.contains(APPLY_MINIMAL_REQ))
+                {
+                    Apply apply = messageProvider.applyMinimal();
+                    writes = apply.writes;
+                    result = apply.result;
+                }
+                // intentional fall-through: build the command with the outcome restored above
+            case TruncatedApply:
+                return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result);
+            case Erased:
+                return Command.Truncated.erased(attrs.txnId(), attrs.durability(), attrs.route());
+            case Invalidated:
+                return Command.Truncated.invalidated(attrs.txnId(), attrs.durableListeners());
+        }
+    }
+
+    /**
+     * Immutable pair of an (optional) transaction definition and (optional) dependencies,
+     * as returned by {@code extractTxnAndDeps}. Either field may be {@code null}.
+     */
+    public static class TxnAndDeps
+    {
+        /** Shared instance with neither a txn nor deps; final so it cannot be reassigned. */
+        public static final TxnAndDeps EMPTY = new TxnAndDeps(null, null);
+
+        public final PartialTxn txn;
+        public final PartialDeps deps;
+
+        TxnAndDeps(PartialTxn txn, PartialDeps deps)
+        {
+            this.txn = txn;
+            this.deps = deps;
+        }
+    }
+
+    /**
+     * Extracts just the transaction definition and dependencies for a command in the given
+     * status, without building a {@link Command}.
+     *
+     * @param status          the persisted save status; {@code status.status} selects the branch
+     * @param accepted        accepted ballot, used to look up the Accept message when deps were proposed
+     * @param messageProvider source of the previously witnessed protocol messages
+     * @return the extracted pair; {@link TxnAndDeps#EMPTY} for NotDefined, Truncated and Invalidated
+     * @throws IllegalStateException if {@code status.status} is not one of the handled values
+     */
+    public static TxnAndDeps extractTxnAndDeps(SaveStatus status, Ballot accepted, MessageProvider messageProvider)
+    {
+        Set<MessageType> witnessed;
+
+        switch (status.status)
+        {
+            case PreAccepted:
+                witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
+                checkState(!witnessed.isEmpty());
+                return new TxnAndDeps(txnFromPreAcceptOrBeginRecover(witnessed, messageProvider), null);
+            case AcceptedInvalidate:
+            case Accepted:
+            case PreCommitted:
+                PartialTxn txn = null;
+                PartialDeps deps = null;
+
+                // definition and proposed deps are each restored only if the status says they exist
+                if (status.known.isDefinitionKnown())
+                {
+                    witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
+                    checkState(!witnessed.isEmpty());
+                    txn = txnFromPreAcceptOrBeginRecover(witnessed, messageProvider);
+                }
+
+                if (status.known.deps.hasProposedDeps())
+                {
+                    Accept accept = messageProvider.accept(accepted);
+                    deps = accept.partialDeps;
+                }
+
+                return new TxnAndDeps(txn, deps);
+            case Committed:
+            case ReadyToExecute:
+                // precedence: maximal Commit, propagated Commit, then minimal Commit merged with the definition
+                witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES);
+                if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+                {
+                    Commit commit = messageProvider.commitMaximal();
+                    return new TxnAndDeps(commit.partialTxn, commit.partialDeps);
+                }
+                else if (witnessed.contains(PROPAGATE_COMMIT_MSG))
+                {
+                    Propagate propagate = messageProvider.propagateCommit();
+                    return new TxnAndDeps(propagate.partialTxn, propagate.partialDeps);
+                }
+                else
+                {
+                    checkState(witnessed.contains(COMMIT_MINIMAL_REQ));
+                    Commit commit = messageProvider.commitMinimal();
+                    return new TxnAndDeps(merge(txnFromPreAcceptOrBeginRecover(witnessed, messageProvider), commit.partialTxn), commit.partialDeps);
+                }
+            case PreApplied:
+            case Applied:
+                // precedence: maximal/propagated Apply, then maximal/propagated Commit,
+                // then a minimal Apply combined with whatever else was witnessed
+                witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_APPLY_TYPES);
+                if (witnessed.contains(APPLY_MAXIMAL_REQ))
+                {
+                    Apply apply = messageProvider.applyMaximal();
+                    return new TxnAndDeps(apply.txn, apply.deps);
+                }
+                else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+                {
+                    Propagate propagate = messageProvider.propagateApply();
+                    return new TxnAndDeps(propagate.partialTxn, propagate.partialDeps);
+                }
+                else if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+                {
+                    Commit commit = messageProvider.commitMaximal();
+                    return new TxnAndDeps(commit.partialTxn, commit.partialDeps);
+                }
+                else if (witnessed.contains(PROPAGATE_COMMIT_MSG))
+                {
+                    Propagate propagate = messageProvider.propagateCommit();
+                    return new TxnAndDeps(propagate.partialTxn, propagate.partialDeps);
+                }
+                else if (witnessed.contains(COMMIT_MINIMAL_REQ))
+                {
+                    checkState(witnessed.contains(APPLY_MINIMAL_REQ));
+                    Apply apply = messageProvider.applyMinimal();
+                    Commit commit = messageProvider.commitMinimal();
+                    return new TxnAndDeps(merge(apply.txn, merge(commit.partialTxn, txnFromPreAcceptOrBeginRecover(witnessed, messageProvider))), commit.partialDeps);
+                }
+                else
+                {
+                    checkState(witnessed.contains(APPLY_MINIMAL_REQ));
+                    Apply apply = messageProvider.applyMinimal();
+                    return new TxnAndDeps(merge(apply.txn, txnFromPreAcceptOrBeginRecover(witnessed, messageProvider)), apply.deps);
+                }
+            case NotDefined:
+            case Truncated:
+            case Invalidated:
+                return TxnAndDeps.EMPTY;
+            default:
+                // name the offending value, matching the message style used by truncated()
+                throw new IllegalStateException("Unhandled Status: " + status.status);
+        }
+    }
+
+    /**
+     * Picks the transaction definition from the first available source, in the order
+     * PreAccept, BeginRecovery, propagated PreAccept. Returns {@code null} when
+     * {@code witnessed} contains none of those message types.
+     */
+    private static PartialTxn txnFromPreAcceptOrBeginRecover(Set<MessageType> witnessed, MessageProvider messageProvider)
+    {
+        PartialTxn definition = null;
+
+        if (witnessed.contains(PRE_ACCEPT_REQ))
+            definition = messageProvider.preAccept().partialTxn;
+        else if (witnessed.contains(BEGIN_RECOVER_REQ))
+            definition = messageProvider.beginRecover().partialTxn;
+        else if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
+            definition = messageProvider.propagatePreAccept().partialTxn;
+
+        return definition;
+    }
+
+    /**
+     * Callback that supplies the {@link WaitingOn} state for a command being reconstructed;
+     * it is invoked with the dependencies restored for that command.
+     */
+    @FunctionalInterface
+    public interface WaitingOnProvider
+    {
+        WaitingOn provide(PartialDeps deps);
+    }
+
+    public interface MessageProvider // source of previously witnessed protocol messages used by the reconstruction methods above
+    {
+        Set<MessageType> test(Set<MessageType> messages); // callers pass candidate types and fetch only types found in the result (presumably the witnessed subset - confirm with implementations)
+
+        PreAccept preAccept(); // fetched when the test result contains PRE_ACCEPT_REQ
+
+        BeginRecovery beginRecover(); // fetched when the test result contains BEGIN_RECOVER_REQ
+
+        Propagate propagatePreAccept(); // fetched when the test result contains PROPAGATE_PRE_ACCEPT_MSG
+
+        Accept accept(Ballot ballot); // callers pass the command's accepted ballot and read partialDeps from the result
+
+        Commit commitMinimal(); // fetched when the test result contains COMMIT_MINIMAL_REQ
+
+        Commit commitMaximal(); // fetched when the test result contains COMMIT_MAXIMAL_REQ
+
+        Propagate propagateCommit(); // fetched when the test result contains PROPAGATE_COMMIT_MSG
+
+        Apply applyMinimal(); // fetched when the test result contains APPLY_MINIMAL_REQ
+
+        Apply applyMaximal(); // fetched when the test result contains APPLY_MAXIMAL_REQ
+
+        Propagate propagateApply(); // fetched when the test result contains PROPAGATE_APPLY_MSG
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/Status.java 
b/accord-core/src/main/java/accord/local/Status.java
index a326b2af..bf0f3bef 100644
--- a/accord-core/src/main/java/accord/local/Status.java
+++ b/accord-core/src/main/java/accord/local/Status.java
@@ -55,8 +55,8 @@ public enum Status
      * So, for execution of other transactions we may treat a PreCommitted 
transaction as Committed,
      * using the timestamp to update our dependency set to rule it out as a 
dependency.
      * But we do not have enough information to execute the transaction, and 
when recovery calculates
-     * {@link BeginRecovery#acceptedStartedBeforeWithoutWitnessing}, {@link 
BeginRecovery#hasCommittedExecutesAfterWithoutWitnessing}
-     *
+     * {@link BeginRecovery#acceptedStartedBeforeWithoutWitnessing},
+     * {@link BeginRecovery#hasCommittedExecutesAfterWithoutWitnessing},
      * and {@link BeginRecovery#committedStartedBeforeAndWitnessed} we may not 
have the dependencies
      * to calculate the result. For these operations we treat ourselves as 
whatever Accepted status
      * we may have previously taken, using any proposed dependencies to 
compute the result.
@@ -312,6 +312,11 @@ public enum Status
         NoDeps
         ;
 
+        public boolean hasProposedDeps()
+        {
+            return this == DepsProposed; // true only for the DepsProposed constant
+        }
+
         public boolean hasDecidedDeps()
         {
             return this == DepsKnown;
diff --git a/accord-core/src/main/java/accord/messages/Apply.java 
b/accord-core/src/main/java/accord/messages/Apply.java
index 12d5614c..2e1d2011 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -42,19 +42,17 @@ import accord.primitives.Unseekables;
 import accord.primitives.Writes;
 import accord.topology.Topologies;
 
-import static accord.messages.MessageType.APPLY_REQ;
-import static accord.messages.MessageType.APPLY_RSP;
-
 public class Apply extends TxnRequest<ApplyReply>
 {
     public static class SerializationSupport
     {
-        public static Apply create(TxnId txnId, PartialRoute<?> scope, long 
waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, 
PartialTxn txn, Writes writes, Result result)
+        public static Apply create(TxnId txnId, PartialRoute<?> scope, long 
waitForEpoch, Kind kind, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps 
deps, PartialTxn txn, Writes writes, Result result)
         {
-            return new Apply(txnId, scope, waitForEpoch, keys, executeAt, 
deps, txn, writes, result);
+            return new Apply(kind, txnId, scope, waitForEpoch, keys, 
executeAt, deps, txn, writes, result);
         }
     }
 
+    public final Kind kind;
     public final Timestamp executeAt;
     public final Seekables<?, ?> keys;
     public final PartialDeps deps; // TODO (expected): this should be 
nullable, and only included if we did not send Commit (or if sending Maximal 
apply)
@@ -62,15 +60,18 @@ public class Apply extends TxnRequest<ApplyReply>
     public final Writes writes;
     public final Result result;
 
-    private Apply(Id to, Topologies participates, Topologies executes, TxnId 
txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, boolean 
isMaximal, Writes writes, Result result)
+    public enum Kind { Minimal, Maximal }
+
+    private Apply(Kind kind, Id to, Topologies participates, Topologies 
executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result)
     {
         super(to, participates, route, txnId);
-        Ranges slice = isMaximal || executes == participates ? 
scope.covering() : executes.computeRangesForNode(to);
+        Ranges slice = kind == Kind.Maximal || executes == participates ? 
scope.covering() : executes.computeRangesForNode(to);
         // TODO (desired): it's wasteful to encode the full set of ranges 
owned by the recipient node;
         //     often it will be cheaper to include the FullRoute for Deps 
scope (or come up with some other safety-preserving encoding scheme)
+        this.kind = kind;
         this.deps = deps.slice(slice);
         this.keys = txn.keys().slice(slice);
-        this.txn = isMaximal ? txn.slice(slice, true) : null;
+        this.txn = kind == Kind.Maximal ? txn.slice(slice, true) : null;
         this.executeAt = executeAt;
         this.writes = writes;
         this.result = result;
@@ -102,17 +103,18 @@ public class Apply extends TxnRequest<ApplyReply>
 
     public static Apply applyMinimal(Id to, Topologies sendTo, Topologies 
applyTo, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result)
     {
-        return new Apply(to, sendTo, applyTo, txnId, route, txn, executeAt, 
deps, false, writes, result);
+        return new Apply(Kind.Minimal, to, sendTo, applyTo, txnId, route, txn, 
executeAt, deps, writes, result);
     }
 
     public static Apply applyMaximal(Id to, Topologies participates, 
Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, 
Deps deps, Writes writes, Result result)
     {
-        return new Apply(to, participates, executes, txnId, route, txn, 
executeAt, deps, true, writes, result);
+        return new Apply(Kind.Maximal, to, participates, executes, txnId, 
route, txn, executeAt, deps, writes, result);
     }
 
-    private Apply(TxnId txnId, PartialRoute<?> route, long waitForEpoch, 
Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, @Nullable 
PartialTxn txn, Writes writes, Result result)
+    private Apply(Kind kind, TxnId txnId, PartialRoute<?> route, long 
waitForEpoch, Seekables<?, ?> keys, Timestamp executeAt, PartialDeps deps, 
@Nullable PartialTxn txn, Writes writes, Result result)
     {
         super(txnId, route, waitForEpoch);
+        this.kind = kind;
         this.executeAt = executeAt;
         this.deps = deps;
         this.keys = keys;
@@ -172,7 +174,12 @@ public class Apply extends TxnRequest<ApplyReply>
     @Override
     public MessageType type()
     {
-        return APPLY_REQ;
+        switch (kind)
+        {
+            case Minimal: return MessageType.APPLY_MINIMAL_REQ;
+            case Maximal: return MessageType.APPLY_MAXIMAL_REQ;
+            default: throw new IllegalStateException();
+        }
     }
 
     public enum ApplyReply implements Reply
@@ -182,7 +189,7 @@ public class Apply extends TxnRequest<ApplyReply>
         @Override
         public MessageType type()
         {
-            return APPLY_RSP;
+            return MessageType.APPLY_RSP;
         }
 
         @Override
@@ -201,8 +208,8 @@ public class Apply extends TxnRequest<ApplyReply>
     @Override
     public String toString()
     {
-        return "Apply{" +
-               "txnId:" + txnId +
+        return "Apply{kind:" + kind +
+               ", txnId:" + txnId +
                ", deps:" + deps +
                ", executeAt:" + executeAt +
                ", writes:" + writes +
diff --git a/accord-core/src/main/java/accord/messages/Commit.java 
b/accord-core/src/main/java/accord/messages/Commit.java
index b755827b..9d692e6a 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -15,10 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package accord.messages;
 
 import java.util.Set;
+
 import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
@@ -59,12 +59,13 @@ public class Commit extends TxnRequest<ReadNack>
 
     public static class SerializerSupport
     {
-        public static Commit create(TxnId txnId, PartialRoute<?> scope, long 
waitForEpoch, Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps 
partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable ReadTxnData read)
+        public static Commit create(TxnId txnId, PartialRoute<?> scope, long 
waitForEpoch, Kind kind, Timestamp executeAt, @Nullable PartialTxn partialTxn, 
PartialDeps partialDeps, @Nullable FullRoute<?> fullRoute, @Nullable 
ReadTxnData read)
         {
-            return new Commit(txnId, scope, waitForEpoch, executeAt, 
partialTxn, partialDeps, fullRoute, read);
+            return new Commit(kind, txnId, scope, waitForEpoch, executeAt, 
partialTxn, partialDeps, fullRoute, read);
         }
     }
 
+    public final Kind kind;
     public final Timestamp executeAt;
     public final @Nullable PartialTxn partialTxn;
     public final PartialDeps partialDeps;
@@ -99,6 +100,7 @@ public class Commit extends TxnRequest<ReadNack>
                 partialTxn = txn.slice(extraRanges, 
coordinateRanges.contains(route.homeKey()));
         }
 
+        this.kind = kind;
         this.executeAt = executeAt;
         this.partialTxn = partialTxn;
         this.partialDeps = deps.slice(scope.covering());
@@ -106,9 +108,10 @@ public class Commit extends TxnRequest<ReadNack>
         this.read = read ? new ReadTxnData(to, topologies, txnId, readScope, 
executeAt) : null;
     }
 
-    Commit(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, Timestamp 
executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, @Nullable 
FullRoute<?> fullRoute, @Nullable ReadTxnData read)
+    Commit(Kind kind, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, 
Timestamp executeAt, @Nullable PartialTxn partialTxn, PartialDeps partialDeps, 
@Nullable FullRoute<?> fullRoute, @Nullable ReadTxnData read)
     {
         super(txnId, scope, waitForEpoch);
+        this.kind = kind;
         this.executeAt = executeAt;
         this.partialTxn = partialTxn;
         this.partialDeps = partialDeps;
@@ -168,7 +171,7 @@ public class Commit extends TxnRequest<ReadNack>
         Route<?> route = this.route != null ? this.route : scope;
         SafeCommand safeCommand = safeStore.get(txnId, executeAt, route);
 
-        switch (Commands.commit(safeStore, safeCommand, txnId, route != null ? 
route : scope, progressKey, partialTxn, executeAt, partialDeps))
+        switch (Commands.commit(safeStore, safeCommand, txnId, route, 
progressKey, partialTxn, executeAt, partialDeps))
         {
             default:
             case Success:
@@ -207,13 +210,19 @@ public class Commit extends TxnRequest<ReadNack>
     @Override
     public MessageType type()
     {
-        return MessageType.COMMIT_REQ;
+        switch (kind)
+        {
+            case Minimal: return MessageType.COMMIT_MINIMAL_REQ;
+            case Maximal: return MessageType.COMMIT_MAXIMAL_REQ;
+            default: throw new IllegalStateException();
+        }
     }
 
     @Override
     public String toString()
     {
-        return "Commit{txnId: " + txnId +
+        return "Commit{kind:" + kind +
+               ", txnId: " + txnId +
                ", executeAt: " + executeAt +
                ", deps: " + partialDeps +
                ", read: " + read +
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java 
b/accord-core/src/main/java/accord/messages/InformDurable.java
index d9f42214..d79b50af 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -15,10 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package accord.messages;
 
-import accord.api.ProgressLog.ProgressShard;
 import accord.local.Commands;
 import accord.local.Node.Id;
 import accord.local.PreLoadContext;
@@ -33,9 +31,6 @@ import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.utils.Invariants;
 
-import static accord.api.ProgressLog.ProgressShard.Adhoc;
-import static accord.api.ProgressLog.ProgressShard.Home;
-import static accord.api.ProgressLog.ProgressShard.Local;
 import static accord.local.PreLoadContext.contextFor;
 import static accord.messages.SimpleReply.Ok;
 
@@ -51,7 +46,6 @@ public class InformDurable extends TxnRequest<Reply> 
implements PreLoadContext
 
     public final Timestamp executeAt;
     public final Durability durability;
-    private transient ProgressShard shard;
 
     public InformDurable(Id to, Topologies topologies, FullRoute<?> route, 
TxnId txnId, Timestamp executeAt, Durability durability)
     {
@@ -79,11 +73,6 @@ public class InformDurable extends TxnRequest<Reply> 
implements PreLoadContext
             for (long epoch = waitForEpoch; progressKey == null && epoch > 
txnId.epoch() ; --epoch)
                 progressKey = node.trySelectProgressKey(epoch, scope, 
scope.homeKey());
             Invariants.checkState(progressKey != null);
-            shard = Adhoc;
-        }
-        else
-        {
-            shard = scope.homeKey().equals(progressKey) ? Home : Local;
         }
 
         // TODO (expected, efficiency): do not load from disk to perform this 
update
diff --git a/accord-core/src/main/java/accord/api/Read.java 
b/accord-core/src/main/java/accord/messages/LocalMessage.java
similarity index 63%
copy from accord-core/src/main/java/accord/api/Read.java
copy to accord-core/src/main/java/accord/messages/LocalMessage.java
index b9eaf2a1..07b11899 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/messages/LocalMessage.java
@@ -15,21 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package accord.messages;
 
-package accord.api;
+import accord.local.Node;
+import accord.local.PreLoadContext;
 
-import accord.local.SafeCommandStore;
-import accord.primitives.*;
-import accord.utils.async.AsyncChain;
-
-
-/**
- * A read to be performed on potentially multiple shards, the inputs of which 
may be fed to a {@link Query}
- */
-public interface Read
+public interface LocalMessage extends Message, PreLoadContext
 {
-    Seekables<?, ?> keys();
-    AsyncChain<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore 
commandStore, Timestamp executeAt, DataStore store);
-    Read slice(Ranges ranges);
-    Read merge(Read other);
+    interface Handler
+    {
+        void handle(LocalMessage message, Node node);
+    }
+
+    void process(Node node);
 }
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java 
b/accord-core/src/main/java/accord/messages/MessageType.java
index 7909157a..aab5e0c1 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -17,53 +17,86 @@
  */
 package accord.messages;
 
+import static accord.messages.MessageType.Kind.REMOTE;
+import static accord.messages.MessageType.Kind.LOCAL;
+
 /**
- * Meant to assist implementations map accord messages to their own messaging systems.
+ * Meant to assist implementations with mapping accord messages to their own messaging systems.
  */
 public enum MessageType
 {
-    SIMPLE_RSP               (false),
-    PRE_ACCEPT_REQ           (true ),
-    PRE_ACCEPT_RSP           (false),
-    ACCEPT_REQ               (true ),
-    ACCEPT_RSP               (false),
-    ACCEPT_INVALIDATE_REQ    (true ),
-    GET_DEPS_REQ             (false),
-    GET_DEPS_RSP             (false),
-    COMMIT_REQ               (true ),
-    COMMIT_INVALIDATE_REQ    (true ),
-    APPLY_REQ                (true ),
-    APPLY_RSP                (false),
-    READ_REQ                 (false),
-    READ_RSP                 (false),
-    BEGIN_RECOVER_REQ        (true ),
-    BEGIN_RECOVER_RSP        (false),
-    BEGIN_INVALIDATE_REQ     (true ),
-    BEGIN_INVALIDATE_RSP     (false),
-    WAIT_ON_COMMIT_REQ       (false),
-    WAIT_ON_COMMIT_RSP       (false),
-    WAIT_ON_APPLY_REQ        (false),
-    INFORM_OF_TXN_REQ        (true ),
-    INFORM_DURABLE_REQ       (true ),
-    INFORM_HOME_DURABLE_REQ  (true ),
-    CHECK_STATUS_REQ         (false),
-    CHECK_STATUS_RSP         (false),
-    FETCH_DATA_REQ           (false),
-    FETCH_DATA_RSP           (false),
-    SET_SHARD_DURABLE_REQ    (true ),
-    SET_GLOBALLY_DURABLE_REQ (true ),
-    QUERY_DURABLE_BEFORE_REQ (false),
-    QUERY_DURABLE_BEFORE_RSP (false),
-    FAILURE_RSP              (false),
+    SIMPLE_RSP               (REMOTE, false),
+    FAILURE_RSP              (REMOTE, false),
+    PRE_ACCEPT_REQ           (REMOTE, true ),
+    PRE_ACCEPT_RSP           (REMOTE, false),
+    ACCEPT_REQ               (REMOTE, true ),
+    ACCEPT_RSP               (REMOTE, false),
+    ACCEPT_INVALIDATE_REQ    (REMOTE, true ),
+    GET_DEPS_REQ             (REMOTE, false),
+    GET_DEPS_RSP             (REMOTE, false),
+    COMMIT_MINIMAL_REQ       (REMOTE, true ),
+    COMMIT_MAXIMAL_REQ       (REMOTE, true ),
+    COMMIT_INVALIDATE_REQ    (REMOTE, true ),
+    APPLY_MINIMAL_REQ        (REMOTE, true ),
+    APPLY_MAXIMAL_REQ        (REMOTE, true ),
+    APPLY_RSP                (REMOTE, false),
+    READ_REQ                 (REMOTE, false),
+    READ_RSP                 (REMOTE, false),
+    BEGIN_RECOVER_REQ        (REMOTE, true ),
+    BEGIN_RECOVER_RSP        (REMOTE, false),
+    BEGIN_INVALIDATE_REQ     (REMOTE, true ),
+    BEGIN_INVALIDATE_RSP     (REMOTE, false),
+    WAIT_ON_COMMIT_REQ       (REMOTE, false),
+    WAIT_ON_COMMIT_RSP       (REMOTE, false),
+    WAIT_ON_APPLY_REQ        (REMOTE, false),
+    INFORM_OF_TXN_REQ        (REMOTE, true ),
+    INFORM_DURABLE_REQ       (REMOTE, true ),
+    INFORM_HOME_DURABLE_REQ  (REMOTE, true ),
+    CHECK_STATUS_REQ         (REMOTE, false),
+    CHECK_STATUS_RSP         (REMOTE, false),
+    FETCH_DATA_REQ           (REMOTE, false),
+    FETCH_DATA_RSP           (REMOTE, false),
+    SET_SHARD_DURABLE_REQ    (REMOTE, true ),
+    SET_GLOBALLY_DURABLE_REQ (REMOTE, true ),
+    QUERY_DURABLE_BEFORE_REQ (REMOTE, false),
+    QUERY_DURABLE_BEFORE_RSP (REMOTE, false),
+
+    PROPAGATE_PRE_ACCEPT_MSG (LOCAL,  true ),
+    PROPAGATE_COMMIT_MSG     (LOCAL,  true ),
+    PROPAGATE_APPLY_MSG      (LOCAL,  true ),
+    PROPAGATE_OTHER_MSG      (LOCAL,  true ),
     ;
 
+    public enum Kind { LOCAL, REMOTE }
+
+    /**
+     * LOCAL messages are not sent to remote nodes.
+     */
+    private final Kind kind;
+
     /**
      * If true, indicates that processing of the message has important side effects.
      */
-    public final boolean hasSideEffects;
+    private final boolean hasSideEffects;
 
-    MessageType(boolean hasSideEffects)
+    MessageType(Kind kind, boolean hasSideEffects)
     {
         this.hasSideEffects = hasSideEffects;
+        this.kind = kind;
+    }
+
+    public boolean isLocal()
+    {
+        return kind == LOCAL;
+    }
+
+    public boolean isRemote()
+    {
+        return kind == REMOTE;
+    }
+
+    public boolean hasSideEffects()
+    {
+        return hasSideEffects;
     }
 }
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java
new file mode 100644
index 00000000..68a63df1
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.messages;
+
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.coordinate.Infer;
+import accord.local.Command;
+import accord.local.Commands;
+import accord.local.Node;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SaveStatus;
+import accord.local.Status;
+import accord.primitives.EpochSupplier;
+import accord.primitives.Keys;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.utils.Invariants;
+import accord.utils.MapReduceConsume;
+
+import javax.annotation.Nullable;
+import java.util.function.BiConsumer;
+
+import static accord.local.Status.NotDefined;
+import static accord.local.Status.Phase.Cleanup;
+import static accord.local.Status.PreApplied;
+import static accord.primitives.Routables.Slice.Minimal;
+
+public class Propagate implements MapReduceConsume<SafeCommandStore, Void>, 
EpochSupplier, LocalMessage
+{
+    public static class SerializerSupport
+    {
+        public static Propagate create(TxnId txnId, Route<?> route, SaveStatus 
saveStatus, SaveStatus maxSaveStatus, Status.Durability durability, RoutingKey 
homeKey, RoutingKey progressKey, Status.Known achieved, PartialTxn partialTxn, 
PartialDeps partialDeps, long toEpoch, Timestamp executeAt, Writes writes, 
Result result)
+        {
+            return new Propagate(txnId, route, saveStatus, maxSaveStatus, 
durability, homeKey, progressKey, achieved, partialTxn, partialDeps, toEpoch, 
executeAt, writes, result, null);
+        }
+    }
+
+    public final TxnId txnId;
+    public final Route<?> route;
+    public final SaveStatus saveStatus;
+    public final SaveStatus maxSaveStatus;
+    public final Status.Durability durability;
+    @Nullable public final RoutingKey homeKey;
+    @Nullable public final RoutingKey progressKey;
+    // this is a WHOLE NODE measure, so if commit epoch has more ranges we do not count as committed if we can only commit in coordination epoch
+    public final Status.Known achieved;
+    @Nullable public final PartialTxn partialTxn;
+    @Nullable public final PartialDeps partialDeps;
+    public final long toEpoch;
+    @Nullable public final Timestamp executeAt;
+    @Nullable public final Writes writes;
+    @Nullable public final Result result;
+
+    transient final BiConsumer<Status.Known, Throwable> callback;
+
+    Propagate(
+        TxnId txnId,
+        Route<?> route,
+        SaveStatus saveStatus,
+        SaveStatus maxSaveStatus,
+        Status.Durability durability,
+        @Nullable RoutingKey homeKey,
+        @Nullable RoutingKey progressKey,
+        Status.Known achieved,
+        @Nullable PartialTxn partialTxn,
+        @Nullable PartialDeps partialDeps,
+        long toEpoch,
+        @Nullable Timestamp executeAt,
+        @Nullable Writes writes,
+        @Nullable Result result,
+        BiConsumer<Status.Known, Throwable> callback)
+    {
+        this.txnId = txnId;
+        this.route = route;
+        this.saveStatus = saveStatus;
+        this.maxSaveStatus = maxSaveStatus;
+        this.durability = durability;
+        this.homeKey = homeKey;
+        this.progressKey = progressKey;
+        this.achieved = achieved;
+        this.partialTxn = partialTxn;
+        this.partialDeps = partialDeps;
+        this.toEpoch = toEpoch;
+        this.executeAt = executeAt;
+        this.writes = writes;
+        this.result = result;
+        this.callback = callback;
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static void propagate(Node node, TxnId txnId, long sourceEpoch, 
CheckStatus.WithQuorum withQuorum, Route route, @Nullable Status.Known target, 
CheckStatus.CheckStatusOkFull full, BiConsumer<Status.Known, Throwable> 
callback)
+    {
+        if (full.saveStatus.status == NotDefined && full.invalidIfNotAtLeast 
== NotDefined)
+        {
+            callback.accept(Status.Known.Nothing, null);
+            return;
+        }
+
+        Invariants.checkState(sourceEpoch == txnId.epoch() || (full.executeAt 
!= null && sourceEpoch == full.executeAt.epoch()));
+        Route<?> maxRoute = Route.merge(route, full.route);
+
+        // TODO (required): permit individual shards that are behind to catch up by themselves
+        long toEpoch = sourceEpoch;
+        Ranges sliceRanges = 
node.topology().localRangesForEpochs(txnId.epoch(), toEpoch);
+        if (!maxRoute.covers(sliceRanges))
+        {
+            callback.accept(Status.Known.Nothing, null);
+            return;
+        }
+
+        RoutingKey progressKey = node.trySelectProgressKey(txnId, maxRoute);
+
+        Ranges covering = maxRoute.sliceCovering(sliceRanges, Minimal);
+        Participants<?> participatingKeys = 
maxRoute.participants().slice(covering, Minimal);
+        Status.Known achieved = full.sufficientFor(participatingKeys, 
withQuorum);
+        if (achieved.executeAt.hasDecidedExecuteAt() && full.executeAt.epoch() 
> toEpoch)
+        {
+            Ranges acceptRanges;
+            if (!node.topology().hasEpoch(full.executeAt.epoch()) ||
+                    (!maxRoute.covers(acceptRanges = 
node.topology().localRangesForEpochs(txnId.epoch(), full.executeAt.epoch()))))
+            {
+                // we don't know what the execution epoch requires, so we cannot be sure we can replicate it locally
+                // we *could* wait until we have the local epoch before running this
+                Status.Outcome outcome = 
achieved.outcome.propagatesBetweenShards() ? achieved.outcome : 
Status.Outcome.Unknown;
+                achieved = new Status.Known(achieved.definition, 
achieved.executeAt, Status.KnownDeps.DepsUnknown, outcome);
+            }
+            else
+            {
+                // TODO (expected): this should only be the two precise epochs, not the full range of epochs
+                sliceRanges = acceptRanges;
+                covering = maxRoute.sliceCovering(sliceRanges, Minimal);
+                participatingKeys = maxRoute.participants().slice(covering, 
Minimal);
+                Status.Known knownForExecution = 
full.sufficientFor(participatingKeys, withQuorum);
+                if ((target != null && 
target.isSatisfiedBy(knownForExecution)) || 
knownForExecution.isSatisfiedBy(achieved))
+                {
+                    achieved = knownForExecution;
+                    toEpoch = full.executeAt.epoch();
+                }
+                else
+                {
+                    Invariants.checkState(sourceEpoch == txnId.epoch(), "%d != 
%d", sourceEpoch, txnId.epoch());
+                    achieved = new Status.Known(achieved.definition, 
achieved.executeAt, knownForExecution.deps, knownForExecution.outcome);
+                }
+            }
+        }
+
+        PartialTxn partialTxn = null;
+        if (achieved.definition.isKnown())
+            partialTxn = full.partialTxn.slice(sliceRanges, 
true).reconstitutePartial(covering);
+
+        PartialDeps partialDeps = null;
+        if (achieved.deps.hasDecidedDeps())
+            partialDeps = 
full.committedDeps.slice(sliceRanges).reconstitutePartial(covering);
+
+        Propagate propagate =
+            new Propagate(txnId, maxRoute, full.saveStatus, 
full.maxSaveStatus, full.durability, full.homeKey, progressKey, achieved, 
partialTxn, partialDeps, toEpoch, full.executeAt, full.writes, full.result, 
callback);
+
+        node.localMessage(propagate);
+    }
+
+    @Override
+    public TxnId primaryTxnId()
+    {
+        return txnId;
+    }
+
+    @Override
+    public Seekables<?, ?> keys()
+    {
+        if (achieved.definition.isKnown())
+            return partialTxn.keys();
+        else if (achieved.deps.hasProposedOrDecidedDeps())
+            return partialDeps.keyDeps.keys();
+        else
+            return Keys.EMPTY;
+    }
+
+    @Override
+    public void process(Node node)
+    {
+        node.mapReduceConsumeLocal(this, route, txnId.epoch(), toEpoch, this);
+    }
+
+    @Override
+    public Void apply(SafeCommandStore safeStore)
+    {
+        SafeCommand safeCommand = safeStore.get(txnId, this, route);
+        Command command = safeCommand.current();
+        if (command.saveStatus().phase.compareTo(Status.Phase.Persist) >= 0)
+            return null;
+
+        Status propagate = achieved.propagate();
+        if (command.hasBeen(propagate))
+        {
+            if (maxSaveStatus.phase == Cleanup && 
durability.isDurableOrInvalidated() && Infer.safeToCleanup(safeStore, command, 
route, executeAt))
+                Commands.setTruncatedApply(safeStore, safeCommand);
+            return null;
+        }
+
+        switch (propagate)
+        {
+            default: throw new IllegalStateException("Unexpected status: " + 
propagate);
+            case Accepted:
+            case AcceptedInvalidate:
+                // we never "propagate" accepted statuses as these are essentially votes,
+                // and contribute nothing to our local state machine
+                throw new IllegalStateException("Invalid states to propagate: 
" + achieved.propagate());
+
+            case Truncated:
+                // if our peers have truncated this command, then either:
+                // 1) we have already applied it locally; 2) the command doesn't apply locally; 3) we are stale; or 4) the command is invalidated
+                if (command.hasBeen(PreApplied) || 
command.saveStatus().isUninitialised())
+                    break;
+
+                if (Infer.safeToCleanup(safeStore, command, route, executeAt))
+                {
+                    Commands.setErased(safeStore, safeCommand);
+                    break;
+                }
+
+                // TODO (required): check if we are stale
+                // otherwise we are either stale, or the command didn't reach consensus
+
+            case Invalidated:
+                Commands.commitInvalidate(safeStore, safeCommand, route);
+                break;
+
+            case Applied:
+            case PreApplied:
+                Invariants.checkState(executeAt != null);
+                if (toEpoch >= executeAt.epoch())
+                {
+                    confirm(Commands.apply(safeStore, safeCommand, txnId, 
route, progressKey, executeAt, partialDeps, partialTxn, writes, result));
+                    break;
+                }
+
+            case Committed:
+            case ReadyToExecute:
+                confirm(Commands.commit(safeStore, safeCommand, txnId, route, 
progressKey, partialTxn, executeAt, partialDeps));
+                break;
+
+            case PreCommitted:
+                Commands.precommit(safeStore, safeCommand, txnId, executeAt, 
route);
+                if (!achieved.definition.isKnown())
+                    break;
+
+            case PreAccepted:
+                // only preaccept if we coordinate the transaction
+                if (safeStore.ranges().coordinates(txnId).intersects(route) && 
Route.isFullRoute(route))
+                    Commands.preaccept(safeStore, safeCommand, txnId, 
txnId.epoch(), partialTxn, Route.castToFullRoute(route), progressKey);
+                break;
+
+            case NotDefined:
+                break;
+        }
+
+
+        if (!durability.isDurable() || homeKey == null)
+            return null;
+
+        if (!safeStore.ranges().coordinates(txnId).contains(homeKey))
+            return null;
+
+        Timestamp executeAt = saveStatus.known.executeAt.hasDecidedExecuteAt() 
? this.executeAt : null;
+        Commands.setDurability(safeStore, safeCommand, durability, route, 
executeAt);
+        return null;
+    }
+
+    @Override
+    public Void reduce(Void o1, Void o2)
+    {
+        return null;
+    }
+
+    @Override
+    public void accept(Void result, Throwable failure)
+    {
+        if (null != callback)
+            callback.accept(failure == null ? achieved : null, failure);
+    }
+
+    @Override
+    public MessageType type()
+    {
+        switch (achieved.propagate())
+        {
+            case Applied:
+            case PreApplied:
+                if (toEpoch >= executeAt.epoch())
+                    return MessageType.PROPAGATE_APPLY_MSG;
+            case Committed:
+            case ReadyToExecute:
+                return MessageType.PROPAGATE_COMMIT_MSG;
+            case PreCommitted:
+                if (!achieved.definition.isKnown())
+                    return MessageType.PROPAGATE_OTHER_MSG;
+            case PreAccepted:
+                return MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+            default:
+                return MessageType.PROPAGATE_OTHER_MSG;
+        }
+    }
+
+    @Override
+    public long epoch()
+    {
+        return toEpoch;
+    }
+
+    private static void confirm(Commands.CommitOutcome outcome)
+    {
+        switch (outcome)
+        {
+            default: throw new IllegalStateException("Unknown outcome: " + 
outcome);
+            case Redundant:
+            case Success:
+                return;
+            case Insufficient: throw new IllegalStateException("Should have 
enough information");
+        }
+    }
+
+    private static void confirm(Commands.ApplyOutcome outcome)
+    {
+        switch (outcome)
+        {
+            default: throw new IllegalStateException("Unknown outcome: " + 
outcome);
+            case Redundant:
+            case Success:
+                return;
+            case Insufficient: throw new IllegalStateException("Should have 
enough information");
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Propagate{type:" + type() +
+                ", txnId: " + txnId +
+                ", saveStatus: " + saveStatus +
+                ", deps: " + partialDeps +
+                ", txn: " + partialTxn +
+                ", executeAt: " + executeAt +
+                ", writes:" + writes +
+                ", result:" + result +
+                '}';
+    }
+}
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java b/accord-core/src/main/java/accord/primitives/Deps.java
index 7a899fae..69476ed2 100644
--- a/accord-core/src/main/java/accord/primitives/Deps.java
+++ b/accord-core/src/main/java/accord/primitives/Deps.java
@@ -268,7 +268,7 @@ public class Deps
 
     public boolean equals(Deps that)
     {
-        return this.keyDeps.equals(that.keyDeps) && 
this.rangeDeps.equals(that.rangeDeps);
+        return that != null && this.keyDeps.equals(that.keyDeps) && 
this.rangeDeps.equals(that.rangeDeps);
     }
 
     public @Nullable TxnId maxTxnId()
diff --git a/accord-core/src/main/java/accord/primitives/PartialDeps.java b/accord-core/src/main/java/accord/primitives/PartialDeps.java
index a626baf2..f3770062 100644
--- a/accord-core/src/main/java/accord/primitives/PartialDeps.java
+++ b/accord-core/src/main/java/accord/primitives/PartialDeps.java
@@ -59,6 +59,13 @@ public class PartialDeps extends Deps
         return covering.containsAll(participants);
     }
 
+    public boolean isEqualOrFuller(PartialDeps that)
+    {
+        return covering.containsAll(that.covering)
+            && keyDeps.txnIds().containsAll(that.keyDeps.txnIds())
+            && rangeDeps.txnIds().containsAll(that.rangeDeps.txnIds());
+    }
+
     public PartialDeps with(PartialDeps that)
     {
         Invariants.checkArgument((this.rangeDeps == null) == (that.rangeDeps 
== null));
@@ -100,7 +107,7 @@ public class PartialDeps extends Deps
 
     public boolean equals(PartialDeps that)
     {
-        return this.covering.equals(that.covering) && super.equals(that);
+        return that != null && this.covering.equals(that.covering) && 
super.equals(that);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/primitives/PartialTxn.java b/accord-core/src/main/java/accord/primitives/PartialTxn.java
index 36bff360..262b3d6e 100644
--- a/accord-core/src/main/java/accord/primitives/PartialTxn.java
+++ b/accord-core/src/main/java/accord/primitives/PartialTxn.java
@@ -18,12 +18,14 @@
 
 package accord.primitives;
 
-import javax.annotation.Nullable;
+import java.util.Objects;
 
 import accord.api.Query;
 import accord.api.Read;
 import accord.api.Update;
 
+import javax.annotation.Nullable;
+
 public interface PartialTxn extends Txn
 {
     // TODO (expected): we no longer need this if everyone has a FullRoute
@@ -55,6 +57,8 @@ public interface PartialTxn extends Txn
         return covering().containsAll(participants);
     }
 
+    boolean isEqualOrFuller(PartialTxn txn);
+
     static PartialTxn merge(@Nullable PartialTxn a, @Nullable PartialTxn b)
     {
         return a == null ? b : b == null ? a : a.with(b);
@@ -106,6 +110,17 @@ public interface PartialTxn extends Txn
             return covering.containsAll(ranges);
         }
 
+        @Override
+        public boolean isEqualOrFuller(PartialTxn txn)
+        {
+            return kind() == txn.kind()
+                && covering().containsAll(txn.covering())
+                && keys().containsAll(txn.keys())
+                && read().isEqualOrFuller(txn.read())
+                && Objects.equals(query(), txn.query())
+                && ((update() == null && txn.update() == null) || (update() != 
null && txn.update() != null && update().isEqualOrFuller(txn.update())));
+        }
+
         @Override
         public Txn reconstitute(FullRoute<?> route)
         {
@@ -127,5 +142,4 @@ public interface PartialTxn extends Txn
             return new PartialTxn.InMemory(covering, kind(), keys(), read(), 
query(), update());
         }
     }
-
 }
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java
index fb908abe..08fc9442 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -23,6 +23,7 @@ import accord.local.SafeCommandStore;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -33,9 +34,9 @@ public class Writes
     public final TxnId txnId;
     public final Timestamp executeAt;
     public final Seekables<?, ?> keys;
-    public final Write write;
+    @Nullable public final Write write;
 
-    public Writes(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys, 
Write write)
+    public Writes(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys, 
@Nullable Write write)
     {
         this.txnId = txnId;
         this.executeAt = executeAt;
@@ -49,7 +50,7 @@ public class Writes
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         Writes writes = (Writes) o;
-        return executeAt.equals(writes.executeAt) && keys.equals(writes.keys) 
&& write.equals(writes.write);
+        return executeAt.equals(writes.executeAt) && keys.equals(writes.keys) 
&& Objects.equals(write, writes.write);
     }
 
     public boolean isEmpty()
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index ec1e81e0..77a8bd81 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -38,6 +38,7 @@ import accord.impl.mock.MockStore;
 import accord.local.Node;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.messages.LocalMessage;
 import accord.primitives.Keys;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
@@ -141,6 +142,7 @@ public class Utils
         Scheduler scheduler = new ThreadPoolScheduler();
         Node node = new Node(nodeId,
                              messageSink,
+                             LocalMessage::process,
                              new MockConfigurationService(messageSink, 
EpochFunction.noop(), topology),
                              clock,
                              
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 853dfe71..a7eff1cd 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -55,6 +55,7 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
+import accord.messages.LocalMessage;
 import accord.messages.MessageType;
 import accord.messages.Message;
 import accord.messages.Reply;
@@ -228,7 +229,7 @@ public class Cluster implements Scheduler
                 MessageSink messageSink = sinks.create(id, 
randomSupplier.get());
                 LongSupplier nowSupplier = nowSupplierSupplier.get();
                 BurnTestConfigurationService configService = new 
BurnTestConfigurationService(id, executor, randomSupplier, topology, 
lookup::get, topologyUpdates);
-                Node node = new Node(id, messageSink, configService, 
nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
+                Node node = new Node(id, messageSink, LocalMessage::process, 
configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, 
nowSupplier),
                                      () -> new ListStore(id), new 
ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
                                      executor.agent(),
                                      randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER,
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 4d04d512..fdd8a91b 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -46,6 +46,7 @@ import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.messages.Callback;
+import accord.messages.LocalMessage;
 import accord.messages.Reply;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
@@ -120,6 +121,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node>
         MockConfigurationService configurationService = new 
MockConfigurationService(messageSink, onFetchTopology, topology);
         Node node = new Node(id,
                              messageSink,
+                             LocalMessage::process,
                              configurationService,
                              nowSupplier,
                              
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 365ceec9..15061ea5 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -109,7 +109,7 @@ public class ImmutableCommandTest
     private static Node createNode(Id id, CommandStoreSupport storeSupport)
     {
         MockCluster.Clock clock = new MockCluster.Clock(100);
-        Node node = new Node(id, null, new MockConfigurationService(null, 
(epoch, service) -> { }, storeSupport.local.get()),
+        Node node = new Node(id, null, null, new 
MockConfigurationService(null, (epoch, service) -> { }, 
storeSupport.local.get()),
                         clock, 
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
                         () -> storeSupport.data, new 
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new 
TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
                         SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index f988703e..1af7d4c2 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -50,6 +50,7 @@ import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.messages.Callback;
+import accord.messages.LocalMessage;
 import accord.messages.Reply;
 import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
@@ -313,7 +314,7 @@ public class Cluster implements Scheduler
             {
                 MessageSink messageSink = sinks.create(node, 
randomSupplier.get());
                 LongSupplier nowSupplier = nowSupplierSupplier.get();
-                lookup.put(node, new Node(node, messageSink, new 
SimpleConfigService(topology),
+                lookup.put(node, new Node(node, messageSink, 
LocalMessage::process, new SimpleConfigService(topology),
                                           nowSupplier, 
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier),
                                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                                           MaelstromAgent.INSTANCE,
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index a1919045..871123a3 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -43,6 +43,7 @@ import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.maelstrom.Packet.Type;
 import accord.messages.Callback;
+import accord.messages.LocalMessage;
 import accord.messages.Reply;
 import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
@@ -172,7 +173,7 @@ public class Main
             MaelstromInit init = (MaelstromInit) packet.body;
             topology = topologyFactory.toTopology(init.cluster);
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, 
init.self, out, err);
-            on = new Node(init.self, sink, new SimpleConfigService(topology),
+            on = new Node(init.self, sink, LocalMessage::process, new 
SimpleConfigService(topology),
                           System::currentTimeMillis, 
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                           MaelstromAgent.INSTANCE, new DefaultRandom(), 
scheduler, SizeOfIntersectionSorter.SUPPLIER,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Reply via email to