This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7b9bb88 Migrate in memory journal to CommandChange logic shared with 
AccordJournal
f7b9bb88 is described below

commit f7b9bb8887ed672185f269ebcbc9d11e6aeafca9
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Nov 26 15:25:32 2024 +0100

    Migrate in memory journal to CommandChange logic shared with AccordJournal
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20115
---
 accord-core/src/main/java/accord/api/Journal.java  |  12 +
 .../src/main/java/accord/impl/CommandChange.java   | 621 +++++++++++++++++++
 .../src/main/java/accord/local/Command.java        |  58 +-
 .../src/test/java/accord/burn/BurnTestBase.java    |   4 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   5 +-
 .../java/accord/impl/basic/InMemoryJournal.java    | 661 ++++++++++-----------
 .../java/accord/impl/basic/LoggingJournal.java     |  21 +
 .../java/accord/impl/basic/VerifyingJournal.java   | 110 ----
 8 files changed, 1006 insertions(+), 486 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index 300fc274..2fd70165 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -29,6 +29,7 @@ import accord.local.RedundantBefore;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.utils.PersistentField.Persister;
 
 /**
  * Persisted journal for transactional recovery.
@@ -36,6 +37,8 @@ import accord.primitives.TxnId;
 public interface Journal
 {
     Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore 
redundantBefore, DurableBefore durableBefore);
+    Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load load, 
RedundantBefore redundantBefore, DurableBefore durableBefore);
+
     // TODO (required): use OnDone instead of Runnable
     void saveCommand(int store, CommandUpdate value, Runnable onFlush);
 
@@ -47,6 +50,8 @@ public interface Journal
     NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId);
     CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId);
 
+    Persister<DurableBefore, DurableBefore> durableBeforePersister();
+
     void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush);
 
     class CommandUpdate
@@ -82,6 +87,13 @@ public interface Journal
         }
     }
 
+    enum Load
+    {
+        ALL,
+        PURGEABLE,
+        MINIMAL
+    }
+
     /**
      * Helper for CommandStore to restore Command states.
      */
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java 
b/accord-core/src/main/java/accord/impl/CommandChange.java
new file mode 100644
index 00000000..e2c450d9
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -0,0 +1,621 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.function.Function;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.api.Agent;
+import accord.api.Result;
+import accord.local.Cleanup;
+import accord.local.Command;
+import accord.local.CommonAttributes;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.local.StoreParticipants;
+import accord.primitives.Ballot;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.SaveStatus;
+import accord.primitives.Status;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Writes;
+import accord.utils.Invariants;
+
+import static accord.api.Journal.Load;
+import static accord.api.Journal.Load.ALL;
+import static accord.impl.CommandChange.Fields.ACCEPTED;
+import static accord.impl.CommandChange.Fields.CLEANUP;
+import static accord.impl.CommandChange.Fields.DURABILITY;
+import static accord.impl.CommandChange.Fields.EXECUTES_AT_LEAST;
+import static accord.impl.CommandChange.Fields.EXECUTE_AT;
+import static accord.impl.CommandChange.Fields.FIELDS;
+import static accord.impl.CommandChange.Fields.PARTIAL_DEPS;
+import static accord.impl.CommandChange.Fields.PARTIAL_TXN;
+import static accord.impl.CommandChange.Fields.PARTICIPANTS;
+import static accord.impl.CommandChange.Fields.PROMISED;
+import static accord.impl.CommandChange.Fields.RESULT;
+import static accord.impl.CommandChange.Fields.SAVE_STATUS;
+import static accord.impl.CommandChange.Fields.WAITING_ON;
+import static accord.impl.CommandChange.Fields.WRITES;
+import static accord.local.Cleanup.NO;
+import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
+import static accord.primitives.Known.KnownDeps.DepsErased;
+import static accord.primitives.Known.KnownDeps.DepsUnknown;
+import static accord.primitives.Known.KnownDeps.NoDeps;
+import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
+import static accord.primitives.Status.Durability.NotDurable;
+import static accord.utils.Invariants.illegalState;
+
+public class CommandChange
+{
+    // This enum is order-dependent: ordinal() is the bit index used by mask(), setFieldIsNull() (bit ordinal) and setFieldChanged() (bit 16 + ordinal), so reordering or inserting constants changes every bit assignment
+    public enum Fields
+    {
+        PARTICIPANTS, // stored first so we can index it
+        SAVE_STATUS,
+        PARTIAL_DEPS,
+        EXECUTE_AT,
+        EXECUTES_AT_LEAST,
+        DURABILITY,
+        ACCEPTED,
+        PROMISED,
+        WAITING_ON,
+        PARTIAL_TXN,
+        WRITES,
+        CLEANUP,
+        RESULT,
+        ; // the "is null" and "changed" bit ranges only stay disjoint while there are at most 16 constants
+
+        public static final Fields[] FIELDS = values(); // values() cached once; nextSetField indexes it by bit position
+    }
+
+    public static class Builder
+    {
+        protected final int mask;
+        protected int flags;
+
+        protected TxnId txnId;
+
+        protected Timestamp executeAt;
+        protected Timestamp executeAtLeast;
+        protected SaveStatus saveStatus;
+        protected Status.Durability durability;
+
+        protected Ballot acceptedOrCommitted;
+        protected Ballot promised;
+
+        protected StoreParticipants participants;
+        protected PartialTxn partialTxn;
+        protected PartialDeps partialDeps;
+
+        protected byte[] waitingOnBytes;
+        protected CommandChange.WaitingOnProvider waitingOn;
+        protected Writes writes;
+        protected Result result;
+        protected Cleanup cleanup;
+
+        protected boolean nextCalled;
+        protected int count;
+
+        public Builder(TxnId txnId, Load load)
+        {
+            this.mask = mask(load);
+            init(txnId);
+        }
+
+        public Builder(TxnId txnId)
+        {
+            this(txnId, ALL);
+        }
+
+        public Builder(Load load)
+        {
+            this.mask = mask(load);
+        }
+
+        public Builder()
+        {
+            this(ALL);
+        }
+
+        public TxnId txnId()
+        {
+            return txnId;
+        }
+
+        public Timestamp executeAt()
+        {
+            return executeAt;
+        }
+
+        // TODO: why is this unused in BurnTest
+        public Timestamp executeAtLeast()
+        {
+            return executeAtLeast;
+        }
+
+        public SaveStatus saveStatus()
+        {
+            return saveStatus;
+        }
+
+        public Status.Durability durability()
+        {
+            return durability;
+        }
+
+        public Ballot acceptedOrCommitted()
+        {
+            return acceptedOrCommitted;
+        }
+
+        public Ballot promised()
+        {
+            return promised;
+        }
+
+        public StoreParticipants participants()
+        {
+            return participants;
+        }
+
+        public PartialTxn partialTxn()
+        {
+            return partialTxn;
+        }
+
+        public PartialDeps partialDeps()
+        {
+            return partialDeps;
+        }
+
+        public CommandChange.WaitingOnProvider waitingOn()
+        {
+            return waitingOn;
+        }
+
+        public Writes writes()
+        {
+            return writes;
+        }
+
+        public Result result()
+        {
+            return result;
+        }
+
+        public void clear()
+        {
+            flags = 0;
+            txnId = null;
+
+            executeAt = null;
+            executeAtLeast = null;
+            saveStatus = null;
+            durability = null;
+
+            acceptedOrCommitted = null;
+            promised = null;
+
+            participants = null;
+            partialTxn = null;
+            partialDeps = null;
+
+            waitingOnBytes = null;
+            waitingOn = null;
+            writes = null;
+            result = null;
+            cleanup = null;
+
+            nextCalled = false;
+            count = 0;
+        }
+
+        public void reset(TxnId txnId)
+        {
+            clear();
+            init(txnId);
+        }
+
+        public void init(TxnId txnId)
+        {
+            this.txnId = txnId;
+            durability = NotDurable;
+            acceptedOrCommitted = promised = Ballot.ZERO;
+            waitingOn = (txn, deps) -> null;
+            result = null;
+        }
+
+        public boolean isEmpty()
+        {
+            return !nextCalled;
+        }
+
+        public int count()
+        {
+            return count;
+        }
+
+        /**
+         * Decides which cleanup applies to the command described by this builder.
+         * Returns NO while the builder is empty (nextCalled is false) or while saveStatus or participants is unset.
+         * Otherwise returns whichever compares greater: the value computed by Cleanup.shouldCleanupPartial
+         * or the cleanup already recorded on this builder.
+         */
+        public Cleanup shouldCleanup(Agent agent, RedundantBefore redundantBefore, DurableBefore durableBefore)
+        {
+            boolean undecidable = !nextCalled || saveStatus == null || participants == null;
+            if (undecidable)
+                return NO;
+
+            Cleanup computed = Cleanup.shouldCleanupPartial(agent, txnId, saveStatus, durability, participants, redundantBefore, durableBefore);
+            boolean recordedIsGreater = cleanup != null && cleanup.compareTo(computed) > 0;
+            return recordedIsGreater ? cleanup : computed;
+        }
+
+        /**
+         * Produces the builder that should stand in for this one once {@code cleanup} has been applied.
+         *
+         * @return this builder when no save status is recorded or cleanup is NO; null for EXPUNGE and ERASE;
+         *         otherwise a new, reduced builder
+         * @throws IllegalStateException for a cleanup value this switch does not know
+         */
+        public Builder maybeCleanup(Cleanup cleanup)
+        {
+            if (saveStatus() == null)
+                return this;
+
+            switch (cleanup)
+            {
+                case NO:
+                    return this;
+
+                case ERASE:
+                case EXPUNGE:
+                    return null;
+
+                case INVALIDATE:
+                case VESTIGIAL:
+                    return saveStatusOnly();
+
+                case EXPUNGE_PARTIAL:
+                    return expungePartial(cleanup, saveStatus, true);
+
+                case TRUNCATE:
+                case TRUNCATE_WITH_OUTCOME:
+                {
+                    // only the WITH_OUTCOME flavour asks for the outcome to be retained
+                    boolean keepOutcome = cleanup == TRUNCATE_WITH_OUTCOME;
+                    return expungePartial(cleanup, cleanup.appliesIfNot, keepOutcome);
+                }
+
+                default:
+                    throw new IllegalStateException("Unknown cleanup: " + cleanup);
+            }
+        }
+
+        /**
+         * Creates a reduced copy of this builder for a partially expunged or truncated command.
+         * The copy always records the given save status and cleanup. It also takes executeAt, durability and
+         * participants from this builder when they are set, and writes only when {@code includeOutcome} is true.
+         *
+         * @param cleanup        the cleanup being applied; recorded on the copy
+         * @param saveStatus     the save status the copy should carry; must not be null
+         * @param includeOutcome whether this builder's writes should be retained on the copy
+         * @return a new builder for the same txnId; this builder is left unmodified
+         */
+        public Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus, boolean includeOutcome)
+        {
+            Invariants.checkState(txnId != null);
+            Builder builder = new Builder(txnId, ALL);
+
+            builder.count++;
+            builder.nextCalled = true;
+
+            Invariants.checkState(saveStatus != null);
+            builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
+            builder.saveStatus = saveStatus;
+            builder.flags = setFieldChanged(CLEANUP, builder.flags);
+            builder.cleanup = cleanup;
+            if (executeAt != null)
+            {
+                builder.flags = setFieldChanged(EXECUTE_AT, builder.flags);
+                builder.executeAt = executeAt;
+            }
+            if (durability != null)
+            {
+                builder.flags = setFieldChanged(DURABILITY, builder.flags);
+                builder.durability = durability;
+            }
+            if (participants != null)
+            {
+                builder.flags = setFieldChanged(PARTICIPANTS, builder.flags);
+                builder.participants = participants;
+            }
+            // test this builder's writes: the fresh copy's writes are always null, so checking the copy would never retain the outcome
+            if (includeOutcome && writes != null)
+            {
+                builder.flags = setFieldChanged(WRITES, builder.flags);
+                builder.writes = writes;
+            }
+
+            return builder;
+        }
+
+        /**
+         * Creates a new builder for the same txnId that copies only this builder's save status (when one is set).
+         * The new builder is marked as non-empty with a count of one.
+         */
+        public Builder saveStatusOnly()
+        {
+            Invariants.checkState(txnId != null);
+
+            Builder reduced = new Builder(txnId, ALL);
+            reduced.nextCalled = true;
+            reduced.count++;
+
+            if (saveStatus == null)
+                return reduced;
+
+            reduced.saveStatus = saveStatus;
+            reduced.flags = setFieldChanged(SAVE_STATUS, reduced.flags);
+            return reduced;
+        }
+
+        public Command.Minimal asMinimal()
+        {
+            return new Command.Minimal(txnId, saveStatus, participants, 
durability, executeAt, writes);
+        }
+
+        public void forceResult(Result newValue)
+        {
+            this.result = newValue;
+        }
+
+        public Command construct(RedundantBefore redundantBefore) // builds the Command subtype matching saveStatus.status from the fields accumulated on this builder
+        {
+            if (!nextCalled) // builder is empty (see isEmpty()): there is no command to build
+                return null;
+
+            Invariants.checkState(txnId != null);
+            CommonAttributes.Mutable attrs = new 
CommonAttributes.Mutable(txnId);
+            if (partialTxn != null)
+                attrs.partialTxn(partialTxn);
+            if (durability != null)
+                attrs.durability(durability);
+            if (participants != null) // sets participants filtered with Filter.LOAD; executeAt is passed to the filter only when it is decided and known to execute
+                
attrs.setParticipants(participants.filter(StoreParticipants.Filter.LOAD, 
redundantBefore, txnId, saveStatus.known.executeAt.isDecidedAndKnownToExecute() 
? executeAt : null));
+            if (participants != null) // NOTE(review): this overwrites the filtered participants set just above with the unfiltered value; confirm which of the two calls is intended
+                attrs.setParticipants(participants);
+            if (partialDeps != null &&
+                (saveStatus.known.deps != NoDeps &&
+                 saveStatus.known.deps != DepsErased &&
+                 saveStatus.known.deps != DepsUnknown))
+                attrs.partialDeps(partialDeps);
+
+            switch (saveStatus.known.outcome) // for these outcomes writes and result are dropped; note this mutates the builder's own fields
+            {
+                case Erased:
+                case WasApply:
+                    writes = null;
+                    result = null;
+                    break;
+            }
+
+            Command.WaitingOn waitingOn = null;
+            if (this.waitingOn != null) // the provider builds WaitingOn from txnId and deps; the default provider installed by init() returns null
+                waitingOn = this.waitingOn.provide(txnId, partialDeps);
+
+            switch (saveStatus.status)
+            {
+                case NotDefined:
+                    return saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
+                                                                  : 
Command.NotDefined.notDefined(attrs, promised);
+                case PreAccepted:
+                    return Command.PreAccepted.preAccepted(attrs, executeAt, 
promised);
+                case AcceptedInvalidate:
+                case Accepted:
+                case PreCommitted:
+                    if (saveStatus == SaveStatus.AcceptedInvalidate)
+                        return 
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, 
acceptedOrCommitted);
+                    else
+                        return Command.Accepted.accepted(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted);
+                case Committed:
+                case Stable:
+                    return Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
+                case PreApplied:
+                case Applied:
+                    return Command.Executed.executed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
+                case Truncated:
+                case Invalidated:
+                    return truncated(attrs, saveStatus, executeAt, 
executeAtLeast, writes, result);
+                default:
+                    throw new IllegalStateException();
+            }
+        }
+
+        /**
+         * Builds the truncated, erased or invalidated command that corresponds to {@code status}.
+         * For the TruncatedApply* statuses the result is kept only for TruncatedApplyWithOutcome, and
+         * executesAtLeast is passed on only when the transaction kind awaits only its deps.
+         * Any other status is rejected with the exception produced by illegalState.
+         */
+        private static Command.Truncated truncated(CommonAttributes.Mutable attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes writes, Result result)
+        {
+            switch (status)
+            {
+                case TruncatedApply:
+                case TruncatedApplyWithDeps:
+                case TruncatedApplyWithOutcome:
+                {
+                    Result retained = status == TruncatedApplyWithOutcome ? result : null;
+                    if (attrs.txnId().kind().awaitsOnlyDeps())
+                        return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, retained, executesAtLeast);
+                    return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, retained, null);
+                }
+
+                case Invalidated:
+                    return Command.Truncated.invalidated(attrs.txnId());
+
+                case Erased:
+                    // TODO (expected): why are we saving Durability here for erased commands?
+                    return Command.Truncated.erased(attrs.txnId(), attrs.durability(), attrs.participants());
+
+                case ErasedOrVestigial:
+                    return Command.Truncated.erasedOrVestigial(attrs.txnId(), attrs.participants());
+
+                default:
+                    throw illegalState("Unhandled SaveStatus: " + status);
+            }
+        }
+
+        public String toString()
+        {
+            return "Builder {" +
+                   "txnId=" + txnId +
+                   ", executeAt=" + executeAt +
+                   ", saveStatus=" + saveStatus +
+                   ", durability=" + durability +
+                   ", acceptedOrCommitted=" + acceptedOrCommitted +
+                   ", promised=" + promised +
+                   ", participants=" + participants +
+                   ", partialTxn=" + partialTxn +
+                   ", partialDeps=" + partialDeps +
+                   ", waitingOn=" + waitingOn +
+                   ", writes=" + writes +
+                   '}';
+        }
+    }
+
+    /**
+     * Helpers
+     */
+
+    public interface WaitingOnProvider
+    {
+        Command.WaitingOn provide(TxnId txnId, PartialDeps deps);
+    }
+
+    public static Command.WaitingOn getWaitingOn(Command command)
+    {
+        if (command instanceof Command.Committed)
+            return command.asCommitted().waitingOn();
+
+        return null;
+    }
+
+    /**
+     * Managing masks
+     */
+
+    /**
+     * Returns an int in which every bit is set except the bits at the ordinals of the given fields.
+     * With no fields the result is -1 (all bits set).
+     */
+    public static int mask(Fields... fields)
+    {
+        int cleared = 0;
+        for (Fields field : fields)
+            cleared |= 1 << field.ordinal();
+        return ~cleared;
+    }
+
+    private static final int[] LOAD_MASKS = new int[] {0, // indexed by Load.ordinal() (see mask(Load)), so entries must follow the declaration order of Load; the entry for ALL has no bits set
+                                                       mask(SAVE_STATUS, 
PARTICIPANTS, DURABILITY, EXECUTE_AT, WRITES), // entry for PURGEABLE: only these fields' bits are clear
+                                                       mask(SAVE_STATUS, 
PARTICIPANTS, EXECUTE_AT)}; // entry for MINIMAL: only these fields' bits are clear
+
+    public static int mask(Load load)
+    {
+        return LOAD_MASKS[load.ordinal()];
+    }
+
+    /**
+     * Managing flags
+     */
+
+    /**
+     * Computes the flags word describing how {@code after} differs from {@code before}.
+     * For every field whose value differs, the "changed" bit is set (see setFieldChanged).
+     * When the new value is null, the "is null" bit is set as well (see setFieldIsNull).
+     *
+     * @param before the previous command state, or null when there is none
+     * @param after  the new command state; its result is dereferenced unconditionally, so it must not be null
+     * @return the accumulated flags
+     */
+    @VisibleForTesting
+    public static int getFlags(Command before, Command after)
+    {
+        int flags = 0;
+
+        flags = collectFlags(before, after, Command::executeAt, true, EXECUTE_AT, flags);
+        flags = collectFlags(before, after, Command::executesAtLeast, true, EXECUTES_AT_LEAST, flags);
+        flags = collectFlags(before, after, Command::saveStatus, false, SAVE_STATUS, flags);
+        flags = collectFlags(before, after, Command::durability, false, DURABILITY, flags);
+
+        flags = collectFlags(before, after, Command::acceptedOrCommitted, false, ACCEPTED, flags);
+        flags = collectFlags(before, after, Command::promised, false, PROMISED, flags);
+
+        flags = collectFlags(before, after, Command::participants, true, PARTICIPANTS, flags);
+        flags = collectFlags(before, after, Command::partialTxn, false, PARTIAL_TXN, flags);
+        flags = collectFlags(before, after, Command::partialDeps, false, PARTIAL_DEPS, flags);
+
+        // TODO: waitingOn vs WaitingOnWithExecutedAt?
+        flags = collectFlags(before, after, CommandChange::getWaitingOn, true, WAITING_ON, flags);
+
+        flags = collectFlags(before, after, Command::writes, false, WRITES, flags);
+
+        // Special-cased for Journal BurnTest integration
+        if ((before != null && after.result() != before.result()) ||
+            (before == null && after.result() != null)) //TODO
+        {
+            // compare the results themselves: comparing writes here would leave RESULT unflagged whenever only the result changed
+            flags = collectFlags(before, after, Command::result, false, RESULT, flags);
+        }
+
+        return flags;
+    }
+
+    /**
+     * Folds the comparison of one field into {@code flags}.
+     * The field is extracted from each non-null side with {@code convert}. Identical references or equal values
+     * leave the flags untouched. Otherwise the field's "changed" bit is set, and its "is null" bit too when
+     * the right-hand value is null.
+     *
+     * @param allowClassMismatch when false, an assertion checks that two non-null values share the same class
+     */
+    private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, VAL> convert, boolean allowClassMismatch, Fields field, int flags)
+    {
+        VAL previous = lo == null ? null : convert.apply(lo);
+        VAL current = ro == null ? null : convert.apply(ro);
+
+        if (previous == current)
+            return flags; // same reference (or both null): no change
+
+        if (current == null)
+            return setFieldChanged(field, setFieldIsNull(field, flags));
+
+        if (previous == null)
+            return setFieldChanged(field, flags);
+
+        assert allowClassMismatch || previous.getClass() == current.getClass() : String.format("%s != %s", previous.getClass(), current.getClass());
+
+        return previous.equals(current) ? flags : setFieldChanged(field, flags);
+    }
+
+    // TODO (required): calculate flags once
+    public static boolean anyFieldChanged(int flags)
+    {
+        return (flags >>> 16) != 0;
+    }
+
+    public static int validateFlags(int flags)
+    {
+        Invariants.checkState(0 == (~(flags >>> 16) & (flags & 0xffff)));
+        return flags;
+    }
+
+    public static int setFieldChanged(Fields field, int oldFlags)
+    {
+        return oldFlags | (0x10000 << field.ordinal());
+    }
+
+    @VisibleForTesting
+    public static boolean getFieldChanged(Fields field, int oldFlags)
+    {
+        return (oldFlags & (0x10000 << field.ordinal())) != 0;
+    }
+
+    public static int toIterableSetFields(int flags)
+    {
+        return flags >>> 16;
+    }
+
+    /**
+     * Returns the field whose ordinal equals the position of the lowest set bit of {@code iterable},
+     * or null when no bit is set.
+     */
+    public static Fields nextSetField(int iterable)
+    {
+        if (iterable == 0)
+            return null;
+        return FIELDS[Integer.numberOfTrailingZeros(iterable)];
+    }
+
+    public static int unsetIterableFields(Fields field, int iterable)
+    {
+        return iterable & ~(1 << field.ordinal());
+    }
+
+    @VisibleForTesting
+    public static boolean getFieldIsNull(Fields field, int oldFlags)
+    {
+        return (oldFlags & (1 << field.ordinal())) != 0;
+    }
+
+    public static int unsetFieldIsNull(Fields field, int oldFlags)
+    {
+        return oldFlags & ~(1 << field.ordinal());
+    }
+
+    public static int setFieldIsNull(Fields field, int oldFlags)
+    {
+        return oldFlags | (1 << field.ordinal());
+    }
+
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/local/Command.java 
b/accord-core/src/main/java/accord/local/Command.java
index 14274670..62a85cf9 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -22,19 +22,17 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.api.VisibleForImplementation;
-import accord.primitives.SaveStatus;
-import accord.primitives.Status;
-import accord.primitives.Status.Durability;
-import accord.primitives.Known;
 import accord.local.cfk.CommandsForKey;
 import accord.primitives.Ballot;
 import accord.primitives.Deps;
 import accord.primitives.KeyDeps;
+import accord.primitives.Known;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Participants;
@@ -43,6 +41,9 @@ import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.Route;
 import accord.primitives.RoutingKeys;
+import accord.primitives.SaveStatus;
+import accord.primitives.Status;
+import accord.primitives.Status.Durability;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -54,12 +55,13 @@ import accord.utils.IndexedTriConsumer;
 import accord.utils.Invariants;
 import accord.utils.SimpleBitSet;
 
-import javax.annotation.Nonnull;
-
 import com.google.common.annotations.VisibleForTesting;
 
 import static accord.local.Command.AbstractCommand.validate;
+import static accord.primitives.Known.KnownExecuteAt.ExecuteAtKnown;
 import static accord.primitives.Routable.Domain.Key;
+import static accord.primitives.Routable.Domain.Range;
+import static accord.primitives.Routables.Slice;
 import static accord.primitives.SaveStatus.AcceptedInvalidate;
 import static accord.primitives.SaveStatus.ErasedOrVestigial;
 import static accord.primitives.SaveStatus.Uninitialised;
@@ -68,10 +70,7 @@ import static accord.primitives.Status.Durability.NotDurable;
 import static accord.primitives.Status.Durability.ShardUniversal;
 import static accord.primitives.Status.Durability.UniversalOrInvalidated;
 import static accord.primitives.Status.Invalidated;
-import static accord.primitives.Known.KnownExecuteAt.ExecuteAtKnown;
 import static accord.primitives.Status.Stable;
-import static accord.primitives.Routable.Domain.Range;
-import static accord.primitives.Routables.Slice.Minimal;
 import static accord.utils.Invariants.Paranoia.LINEAR;
 import static accord.utils.Invariants.Paranoia.NONE;
 import static accord.utils.Invariants.ParanoiaCostFactor.LOW;
@@ -284,7 +283,7 @@ public abstract class Command implements CommonAttributes
                     case DefinitionErased:
                     case DefinitionUnknown:
                     case NoOp:
-                        Invariants.checkState(partialTxn == null, "partialTxn 
is defined");
+                        Invariants.checkState(partialTxn == null, "partialTxn 
is defined %s", validate);
                         break;
                     case DefinitionKnown:
                         Invariants.checkState(partialTxn != null || 
validate.participants().owns().isEmpty(), "partialTxn is null");
@@ -1263,6 +1262,41 @@ public abstract class Command implements CommonAttributes
         }
     }
 
+    public static class Minimal // holder for the six final fields below; CommandChange.Builder.asMinimal() creates instances from a builder's state
+    {
+        public final TxnId txnId;
+        public final SaveStatus saveStatus;
+        public final StoreParticipants participants;
+        public final Status.Durability durability;
+        public final Timestamp executeAt;
+        public final Writes writes;
+
+        public Minimal(TxnId txnId, SaveStatus saveStatus, StoreParticipants 
participants, Status.Durability durability, Timestamp executeAt, Writes writes)
+        {
+            this.txnId = txnId;
+            this.saveStatus = saveStatus;
+            this.participants = participants;
+            this.durability = durability;
+            this.executeAt = executeAt;
+            this.writes = writes;
+        }
+
+        @Override
+        public boolean equals(Object object) // field-by-field equality; the enum-typed fields are compared by reference
+        {
+            if (this == object) return true;
+            if (object == null || getClass() != object.getClass()) return 
false;
+            Minimal minimal = (Minimal) object;
+            return Objects.equals(txnId, minimal.txnId) && saveStatus == 
minimal.saveStatus && Objects.equals(participants, minimal.participants) && 
durability == minimal.durability && Objects.equals(executeAt, 
minimal.executeAt) && Objects.equals(writes, minimal.writes);
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            throw new UnsupportedOperationException(); // NOTE(review): equals is overridden but hashing always throws, so instances cannot go into hash-based collections; presumably deliberate, confirm
+        }
+    }
+
     public static class WaitingOn
     {
         private static final WaitingOn EMPTY_FOR_KEY = new 
WaitingOn(RoutingKeys.EMPTY, RangeDeps.NONE, KeyDeps.NONE, 
ImmutableBitSet.EMPTY, null);
@@ -1495,7 +1529,7 @@ public abstract class Command implements CommonAttributes
                         ranges = 
safeStore.redundantBefore().removePreBootstrap(txnId, ranges);
                         if (!ranges.isEmpty())
                         {
-                            
deps.rangeDeps.forEach(participants.stillExecutes().slice(ranges, Minimal), 
update, (upd, idx) -> {
+                            
deps.rangeDeps.forEach(participants.stillExecutes().slice(ranges, 
Slice.Minimal), update, (upd, idx) -> {
                                 TxnId id = upd.txnId(idx);
                                 // because we use RX as RedundantBefore 
bounds, we must not let an RX on a closing range
                                 // get ahead of one that isn't closed but has 
overlapping transactions (else we may erroneously treat as redundant)
@@ -1528,7 +1562,7 @@ public abstract class Command implements CommonAttributes
             @VisibleForTesting
             public static Update unsafeInitialise(TxnId txnId, Ranges 
executeRanges, Route<?> route, Deps deps)
             {
-                Unseekables<?> executionParticipants = 
route.slice(executeRanges, Minimal);
+                Unseekables<?> executionParticipants = 
route.slice(executeRanges, Slice.Minimal);
                 Update update = new Update(txnId, deps.keyDeps.keys(), 
deps.rangeDeps, deps.directKeyDeps);
                 // TODO (expected): refactor this to operate only on 
participants, not ranges
                 deps.rangeDeps.forEach(executionParticipants, update, 
Update::initialise);
diff --git a/accord-core/src/test/java/accord/burn/BurnTestBase.java 
b/accord-core/src/test/java/accord/burn/BurnTestBase.java
index 5ffa596e..2b04755b 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestBase.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestBase.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
@@ -50,6 +51,7 @@ import java.util.zip.CRC32;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Agent;
 import accord.api.Journal;
 import accord.api.Key;
 import accord.burn.random.FrequentLargeRange;
@@ -301,7 +303,7 @@ public class BurnTestBase
         f2.get();
     }
 
-    public static void burn(RandomSource random, TopologyFactory 
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int 
prefixCount, int operations, int concurrency, PendingQueue pendingQueue, 
Function<Id, Journal> journalFactory)
+    public static void burn(RandomSource random, TopologyFactory 
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int 
prefixCount, int operations, int concurrency, PendingQueue pendingQueue, 
BiFunction<Id, Agent, Journal> journalFactory)
     {
         List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
         AtomicLong progress = new AtomicLong();
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 63c75e74..bc5c4247 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -52,6 +52,7 @@ import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Agent;
 import accord.api.BarrierType;
 import accord.api.Journal;
 import accord.api.LocalConfig;
@@ -607,7 +608,7 @@ public class Cluster
                                               Supplier<RandomSource> 
randomSupplier,
                                               Supplier<TimeService> 
timeServiceSupplier,
                                               TopologyFactory topologyFactory, 
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal,
-                                              Consumer<Map<Id, Node>> 
readySignal, Function<Node.Id, Journal> journalFactory)
+                                              Consumer<Map<Id, Node>> 
readySignal, BiFunction<Node.Id, Agent, Journal> journalFactory)
     {
         Topology topology = topologyFactory.toTopology(nodes);
         Map<Id, Node> nodeMap = new LinkedHashMap<>();
@@ -666,7 +667,7 @@ public class Cluster
                 BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges) 
-> configRandomizer.onStale(id, sinceAtLeast, ranges);
                 AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id, 
onStale);
                 executorMap.put(id, nodeExecutor);
-                Journal journal = journalFactory.apply(id);
+                Journal journal = journalFactory.apply(id, 
nodeExecutor.agent());
                 journalMap.put(id, journal);
                 BurnTestConfigurationService configService = new 
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, 
nodeMap::get, topologyUpdates);
                 DelayedCommandStores.CacheLoading cacheLoading = new 
RandomLoader(random).newLoader(journal);
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 074e5246..59f16268 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -20,31 +20,30 @@ package accord.impl.basic;
 
 import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Objects;
 import java.util.TreeMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSortedMap;
 
+import accord.api.Agent;
 import accord.api.Journal;
 import accord.api.Result;
+import accord.impl.CommandChange;
+import accord.impl.ErasedSafeCommand;
 import accord.impl.InMemoryCommandStore;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores;
 import accord.local.Commands;
-import accord.local.CommonAttributes;
 import accord.local.DurableBefore;
 import accord.local.Node;
 import accord.local.RedundantBefore;
 import accord.local.StoreParticipants;
 import accord.primitives.Ballot;
-import accord.primitives.Known;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Ranges;
@@ -54,10 +53,36 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
+import accord.utils.PersistentField;
 import org.agrona.collections.Int2ObjectHashMap;
 
-import static accord.local.StoreParticipants.Filter.LOAD;
-import static accord.primitives.SaveStatus.NotDefined;
+import static accord.api.Journal.Load.ALL;
+import static accord.impl.CommandChange.*;
+import static accord.impl.CommandChange.Fields.ACCEPTED;
+import static accord.impl.CommandChange.Fields.DURABILITY;
+import static accord.impl.CommandChange.Fields.EXECUTES_AT_LEAST;
+import static accord.impl.CommandChange.Fields.EXECUTE_AT;
+import static accord.impl.CommandChange.Fields.PARTIAL_DEPS;
+import static accord.impl.CommandChange.Fields.PARTIAL_TXN;
+import static accord.impl.CommandChange.Fields.PARTICIPANTS;
+import static accord.impl.CommandChange.Fields.PROMISED;
+import static accord.impl.CommandChange.Fields.RESULT;
+import static accord.impl.CommandChange.Fields.SAVE_STATUS;
+import static accord.impl.CommandChange.Fields.WAITING_ON;
+import static accord.impl.CommandChange.Fields.WRITES;
+import static accord.impl.CommandChange.anyFieldChanged;
+import static accord.impl.CommandChange.getFieldChanged;
+import static accord.impl.CommandChange.getFieldIsNull;
+import static accord.impl.CommandChange.getFlags;
+import static accord.impl.CommandChange.getWaitingOn;
+import static accord.impl.CommandChange.nextSetField;
+import static accord.impl.CommandChange.setFieldChanged;
+import static accord.impl.CommandChange.setFieldIsNull;
+import static accord.impl.CommandChange.toIterableSetFields;
+import static accord.impl.CommandChange.unsetFieldIsNull;
+import static accord.impl.CommandChange.unsetIterableFields;
+import static accord.impl.CommandChange.validateFlags;
+import static accord.primitives.SaveStatus.ErasedOrVestigial;
 import static accord.primitives.SaveStatus.Stable;
 import static accord.primitives.Status.Invalidated;
 import static accord.primitives.Status.Truncated;
@@ -69,16 +94,15 @@ public class InMemoryJournal implements Journal
     private final Int2ObjectHashMap<FieldUpdates> fieldStates = new 
Int2ObjectHashMap<>();
 
     private final Node.Id id;
+    private final Agent agent;
 
-    public InMemoryJournal(Node.Id id)
+    public InMemoryJournal(Node.Id id, Agent agent)
     {
         this.id = id;
+        this.agent = agent;
     }
-
     @Override
-    public Command loadCommand(int commandStoreId, TxnId txnId,
-                               // TODO: currently unused!
-                               RedundantBefore redundantBefore, DurableBefore 
durableBefore)
+    public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
         NavigableMap<TxnId, List<Diff>> commandStore = 
this.diffsPerCommandStore.get(commandStoreId);
 
@@ -89,17 +113,69 @@ public class InMemoryJournal implements Journal
         if (saved == null)
             return null;
 
-        return reconstruct(redundantBefore, saved);
+        Builder builder = reconstruct(saved, ALL);
+        Cleanup cleanup = builder.shouldCleanup(agent, redundantBefore, 
durableBefore);
+        switch (cleanup)
+        {
+            case EXPUNGE_PARTIAL:
+            case EXPUNGE:
+            case ERASE:
+                return ErasedSafeCommand.erased(txnId, ErasedOrVestigial);
+        }
+
+        return builder.construct(redundantBefore);
+    }
+
+    @Override
+    public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load 
load, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        Builder builder = reconstruct(commandStoreId, txnId, load);
+        if (builder == null || builder.isEmpty())
+            return null;
+
+        Cleanup cleanup = builder.shouldCleanup(agent, redundantBefore, 
durableBefore);
+        switch (cleanup)
+        {
+            case EXPUNGE_PARTIAL:
+            case EXPUNGE:
+            case ERASE:
+                return null;
+        }
+
+        Invariants.checkState(builder.saveStatus() != null, "No saveSatus 
loaded, but next was called and cleanup was not: %s", builder);
+        return builder.asMinimal();
+    }
+
+    private Builder reconstruct(int commandStoreId, TxnId txnId, Load load)
+    {
+        NavigableMap<TxnId, List<Diff>> commandStore = 
this.diffsPerCommandStore.get(commandStoreId);
+
+        if (commandStore == null)
+            return null;
+
+        return 
reconstruct(this.diffsPerCommandStore.get(commandStoreId).get(txnId), load);
+    }
+
+    private Builder reconstruct(List<Diff> saved, Load load)
+    {
+        if (saved == null)
+            return null;
+
+        Builder builder = null;
+        for (Diff diff : saved)
+        {
+            if (builder == null)
+                builder = new Builder(diff.txnId, load);
+            builder.apply(diff);
+        }
+        return builder;
     }
 
     @Override
     public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
     {
         Diff diff;
-        if (update == null
-            || update.before == update.after
-            || update.after.saveStatus() == SaveStatus.Uninitialised
-            || (diff = diff(update.before, update.after)) == null)
+        if ((diff = toDiff(update)) == null)
         {
             if (onFlush!= null)
                 onFlush.run();
@@ -150,9 +226,15 @@ public class InMemoryJournal implements Journal
         return fieldStates.newRangesForEpoch;
     }
 
-    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush)
+    @Override
+    public PersistentField.Persister<DurableBefore, DurableBefore> 
durableBeforePersister()
     {
+        throw new UnsupportedOperationException("Not implemented");
+    }
 
+    @Override
+    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush)
+    {
         FieldUpdates fieldStates = this.fieldStates.computeIfAbsent(store, s 
-> {
             FieldUpdates init = new FieldUpdates();
             init.newRedundantBefore = RedundantBefore.EMPTY;
@@ -160,6 +242,7 @@ public class InMemoryJournal implements Journal
             init.newSafeToRead = ImmutableSortedMap.of(Timestamp.NONE, 
Ranges.EMPTY);
             return init;
         });
+
         if (fieldUpdates.newRedundantBefore != null)
             fieldStates.newRedundantBefore = fieldUpdates.newRedundantBefore;
         if (fieldUpdates.newSafeToRead != null)
@@ -180,18 +263,20 @@ public class InMemoryJournal implements Journal
         {
             int commandStoreId = e.getKey();
             Map<TxnId, List<Diff>> localJournal = e.getValue();
-            CommandStore commandStore = commandStores.forId(commandStoreId);
-            if (commandStore == null)
+            CommandStore store = commandStores.forId(commandStoreId);
+            if (store == null)
                 continue;
 
             for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet())
             {
                 List<Diff> diffs = e2.getValue();
                 if (diffs.isEmpty()) continue;
-                Command command =  
reconstruct(commandStore.unsafeGetRedundantBefore(), diffs);
-                if (command.status() == Truncated || command.status() == 
Invalidated)
+                InMemoryJournal.Builder builder = reconstruct(diffs, ALL);
+                if (builder.saveStatus().status == Truncated || 
builder.saveStatus().status == Invalidated)
                     continue; // Already truncated
-                Cleanup cleanup = Cleanup.shouldCleanup(commandStore.agent(), 
command, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
+
+                Command command = 
builder.construct(store.unsafeGetRedundantBefore());
+                Cleanup cleanup = Cleanup.shouldCleanup(store.agent(), 
command, store.unsafeGetRedundantBefore(), store.durableBefore());
                 switch (cleanup)
                 {
                     case NO:
@@ -200,9 +285,9 @@ public class InMemoryJournal implements Journal
                     case TRUNCATE_WITH_OUTCOME:
                     case TRUNCATE:
                     case ERASE:
-                        command = Commands.purgeUnsafe(commandStore, command, 
cleanup);
+                        command = Commands.purgeUnsafe(store, command, 
cleanup);
                         Invariants.checkState(command.saveStatus() != 
SaveStatus.Uninitialised);
-                        Diff diff = diff(null, command);
+                        Diff diff = toDiff(new CommandUpdate(null, command));
                         e2.setValue(cleanup == Cleanup.ERASE ? new 
ErasedList(diff) : new TruncatedList(diff));
                         break;
 
@@ -225,6 +310,7 @@ public class InMemoryJournal implements Journal
         for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry : 
diffsPerCommandStore.entrySet())
         {
             int commandStoreId = diffEntry.getKey();
+
             // copy to avoid concurrent modification when appending to journal
             Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue());
 
@@ -237,7 +323,7 @@ public class InMemoryJournal implements Journal
             for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
             {
                 if (e.getValue().isEmpty()) continue;
-                Command command = 
reconstruct(commandStore.unsafeGetRedundantBefore(), e.getValue());
+                Command command = reconstruct(e.getValue(), 
ALL).construct(commandStore.unsafeGetRedundantBefore());
                 Invariants.checkState(command.saveStatus() != 
SaveStatus.Uninitialised,
                                       "Found uninitialized command in the log: 
%s %s", diffEntry.getKey(), e.getValue());
                 loader.load(command, sync);
@@ -247,13 +333,13 @@ public class InMemoryJournal implements Journal
         }
     }
 
-    static class ErasedList extends AbstractList<Diff>
+    private static class ErasedList extends AbstractList<Diff>
     {
         final Diff erased;
 
         ErasedList(Diff erased)
         {
-            Invariants.checkArgument(erased.saveStatus.value == 
SaveStatus.Erased);
+            Invariants.checkArgument(erased.changes.get(SAVE_STATUS) == 
SaveStatus.Erased);
             this.erased = erased;
         }
 
@@ -274,7 +360,7 @@ public class InMemoryJournal implements Journal
         @Override
         public boolean add(Diff diff)
         {
-            if (diff.saveStatus != null && diff.saveStatus.value == 
SaveStatus.Erased)
+            if (diff.changes.get(SAVE_STATUS) == SaveStatus.Erased)
                 return false;
             throw illegalState();
         }
@@ -288,7 +374,7 @@ public class InMemoryJournal implements Journal
         }
     }
 
-    static class PurgedList extends AbstractList<Diff>
+    private static class PurgedList extends AbstractList<Diff>
     {
         final List<Diff> purged;
         PurgedList(List<Diff> purged)
@@ -311,370 +397,223 @@ public class InMemoryJournal implements Journal
         @Override
         public boolean add(Diff diff)
         {
-            if (diff.saveStatus != null && diff.saveStatus.value == 
SaveStatus.Erased)
+            Object saveStatus = diff.changes.get(SAVE_STATUS);
+            if (saveStatus == SaveStatus.Erased)
                 return false;
             throw illegalState();
         }
     }
 
-    private Command reconstruct(RedundantBefore redundantBefore, List<Diff> 
diffs)
+    private static Diff toDiff(CommandUpdate update)
     {
-        Invariants.checkState(diffs != null && !diffs.isEmpty());
-
-        TxnId txnId = null;
-        Timestamp executeAt = null;
-        Timestamp executesAtLeast = null;
-        SaveStatus saveStatus = NotDefined;
-        Status.Durability durability = Status.Durability.NotDurable;
-
-        Ballot acceptedOrCommitted = Ballot.ZERO;
-        Ballot promised = Ballot.ZERO;
-
-        StoreParticipants participants = null;
-        PartialTxn partialTxn = null;
-        PartialDeps partialDeps = null;
-
-        Command.WaitingOn waitingOn = null;
-        Writes writes = null;
-        Result result = null;
-
-        for (int i = 0; i < diffs.size(); i++)
-        {
-            Diff diff = diffs.get(i);
-            if (diff.txnId != null)
-                txnId = diff.txnId;
-            if (diff.executeAt != null)
-                executeAt = diff.executeAt.get();
-            if (diff.executesAtLeast != null)
-                executesAtLeast = diff.executesAtLeast.get();
-            if (diff.saveStatus != null)
-                saveStatus = diff.saveStatus.get();
-            if (diff.durability != null)
-                durability = diff.durability.get();
-
-            if (diff.acceptedOrCommitted != null)
-                acceptedOrCommitted = diff.acceptedOrCommitted.get();
-            if (diff.promised != null)
-                promised = diff.promised.get();
-
-            if (diff.participants != null)
-                participants = diff.participants.get();
-            if (diff.partialTxn != null)
-                partialTxn = diff.partialTxn.get();
-            if (diff.partialDeps != null)
-                partialDeps = diff.partialDeps.get();
-
-            if (diff.waitingOn != null)
-                waitingOn = diff.waitingOn.get();
-            if (diff.writes != null)
-                writes = diff.writes.get();
-            if (diff.result != null)
-                result = diff.result.get();
-
-            try
-            {
-                if (!txnId.kind().awaitsOnlyDeps())
-                    executesAtLeast = null;
-            }
-            catch (Throwable t)
-            {
-                t.printStackTrace();
-            }
-
-            switch (saveStatus.known.outcome)
-            {
-                case Erased:
-                case WasApply:
-                    writes = null;
-                    result = null;
-                    break;
-            }
-        }
-
-        CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
-        if (partialTxn != null)
-            attrs.partialTxn(partialTxn);
-        if (durability != null)
-            attrs.durability(durability);
-        if (participants != null) 
attrs.setParticipants(participants.filter(LOAD, redundantBefore, txnId, 
saveStatus.known.executeAt.isDecidedAndKnownToExecute() ? executeAt : null));
-        else attrs.setParticipants(StoreParticipants.empty(txnId));
-
-        // TODO (desired): we can simplify this logic if, instead of diffing, 
we will infer the diff from the status
-        if (partialDeps != null &&
-            (saveStatus.known.deps != Known.KnownDeps.NoDeps &&
-             saveStatus.known.deps != Known.KnownDeps.DepsErased &&
-             saveStatus.known.deps != Known.KnownDeps.DepsUnknown))
-            attrs.partialDeps(partialDeps);
-
-        try
-        {
-
-            Command current;
-            switch (saveStatus.status)
-            {
-                case NotDefined:
-                    current = saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
-                                                                     : 
Command.NotDefined.notDefined(attrs, promised);
-                    break;
-                case PreAccepted:
-                    current = Command.PreAccepted.preAccepted(attrs, 
executeAt, promised);
-                    break;
-                case AcceptedInvalidate:
-                case Accepted:
-                case PreCommitted:
-                    if (saveStatus == SaveStatus.AcceptedInvalidate)
-                        current = 
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, 
acceptedOrCommitted);
-                    else
-                        current = Command.Accepted.accepted(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted);
-                    break;
-                case Committed:
-                case Stable:
-                    current = Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
-                    break;
-                case PreApplied:
-                case Applied:
-                    current = Command.Executed.executed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
-                    break;
-                case Invalidated:
-                case Truncated:
-                    current = truncated(attrs, saveStatus, executeAt, 
executesAtLeast, writes, result);
-                    break;
-                default:
-                    throw new IllegalStateException("Do not know " + 
saveStatus.status + " " + saveStatus);
-            }
+        if (update == null
+            || update.before == update.after
+            || update.after == null
+            || update.after.saveStatus() == SaveStatus.Uninitialised)
+            return null;
 
-            return current;
-        }
-        catch (Throwable t)
-        {
-            throw new RuntimeException("Can not reconstruct from diff:\n" + 
diffs.stream().map(o -> o.toString())
-                                                                               
  .collect(Collectors.joining("\n")),
-                                       t);
-        }
-    }
+        int flags = validateFlags(getFlags(update.before, update.after));
+        if (!anyFieldChanged(flags))
+            return null;
 
-    private static Command.Truncated truncated(CommonAttributes.Mutable attrs, 
SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes 
writes, Result result)
-    {
-        switch (status)
-        {
-            default:
-                throw illegalState("Unhandled SaveStatus: " + status);
-            case TruncatedApplyWithOutcome:
-            case TruncatedApplyWithDeps:
-            case TruncatedApply:
-                return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, result, executesAtLeast);
-            case ErasedOrVestigial:
-                return Command.Truncated.erasedOrVestigial(attrs.txnId(), 
attrs.participants());
-            case Erased:
-                // TODO (expected): why are we saving Durability here for 
erased commands?
-                return Command.Truncated.erased(attrs.txnId(), 
attrs.durability(), attrs.participants());
-            case Invalidated:
-                return Command.Truncated.invalidated(attrs.txnId());
-        }
+        return new Diff(flags, update);
     }
 
     private static class Diff
     {
         public final TxnId txnId;
+        public final Map<Fields, Object> changes;
+        public final int flags;
 
-        public final NewValue<Timestamp> executeAt;
-        public final NewValue<Timestamp> executesAtLeast;
-        public final NewValue<SaveStatus> saveStatus;
-        public final NewValue<Status.Durability> durability;
-
-        public final NewValue<Ballot> acceptedOrCommitted;
-        public final NewValue<Ballot> promised;
-
-        public final NewValue<StoreParticipants> participants;
-        public final NewValue<PartialTxn> partialTxn;
-        public final NewValue<PartialDeps> partialDeps;
-
-        public final NewValue<Writes> writes;
-        public final NewValue<Command.WaitingOn> waitingOn;
-
-        public final NewValue<Result> result; // temporarily here for sakes 
for reloads
-
-        public Diff(TxnId txnId,
-                    NewValue<Timestamp> executeAt,
-                    NewValue<Timestamp> executesAtLeast,
-                    NewValue<SaveStatus> saveStatus,
-                    NewValue<Status.Durability> durability,
-
-                    NewValue<Ballot> acceptedOrCommitted,
-                    NewValue<Ballot> promised,
-
-                    NewValue<StoreParticipants> participants,
-                    NewValue<PartialTxn> partialTxn,
-                    NewValue<PartialDeps> partialDeps,
-                    NewValue<Command.WaitingOn> waitingOn,
-
-                    NewValue<Writes> writes,
-                    NewValue<Result> result)
-        {
-            this.txnId = txnId;
-            this.executeAt = executeAt;
-            this.executesAtLeast = executesAtLeast;
-            this.saveStatus = saveStatus;
-            this.durability = durability;
-
-            this.acceptedOrCommitted = acceptedOrCommitted;
-            this.promised = promised;
-
-            this.participants = participants;
-            this.partialTxn = partialTxn;
-            this.partialDeps = partialDeps;
-
-            this.writes = writes;
-            this.waitingOn = waitingOn;
-            this.result = result;
-        }
-
-        public boolean allNulls()
+        private Diff(int flags, CommandUpdate update)
         {
-            if (txnId != null) return false;
-            if (executeAt != null) return false;
-            if (executesAtLeast != null) return false;
-            if (saveStatus != null) return false;
-            if (durability != null) return false;
-            if (acceptedOrCommitted != null) return false;
-            if (promised != null) return false;
-            if (participants != null) return false;
-            if (partialTxn != null) return false;
-            if (partialDeps != null) return false;
-            if (writes != null) return false;
-            if (waitingOn != null) return false;
-            if (result != null) return false;
-            return true;
-        }
-
-        @Override
-        public String toString()
-        {
-            StringBuilder builder = new StringBuilder("Diff{");
-            if (txnId != null)
-                builder.append("txnId = ").append(txnId).append(" ");
-            if (executeAt != null)
-                builder.append("executeAt = ").append(executeAt).append(" ");
-            if (executesAtLeast != null)
-                builder.append("executesAtLeast = 
").append(executesAtLeast).append(" ");
-            if (saveStatus != null)
-                builder.append("saveStatus = ").append(saveStatus).append(" ");
-            if (durability != null)
-                builder.append("durability = ").append(durability).append(" ");
-            if (acceptedOrCommitted != null)
-                builder.append("acceptedOrCommitted = 
").append(acceptedOrCommitted).append(" ");
-            if (promised != null)
-                builder.append("promised = ").append(promised).append(" ");
-            if (participants != null)
-                builder.append("participants = 
").append(participants).append(" ");
-            if (partialTxn != null)
-                builder.append("partialTxn = ").append(partialTxn).append(" ");
-            if (partialDeps != null)
-                builder.append("partialDeps = ").append(partialDeps).append(" 
");
-            if (writes != null)
-                builder.append("writes = ").append(writes).append(" ");
-            if (waitingOn != null)
-                builder.append("waitingOn = ").append(waitingOn).append(" ");
-            if (result != null)
-                builder.append("result = ").append(result).append(" ");
-            builder.append("}");
-            return builder.toString();
-        }
-    }
-
-    static Diff diff(Command before, Command after)
-    {
-        if (Objects.equals(before, after))
-            return null;
-
-        Diff diff = new Diff(after.txnId(),
-                             ifNotEqual(before, after, Command::executeAt, 
true),
-                             ifNotEqual(before, after, 
Command::executesAtLeast, true),
-                             ifNotEqual(before, after, Command::saveStatus, 
false),
-
-                             ifNotEqual(before, after, Command::durability, 
false),
-                             ifNotEqual(before, after, 
Command::acceptedOrCommitted, false),
-                             ifNotEqual(before, after, Command::promised, 
false),
-
-                             ifNotEqual(before, after, Command::participants, 
true),
-                             ifNotEqual(before, after, Command::partialTxn, 
false),
-                             ifNotEqual(before, after, Command::partialDeps, 
false),
-                             ifNotEqual(before, after, 
InMemoryJournal::getWaitingOn, true),
-                             ifNotEqual(before, after, Command::writes, false),
-                             ifNotEqual(before, after, Command::result, 
false));
-        if (diff.allNulls())
-            return null;
-
-        return diff;
-    }
-
-    static Command.WaitingOn getWaitingOn(Command command)
-    {
-        if (command instanceof Command.Committed)
-            return command.asCommitted().waitingOn();
-
-        return null;
-    }
-
-    private static <OBJ, VAL> NewValue<VAL> ifNotEqual(OBJ lo, OBJ ro, 
Function<OBJ, VAL> convert, boolean allowClassMismatch)
-    {
-        VAL l = null;
-        VAL r = null;
-        if (lo != null) l = convert.apply(lo);
-        if (ro != null) r = convert.apply(ro);
-
-        if (l == r)
-            return null; // null here means there was no change
-
-        if (l == null || r == null)
-            return NewValue.of(r);
+            this.flags = flags;
+            this.txnId = update.txnId;
+            this.changes = new EnumMap<>(Fields.class);
 
-        assert allowClassMismatch || l.getClass() == r.getClass() : 
String.format("%s != %s", l.getClass(), r.getClass());
+            Command after = update.after;
+            int iterable = toIterableSetFields(flags);
+            while (iterable != 0)
+            {
+                Fields field = nextSetField(iterable);
+                if (!getFieldChanged(field, flags) || getFieldIsNull(field, 
flags))
+                {
+                    iterable = unsetIterableFields(field, iterable);
+                    continue;
+                }
 
-        if (l.equals(r))
-            return null;
+                switch (field)
+                {
+                    case EXECUTE_AT:
+                        changes.put(EXECUTE_AT, after.executeAt());
+                        break;
+                    case EXECUTES_AT_LEAST:
+                        changes.put(EXECUTES_AT_LEAST, 
after.executesAtLeast());
+                        break;
+                    case SAVE_STATUS:
+                        changes.put(SAVE_STATUS, after.saveStatus());
+                        break;
+                    case DURABILITY:
+                        changes.put(DURABILITY, after.durability());
+                        break;
+                    case ACCEPTED:
+                        changes.put(ACCEPTED, after.acceptedOrCommitted());
+                        break;
+                    case PROMISED:
+                        changes.put(PROMISED, after.promised());
+                        break;
+                    case PARTICIPANTS:
+                        changes.put(PARTICIPANTS, after.participants());
+                        break;
+                    case PARTIAL_TXN:
+                        changes.put(PARTIAL_TXN, after.partialTxn());
+                        break;
+                    case PARTIAL_DEPS:
+                        changes.put(PARTIAL_DEPS, after.partialDeps());
+                        break;
+                    case WAITING_ON:
+                        Command.WaitingOn waitingOn = getWaitingOn(after);
+                        changes.put(WAITING_ON, (WaitingOnProvider) (txnId, 
deps) -> waitingOn);
+                        break;
+                    case WRITES:
+                        changes.put(WRITES, after.writes());
+                        break;
+                    case RESULT:
+                        changes.put(RESULT, after.result());
+                        break;
+                    case CLEANUP:
+                }
 
-        // TODO (expected): do we need to wrap this in a value object if we 
also have flags?
-        return NewValue.of(r);
+                iterable = unsetIterableFields(field, iterable);
+            }
+        }
     }
 
-    private static class NewValue<T>
+    private static class Builder extends CommandChange.Builder
     {
-        final T value;
-
-        private NewValue(T value)
+        private Builder(TxnId txnId, Load load)
         {
-            this.value = value;
+            super(txnId, load);
         }
 
-        public T get()
+        private void apply(Diff diff)
         {
-            return value;
-        }
+            Invariants.checkState(diff.txnId != null);
+            Invariants.checkState(diff.flags != 0);
+            nextCalled = true;
+            count++;
 
-        public static <T> NewValue<T> of(T value)
-        {
-            return new NewValue<>(value);
-        }
+            int iterable = toIterableSetFields(diff.flags);
+            while (iterable != 0)
+            {
+                Fields field = nextSetField(iterable);
+                if (getFieldChanged(field, diff.flags))
+                {
+                    this.flags = setFieldChanged(field, this.flags);
+                    if (getFieldIsNull(field, diff.flags))
+                    {
+                        this.flags = setFieldIsNull(field, this.flags);
+                        setNull(field);
+                    }
+                    else
+                    {
+                        this.flags = unsetFieldIsNull(field, this.flags);
+                        deserialize(diff, field);
+                    }
+                }
 
-        public String toString()
-        {
-            return "" + value;
+                iterable = unsetIterableFields(field, iterable);
+            }
         }
 
-        @Override
-        public boolean equals(Object o)
+        private void setNull(Fields field)
         {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            NewValue<?> newValue = (NewValue<?>) o;
-            return Objects.equals(value, newValue.value);
+            switch (field)
+            {
+                case EXECUTE_AT:
+                    executeAt = null;
+                    break;
+                case EXECUTES_AT_LEAST:
+                    executeAtLeast = null;
+                    break;
+                case SAVE_STATUS:
+                    saveStatus = null;
+                    break;
+                case DURABILITY:
+                    durability = null;
+                    break;
+                case ACCEPTED:
+                    acceptedOrCommitted = null;
+                    break;
+                case PROMISED:
+                    promised = null;
+                    break;
+                case PARTICIPANTS:
+                    participants = null;
+                    break;
+                case PARTIAL_TXN:
+                    partialTxn = null;
+                    break;
+                case PARTIAL_DEPS:
+                    partialDeps = null;
+                    break;
+                case WAITING_ON:
+                    waitingOn = null;
+                    break;
+                case WRITES:
+                    writes = null;
+                    break;
+                case RESULT:
+                    result = null;
+                    break;
+                case CLEANUP:
+                    throw new IllegalStateException();
+            }
         }
 
-        @Override
-        public int hashCode()
+        private void deserialize(Diff diff, Fields field)
         {
-            throw new UnsupportedOperationException();
+            switch (field)
+            {
+                case EXECUTE_AT:
+                    executeAt = Invariants.nonNull((Timestamp) diff.changes.get(EXECUTE_AT));
+                    break;
+                case EXECUTES_AT_LEAST:
+                    executeAtLeast = Invariants.nonNull((Timestamp) diff.changes.get(EXECUTES_AT_LEAST));
+                    break;
+                case SAVE_STATUS:
+                    saveStatus = Invariants.nonNull((SaveStatus) diff.changes.get(SAVE_STATUS));
+                    break;
+                case DURABILITY:
+                    durability = Invariants.nonNull((Status.Durability) diff.changes.get(DURABILITY));
+                    break;
+                case ACCEPTED:
+                    acceptedOrCommitted = Invariants.nonNull((Ballot) diff.changes.get(ACCEPTED));
+                    break;
+                case PROMISED:
+                    promised = Invariants.nonNull((Ballot) diff.changes.get(PROMISED));
+                    break;
+                case PARTICIPANTS:
+                    participants = Invariants.nonNull((StoreParticipants) diff.changes.get(PARTICIPANTS));
+                    break;
+                case PARTIAL_TXN:
+                    partialTxn = Invariants.nonNull((PartialTxn) diff.changes.get(PARTIAL_TXN));
+                    break;
+                case PARTIAL_DEPS:
+                    partialDeps = Invariants.nonNull((PartialDeps) diff.changes.get(PARTIAL_DEPS));
+                    break;
+                case WAITING_ON:
+                    waitingOn = Invariants.nonNull((WaitingOnProvider) diff.changes.get(WAITING_ON));
+                    break;
+                case WRITES:
+                    writes = Invariants.nonNull((Writes) diff.changes.get(WRITES));
+                    break;
+                case RESULT:
+                    result = Invariants.nonNull((Result) diff.changes.get(RESULT));
+                    break;
+                case CLEANUP:
+                    throw new IllegalStateException();
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index d9cee7af..b40134a3 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -34,6 +34,7 @@ import accord.local.RedundantBefore;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.utils.PersistentField;
 
 /**
  * Logging journal, a wrapper over journal for debugging / inspecting history purposes.
@@ -70,48 +71,68 @@ public class LoggingJournal implements Journal
         }
     }
 
+    @Override
     public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
         return delegate.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore);
     }
 
+    @Override
+    public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load load, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        return delegate.loadMinimal(commandStoreId, txnId, load, redundantBefore, durableBefore);
+    }
+
+    @Override
     public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
     {
         log("%d: %s\n", store, update.after);
         delegate.saveCommand(store, update, onFlush);
     }
 
+    @Override
     public void purge(CommandStores commandStores)
     {
         log("PURGE\n");
         delegate.purge(commandStores);
     }
 
+    @Override
     public void replay(CommandStores commandStores)
     {
         delegate.replay(commandStores);
     }
 
+    @Override
     public RedundantBefore loadRedundantBefore(int commandStoreId)
     {
         return delegate.loadRedundantBefore(commandStoreId);
     }
 
+    @Override
     public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
     {
         return delegate.loadBootstrapBeganAt(commandStoreId);
     }
 
+    @Override
     public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
     {
         return delegate.loadSafeToRead(commandStoreId);
     }
 
+    @Override
     public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
     {
         return delegate.loadRangesForEpoch(commandStoreId);
     }
 
+    @Override
+    public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister()
+    {
+        return delegate.durableBeforePersister();
+    }
+
     public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush)
     {
         log("%d: %s", store, fieldUpdates);
diff --git a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
deleted file mode 100644
index 5776c51f..00000000
--- a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.impl.basic;
-
-import java.util.NavigableMap;
-
-import accord.api.Journal;
-import accord.local.Command;
-import accord.local.CommandStores;
-import accord.local.DurableBefore;
-import accord.local.RedundantBefore;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.utils.Invariants;
-
-/**
- * A simple version of journal that can be useful for debugging issues with an implementation that checks command loading
- * from SUT vs model.
- */
-public class VerifyingJournal implements Journal
-{
-    private final Journal sut;
-    private final Journal model;
-
-    public VerifyingJournal(Journal model, Journal sut)
-    {
-        this.model = model;
-        this.sut = sut;
-    }
-
-    public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore)
-    {
-        Command model = this.model.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore);
-        Command sut = this.sut.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore);
-        Invariants.checkState(sut.equals(model));
-        return sut;
-    }
-
-    public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
-    {
-        model.saveCommand(store, update, null);
-        sut.saveCommand(store, update, onFlush);
-    }
-
-    public void purge(CommandStores commandStores)
-    {
-        model.purge(commandStores);
-        sut.purge(commandStores);
-    }
-
-    public void replay(CommandStores commandStores)
-    {
-        sut.replay(commandStores);
-    }
-
-    public RedundantBefore loadRedundantBefore(int commandStoreId)
-    {
-        RedundantBefore model = this.model.loadRedundantBefore(commandStoreId);
-        RedundantBefore sut = this.sut.loadRedundantBefore(commandStoreId);
-        Invariants.checkState(sut.equals(model), "%s should equal %s", sut, model);
-        return sut;
-    }
-
-    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
-    {
-        NavigableMap<TxnId, Ranges> model = this.sut.loadBootstrapBeganAt(commandStoreId);
-        NavigableMap<TxnId, Ranges> sut = this.sut.loadBootstrapBeganAt(commandStoreId);
-        Invariants.checkState(sut.equals(model), "%s should equal %s", sut, model);
-        return sut;
-    }
-
-    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
-    {
-        NavigableMap<Timestamp, Ranges> model = this.model.loadSafeToRead(commandStoreId);
-        NavigableMap<Timestamp, Ranges> sut = this.sut.loadSafeToRead(commandStoreId);
-        Invariants.checkState(sut.equals(model), "%s should equal %s", sut, model);
-        return sut;
-    }
-
-    public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
-    {
-        CommandStores.RangesForEpoch model = this.sut.loadRangesForEpoch(commandStoreId);
-        CommandStores.RangesForEpoch sut = this.sut.loadRangesForEpoch(commandStoreId);
-        Invariants.checkState(sut.equals(model), "%s should equal %s", sut, model);
-        return sut;
-    }
-
-    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush)
-    {
-        model.saveStoreState(store, fieldUpdates, onFlush);
-        sut.saveStoreState(store, fieldUpdates, onFlush);
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to