This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch semi-integrated-burn-test-rebased
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit f0b29213ab32d39e3b030bf217ee8b19836cdfff
Author: Alex Petrov <[email protected]>
AuthorDate: Fri Nov 15 16:46:07 2024 +0100

    Move files
---
 .../main-test/java/accord/impl/basic/Cluster.java  |   2 +-
 .../java/accord/impl/basic/InMemoryJournal.java}   | 340 ++++++------
 .../java/accord/impl/basic/LoggingJournal.java     | 120 +++++
 accord-core/src/main/java/accord/api/Journal.java  | 102 ++++
 .../src/main/java/accord/impl/CommandChange.java   | 598 +++++++++++++++++++++
 .../java/accord/impl/DurabilityScheduling.java     |  11 +-
 .../java/accord/impl/InMemoryCommandStore.java     | 152 +++---
 .../src/main/java/accord/local/CommandStore.java   |   3 +
 .../java/accord/messages/ReadEphemeralTxnData.java |   3 +-
 .../src/test/java/accord/burn/BurnTest.java        |  25 +-
 .../coordinate/CoordinateTransactionTest.java      |  19 +-
 .../test/java/accord/impl/RemoteListenersTest.java |   7 +
 .../java/accord/local/cfk/CommandsForKeyTest.java  |   6 +
 13 files changed, 1155 insertions(+), 233 deletions(-)

diff --git a/accord-core/src/main-test/java/accord/impl/basic/Cluster.java b/accord-core/src/main-test/java/accord/impl/basic/Cluster.java
index cf0604ad..bd2fb921 100644
--- a/accord-core/src/main-test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/main-test/java/accord/impl/basic/Cluster.java
@@ -560,6 +560,7 @@ public class Cluster
                     NavigableMap<RoutableKey, Timestamped<int[]>> prevData = listStore.copyOfCurrentData();
                     listStore.clear();
                     listStore.restoreFromSnapshot();
+                    Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores);
                     for (CommandStore s : stores)
                     {
                         DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s;
@@ -567,7 +568,6 @@ public class Cluster
                     }
 
                     Journal journal = journalMap.get(id);
-                    Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores);
                     journal.replay(nodeMap.get(id).commandStores());
                     while (sinks.drain(pred));
                     CommandsForKey.enableLinearizabilityViolationsReporting();
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java b/accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java
similarity index 67%
rename from accord-core/src/test/java/accord/impl/basic/Journal.java
rename to accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java
index 27f9cb19..a74232d4 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java
@@ -20,76 +20,156 @@ package accord.impl.basic;
 
 import java.util.AbstractList;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Function;
-import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSortedMap;
+
+import accord.api.Journal;
 import accord.api.Result;
 import accord.impl.InMemoryCommandStore;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.Commands;
 import accord.local.CommonAttributes;
+import accord.local.DurableBefore;
 import accord.local.Node;
-import accord.primitives.Known;
-import accord.primitives.SaveStatus;
-import accord.primitives.Status;
+import accord.local.RedundantBefore;
 import accord.local.StoreParticipants;
 import accord.primitives.Ballot;
+import accord.primitives.Known;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SaveStatus;
+import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
-import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.Int2ObjectHashMap;
 
-import static accord.primitives.SaveStatus.Erased;
 import static accord.primitives.SaveStatus.NotDefined;
 import static accord.primitives.SaveStatus.Stable;
 import static accord.primitives.Status.Invalidated;
 import static accord.primitives.Status.Truncated;
 import static accord.utils.Invariants.illegalState;
 
-public class Journal
+// TODO: looks like we also have accord via dependency here somehow?
+public class InMemoryJournal implements Journal
 {
-    private final Long2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> 
diffsPerCommandStore = new Long2ObjectHashMap<>();
+    private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> 
diffsPerCommandStore = new Int2ObjectHashMap<>();
+    private final FieldStates fieldStates;
+
+    private static class FieldStates
+    {
+        RedundantBefore redundantBefore = RedundantBefore.EMPTY;
+        NavigableMap<TxnId, Ranges> bootstrapBeganAt = 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY);
+        NavigableMap<Timestamp, Ranges> safeToRead = 
ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY);
+        CommandStores.RangesForEpoch.Snapshot rangesForEpoch = null;
+    }
 
     private final Node.Id id;
 
-    public Journal(Node.Id id)
+    public InMemoryJournal(Node.Id id)
     {
         this.id = id;
+        this.fieldStates = new FieldStates();
     }
 
-    public void purge(IntFunction<CommandStore> storeSupplier)
+    @Override
+    public Command loadCommand(int commandStoreId, TxnId txnId,
+                               // TODO: currently unused!
+                               RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        for (Map.Entry<Long, NavigableMap<TxnId, List<Diff>>> e : 
diffsPerCommandStore.entrySet())
+        List<Diff> diffs = this.diffsPerCommandStore.get(commandStoreId).get(txnId);
+        return reconstruct(diffs);
+    }
+
+    @Override
+    public void saveCommand(int store, CommandUpdate diff, Runnable onFlush)
+    {
+        if (diff == null)
+            return;
+
+       if (diff.after.saveStatus() == SaveStatus.Erased)
         {
-            int commandStoreId = e.getKey().intValue();
+            diffsPerCommandStore.computeIfAbsent(store, (k) -> new TreeMap<>())
+                                .remove(diff.after.txnId());
+            return;
+        }
+
+        diffsPerCommandStore.computeIfAbsent(store, (k) -> new TreeMap<>())
+                            .computeIfAbsent(diff.txnId, (k_) -> new 
ArrayList<>())
+                            .add(diff(diff.before, diff.after));
+
+        if (onFlush!= null)
+            onFlush.run();
+    }
+
+    @Override
+    public RedundantBefore loadRedundantBefore(int commandStoreId)
+    {
+        return fieldStates.redundantBefore;
+    }
+
+    @Override
+    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
+    {
+        return fieldStates.bootstrapBeganAt;
+    }
+
+    @Override
+    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+    {
+        return fieldStates.safeToRead;
+    }
+
+    @Override
+    public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId)
+    {
+        return fieldStates.rangesForEpoch;
+    }
+
+    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush)
+    {
+        if (fieldUpdates.newRedundantBefore != null)
+            fieldStates.redundantBefore = fieldUpdates.newRedundantBefore;
+        if (fieldUpdates.newSafeToRead != null)
+            fieldStates.safeToRead = fieldUpdates.newSafeToRead;
+        if (fieldUpdates.newBootstrapBeganAt != null)
+            fieldStates.bootstrapBeganAt = fieldUpdates.newBootstrapBeganAt;
+        if (fieldUpdates.newRangesForEpoch != null)
+            fieldStates.rangesForEpoch = fieldUpdates.newRangesForEpoch;
+
+        if (onFlush!= null)
+            onFlush.run();
+    }
+
+    @Override
+    public void purge(CommandStores commandStores)
+    {
+        for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e : 
diffsPerCommandStore.entrySet())
+        {
+            int commandStoreId = e.getKey();
             Map<TxnId, List<Diff>> localJournal = e.getValue();
-            CommandStore store = storeSupplier.apply(commandStoreId);
+            CommandStore store = commandStores.forId(commandStoreId);
             if (store == null)
                 continue;
 
-            Map<TxnId, List<Diff>> updates = new HashMap<>();
-            List<TxnId> removals = new ArrayList<>();
             for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet())
             {
                 TxnId txnId = e2.getKey();
                 List<Diff> diffs = e2.getValue();
-                Command command = reconstruct(diffs, Reconstruct.Last).get(0);
+                Command command =  reconstruct(diffs);
                 if (command.status() == Truncated || command.status() == 
Invalidated)
                     continue; // Already truncated
                 Cleanup cleanup = Cleanup.shouldCleanup(store.agent(), 
command, store.unsafeGetRedundantBefore(), store.durableBefore());
@@ -117,30 +197,40 @@ public class Journal
         }
     }
 
-    public void reconstructAll(InMemoryCommandStore.Loader loader, int 
commandStoreId)
+    @Override
+    public void replay(CommandStores commandStores)
     {
-        // Nothing to do here, journal is empty for this command store
-        if (!diffsPerCommandStore.containsKey(commandStoreId))
-            return;
-
-        // copy to avoid concurrent modification when appending to journal
-        Map<TxnId, List<Diff>> diffs = new 
TreeMap<>(diffsPerCommandStore.get(commandStoreId));
-        for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
-            e.setValue(new ArrayList<>(e.getValue()));
+        OnDone sync = new OnDone() {
+            public void success() {}
+            public void failure(Throwable t) { throw new 
RuntimeException("Caught an exception during replay", t); }
+        };
 
-        List<Command> toApply = new ArrayList<>();
-        for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+        for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry : 
diffsPerCommandStore.entrySet())
         {
-            if (e.getValue().isEmpty()) continue;
-            Command command = reconstruct(commandStoreId, e.getKey(), 
e.getValue());
-            if (command.saveStatus().compareTo(Stable) >= 0 && 
!command.hasBeen(Truncated))
-                toApply.add(command);
-            loader.load(command);
-        }
+            int commandStoreId = diffEntry.getKey();
+            // copy to avoid concurrent modification when appending to journal
+            Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue());
+
+            InMemoryCommandStore commandStore = (InMemoryCommandStore) 
commandStores.forId(commandStoreId);
+            Loader loader = commandStore.loader();
+
+            for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+                e.setValue(new ArrayList<>(e.getValue()));
 
-        toApply.sort(Comparator.comparing(Command::executeAt));
-        for (Command command : toApply)
-            loader.apply(command);
+            List<Command> toApply = new ArrayList<>();
+            for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+            {
+                if (e.getValue().isEmpty()) continue;
+                Command command =  reconstruct(e.getValue());
+                if (command.saveStatus().compareTo(Stable) >= 0 && 
!command.hasBeen(Truncated))
+                    toApply.add(command);
+                loader.load(command, sync);
+            }
+
+            toApply.sort(Comparator.comparing(Command::executeAt));
+            for (Command command : toApply)
+                loader.apply(command, sync);
+        }
     }
 
     static class ErasedList extends AbstractList<Diff>
@@ -211,28 +301,10 @@ public class Journal
         }
     }
 
-    private enum Reconstruct
-    {
-        Each,
-        Last
-    }
-
-    public Command reconstruct(int commandStoreId, TxnId txnId)
-    {
-        return reconstruct(commandStoreId, txnId, 
diffsPerCommandStore.get(commandStoreId).get(txnId));
-    }
-
-    public Command reconstruct(int commandStoreId, TxnId txnId, List<Diff> 
diffs)
-    {
-        return reconstruct(diffs, Reconstruct.Last).get(0);
-    }
-
-    private List<Command> reconstruct(List<Diff> diffs, Reconstruct 
reconstruct)
+    private Command reconstruct(List<Diff> diffs)
     {
         Invariants.checkState(diffs != null && !diffs.isEmpty());
 
-        List<Command> results = new ArrayList<>();
-
         TxnId txnId = null;
         Timestamp executeAt = null;
         Timestamp executesAtLeast = null;
@@ -260,13 +332,7 @@ public class Journal
             if (diff.executesAtLeast != null)
                 executesAtLeast = diff.executesAtLeast.get();
             if (diff.saveStatus != null)
-            {
-                Set<SaveStatus> allowed = new HashSet<>();
-                allowed.add(SaveStatus.TruncatedApply);
-                allowed.add(SaveStatus.TruncatedApplyWithOutcome);
-
                 saveStatus = diff.saveStatus.get();
-            }
             if (diff.durability != null)
                 durability = diff.durability.get();
 
@@ -307,75 +373,68 @@ public class Journal
                     result = null;
                     break;
             }
+        }
 
-            CommonAttributes.Mutable attrs = new 
CommonAttributes.Mutable(txnId);
-            if (partialTxn != null)
-                attrs.partialTxn(partialTxn);
-            if (durability != null)
-                attrs.durability(durability);
-            if (participants != null) attrs.setParticipants(participants);
-            else attrs.setParticipants(StoreParticipants.empty(txnId));
-
-            // TODO (desired): we can simplify this logic if, instead of 
diffing, we will infer the diff from the status
-            if (partialDeps != null &&
-                (saveStatus.known.deps != Known.KnownDeps.NoDeps &&
-                 saveStatus.known.deps != Known.KnownDeps.DepsErased &&
-                 saveStatus.known.deps != Known.KnownDeps.DepsUnknown))
-                attrs.partialDeps(partialDeps);
-            Invariants.checkState(saveStatus != null,
-                                  "Save status is null after applying %s", 
diffs);
+        CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
+        if (partialTxn != null)
+            attrs.partialTxn(partialTxn);
+        if (durability != null)
+            attrs.durability(durability);
+        if (participants != null) attrs.setParticipants(participants);
+        else attrs.setParticipants(StoreParticipants.empty(txnId));
+
+        // TODO (desired): we can simplify this logic if, instead of diffing, 
we will infer the diff from the status
+        if (partialDeps != null &&
+            (saveStatus.known.deps != Known.KnownDeps.NoDeps &&
+             saveStatus.known.deps != Known.KnownDeps.DepsErased &&
+             saveStatus.known.deps != Known.KnownDeps.DepsUnknown))
+            attrs.partialDeps(partialDeps);
+
+        try
+        {
 
-            try
+            Command current;
+            switch (saveStatus.status)
             {
-                if (reconstruct == Reconstruct.Each ||
-                    (reconstruct == Reconstruct.Last && i == diffs.size() - 1))
-                {
-                    Command current;
-                    switch (saveStatus.status)
-                    {
-                        case NotDefined:
-                            current = saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
-                                                                             : 
Command.NotDefined.notDefined(attrs, promised);
-                            break;
-                        case PreAccepted:
-                            current = Command.PreAccepted.preAccepted(attrs, 
executeAt, promised);
-                            break;
-                        case AcceptedInvalidate:
-                        case Accepted:
-                        case PreCommitted:
-                            if (saveStatus == SaveStatus.AcceptedInvalidate)
-                                current = 
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, 
acceptedOrCommitted);
-                            else
-                                current = Command.Accepted.accepted(attrs, 
saveStatus, executeAt, promised, acceptedOrCommitted);
-                            break;
-                        case Committed:
-                        case Stable:
-                            current = Command.Committed.committed(attrs, 
saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn);
-                            break;
-                        case PreApplied:
-                        case Applied:
-                            current = Command.Executed.executed(attrs, 
saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes, 
result);
-                            break;
-                        case Invalidated:
-                        case Truncated:
-                            current = truncated(attrs, saveStatus, executeAt, 
executesAtLeast, writes, result);
-                            break;
-                        default:
-                            throw new IllegalStateException("Do not know " + 
saveStatus.status + " " + saveStatus);
-                    }
-
-                    results.add(current);
-                }
+                case NotDefined:
+                    current = saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
+                                                                     : 
Command.NotDefined.notDefined(attrs, promised);
+                    break;
+                case PreAccepted:
+                    current = Command.PreAccepted.preAccepted(attrs, 
executeAt, promised);
+                    break;
+                case AcceptedInvalidate:
+                case Accepted:
+                case PreCommitted:
+                    if (saveStatus == SaveStatus.AcceptedInvalidate)
+                        current = 
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, 
acceptedOrCommitted);
+                    else
+                        current = Command.Accepted.accepted(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted);
+                    break;
+                case Committed:
+                case Stable:
+                    current = Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
+                    break;
+                case PreApplied:
+                case Applied:
+                    current = Command.Executed.executed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
+                    break;
+                case Invalidated:
+                case Truncated:
+                    current = truncated(attrs, saveStatus, executeAt, 
executesAtLeast, writes, result);
+                    break;
+                default:
+                    throw new IllegalStateException("Do not know " + 
saveStatus.status + " " + saveStatus);
             }
-            catch (Throwable t)
-            {
 
-                throw new RuntimeException("Can not reconstruct from diff:\n" + diffs.stream().map(o -> o.toString())
-                                                                               
      .collect(Collectors.joining("\n")),
-                                           t);
-            }
+            return current;
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException("Can not reconstruct from diff:\n" + diffs.stream().map(o -> o.toString())
+                                                                               
  .collect(Collectors.joining("\n")),
+                                       t);
         }
-        return results;
     }
 
     private static Command.Truncated truncated(CommonAttributes.Mutable attrs, 
SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes 
writes, Result result)
@@ -397,23 +456,6 @@ public class Journal
         }
     }
 
-    public void onExecute(int commandStoreId, Command before, Command after, 
boolean isPrimary)
-    {
-        if (before == null && after == null)
-            return;
-
-        Diff diff = diff(before, after);
-        if (!isPrimary)
-            diff = diff.asNonPrimary();
-
-        if (diff != null)
-        {
-            diffsPerCommandStore.computeIfAbsent(commandStoreId, (k) -> new 
TreeMap<>())
-                                .computeIfAbsent(after.txnId(), (k_) -> new 
ArrayList<>())
-                                .add(diff);
-        }
-    }
-
     private static class Diff
     {
         public final TxnId txnId;
@@ -470,12 +512,6 @@ public class Journal
             this.result = result;
         }
 
-        // We allow only save status, and waitingOn to be updated by 
non-primary transactions
-        public Diff asNonPrimary()
-        {
-            return new Diff(txnId, null, null, saveStatus, null, null, null, 
null, null, null, waitingOn, null, null);
-        }
-
         public boolean allNulls()
         {
             if (txnId != null) return false;
@@ -546,7 +582,7 @@ public class Journal
                              ifNotEqual(before, after, Command::participants, 
true),
                              ifNotEqual(before, after, Command::partialTxn, 
false),
                              ifNotEqual(before, after, Command::partialDeps, 
false),
-                             ifNotEqual(before, after, Journal::getWaitingOn, 
true),
+                             ifNotEqual(before, after, 
InMemoryJournal::getWaitingOn, true),
                              ifNotEqual(before, after, Command::writes, false),
                              ifNotEqual(before, after, Command::result, 
false));
         if (diff.allNulls())
diff --git a/accord-core/src/main-test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/main-test/java/accord/impl/basic/LoggingJournal.java
new file mode 100644
index 00000000..012bbff7
--- /dev/null
+++ b/accord-core/src/main-test/java/accord/impl/basic/LoggingJournal.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.NavigableMap;
+
+import accord.api.Journal;
+import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+/**
+ * Logging journal, a wrapper over journal for debugging / inspecting history purposes
+ */
+public class LoggingJournal implements Journal
+{
+    private final BufferedWriter log;
+    private final Journal delegate;
+
+    public LoggingJournal(Journal delegate)
+    {
+        this.delegate = delegate;
+        File f = new File("journal.log");
+        try
+        {
+            log = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private synchronized void log(String format, Object... objects)
+    {
+        try
+        {
+            log.write(String.format(format, objects));
+            log.flush();
+        }
+        catch (IOException e)
+        {
+            // ignore
+        }
+    }
+
+    public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        return delegate.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore);
+    }
+
+    public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
+    {
+        log("%d: %s\n", store, update.after);
+        delegate.saveCommand(store, update, onFlush);
+    }
+
+    public void purge(CommandStores commandStores)
+    {
+        log("PURGE\n");
+        delegate.purge(commandStores);
+    }
+
+    public void replay(CommandStores commandStores)
+    {
+        delegate.replay(commandStores);
+    }
+
+    public RedundantBefore loadRedundantBefore(int commandStoreId)
+    {
+        return delegate.loadRedundantBefore(commandStoreId);
+    }
+
+    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
+    {
+        return delegate.loadBootstrapBeganAt(commandStoreId);
+    }
+
+    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+    {
+        return delegate.loadSafeToRead(commandStoreId);
+    }
+
+    public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId)
+    {
+        return delegate.loadRangesForEpoch(commandStoreId);
+    }
+
+    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush)
+    {
+        log("%d: %s", store, fieldUpdates);
+        delegate.saveStoreState(store, fieldUpdates, onFlush);
+    }
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java
new file mode 100644
index 00000000..1747c97f
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.api;
+
+import java.util.NavigableMap;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+/**
+ * Persisted journal for transactional recovery.
+ */
+public interface Journal
+{
+    Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore);
+    // TODO: use OnDone instead of Runnable
+    void saveCommand(int store, CommandUpdate value, Runnable onFlush);
+
+    void purge(CommandStores commandStores);
+    void replay(CommandStores commandStores);
+
+    RedundantBefore loadRedundantBefore(int commandStoreId);
+    NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId);
+    NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId);
+    CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId);
+
+    void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush);
+
+    // TODO (required): this class can be changed in favour of Writer as if/when we switch from DelayedCommandStore to AccordCommandStore
+    // TODO: move to a more appropriate spot?
+    class CommandUpdate
+    {
+        public final TxnId txnId;
+        public final Command before;
+        public final Command after;
+
+        public CommandUpdate(@Nullable Command before, @Nonnull Command after)
+        {
+            this.txnId = after.txnId();
+            this.before = before;
+            this.after = after;
+        }
+    }
+
+    class FieldUpdates
+    {
+        // TODO: use persisted field logic
+        public RedundantBefore newRedundantBefore;
+        public NavigableMap<TxnId, Ranges> newBootstrapBeganAt;
+        public NavigableMap<Timestamp, Ranges> newSafeToRead;
+        public CommandStores.RangesForEpoch.Snapshot newRangesForEpoch;
+
+        public String toString()
+        {
+            return "FieldUpdates{" +
+                   "newRedundantBefore=" + newRedundantBefore +
+                   ", newBootstrapBeganAt=" + newBootstrapBeganAt +
+                   ", newSafeToRead=" + newSafeToRead +
+                   ", newRangesForEpoch=" + newRangesForEpoch +
+                   '}';
+        }
+    }
+
+    /**
+     *
+     */
+    interface Loader
+    {
+        void load(Command next, OnDone onDone);
+        void apply(Command next, OnDone onDone);
+    }
+
+
+    interface OnDone
+    {
+        void success();
+        void failure(Throwable t);
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java
new file mode 100644
index 00000000..60d1a209
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+//import java.util.function.Function;
+//
+//import com.google.common.annotations.VisibleForTesting;
+//
+//import accord.api.Agent;
+//import accord.api.Result;
+//import accord.local.Cleanup;
+//import accord.local.Command;
+//import accord.local.CommonAttributes;
+//import accord.local.DurableBefore;
+//import accord.local.RedundantBefore;
+//import accord.local.StoreParticipants;
+//import accord.primitives.Ballot;
+//import accord.primitives.PartialDeps;
+//import accord.primitives.PartialTxn;
+//import accord.primitives.SaveStatus;
+//import accord.primitives.Status;
+//import accord.primitives.Timestamp;
+//import accord.primitives.TxnId;
+//import accord.primitives.Writes;
+//import accord.utils.Invariants;
+//
+//import static accord.impl.CommandChange.Fields.ACCEPTED;
+//import static accord.impl.CommandChange.Fields.CLEANUP;
+//import static accord.impl.CommandChange.Fields.DURABILITY;
+//import static accord.impl.CommandChange.Fields.EXECUTES_AT_LEAST;
+//import static accord.impl.CommandChange.Fields.EXECUTE_AT;
+//import static accord.impl.CommandChange.Fields.FIELDS;
+//import static accord.impl.CommandChange.Fields.PARTIAL_DEPS;
+//import static accord.impl.CommandChange.Fields.PARTIAL_TXN;
+//import static accord.impl.CommandChange.Fields.PARTICIPANTS;
+//import static accord.impl.CommandChange.Fields.PROMISED;
+//import static accord.impl.CommandChange.Fields.SAVE_STATUS;
+//import static accord.impl.CommandChange.Fields.WAITING_ON;
+//import static accord.impl.CommandChange.Fields.WRITES;
+//import static accord.impl.CommandChange.Load.ALL;
+//import static accord.local.Cleanup.NO;
+//import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
+//import static accord.primitives.Known.KnownDeps.DepsErased;
+//import static accord.primitives.Known.KnownDeps.DepsUnknown;
+//import static accord.primitives.Known.KnownDeps.NoDeps;
+//import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
+//import static accord.primitives.Status.Durability.NotDurable;
+//import static accord.utils.Invariants.illegalState;
+//import static 
org.apache.cassandra.service.accord.SavedCommand.MinimalCommand;
+//
+///**
+// * Class representing a change in the Command after its execution by 
SafeCommandStore
+// */
+//public class CommandChange
+//{
+//    // This enum is order-dependent
+//    public enum Fields
+//    {
+//        PARTICIPANTS, // stored first so we can index it
+//        SAVE_STATUS,
+//        PARTIAL_DEPS,
+//        EXECUTE_AT,
+//        EXECUTES_AT_LEAST,
+//        DURABILITY,
+//        ACCEPTED,
+//        PROMISED,
+//        WAITING_ON,
+//        PARTIAL_TXN,
+//        WRITES,
+//        CLEANUP,
+//        ;
+//
+//        public static final Fields[] FIELDS = values();
+//    }
+//
+//    // TODO (required): calculate flags once
+//    private static boolean anyFieldChanged(int flags)
+//    {
+//        return (flags >>> 16) != 0;
+//    }
+//
+//    private static int validateFlags(int flags)
+//    {
+//        Invariants.checkState(0 == (~(flags >>> 16) & (flags & 0xffff)));
+//        return flags;
+//    }
+//
+//    /**
+//     * Collects flags
+//     */
+//    public static int getFlags(Command before, Command after)
+//    {
+//        int flags = 0;
+//
+//        flags = collectFlags(before, after, Command::executeAt, true, 
EXECUTE_AT, flags);
+//        flags = collectFlags(before, after, Command::executesAtLeast, true, 
EXECUTES_AT_LEAST, flags);
+//        flags = collectFlags(before, after, Command::saveStatus, false, 
SAVE_STATUS, flags);
+//        flags = collectFlags(before, after, Command::durability, false, 
DURABILITY, flags);
+//
+//        flags = collectFlags(before, after, Command::acceptedOrCommitted, 
false, ACCEPTED, flags);
+//        flags = collectFlags(before, after, Command::promised, false, 
PROMISED, flags);
+//
+//        flags = collectFlags(before, after, Command::participants, true, 
PARTICIPANTS, flags);
+//        flags = collectFlags(before, after, Command::partialTxn, false, 
PARTIAL_TXN, flags);
+//        flags = collectFlags(before, after, Command::partialDeps, false, 
PARTIAL_DEPS, flags);
+//
+//        // TODO: waitingOn vs WaitingOnWithExecutedAt?
+//        flags = collectFlags(before, after, CommandChange::getWaitingOn, 
true, WAITING_ON, flags);
+//
+//        flags = collectFlags(before, after, Command::writes, false, WRITES, 
flags);
+//
+//        return flags;
+//    }
+//
+//    private static Command.WaitingOn getWaitingOn(Command command)
+//    {
+//        if (command instanceof Command.Committed)
+//            return command.asCommitted().waitingOn();
+//
+//        return null;
+//    }
+//
+//    private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, 
VAL> convert, boolean allowClassMismatch, CommandChange.Fields field, int flags)
+//    {
+//        VAL l = null;
+//        VAL r = null;
+//        if (lo != null) l = convert.apply(lo);
+//        if (ro != null) r = convert.apply(ro);
+//
+//        if (l == r)
+//            return flags; // no change
+//
+//        if (r == null)
+//            flags = setFieldIsNull(field, flags);
+//
+//        if (l == null || r == null)
+//            return setFieldChanged(field, flags);
+//
+//        assert allowClassMismatch || l.getClass() == r.getClass() : 
String.format("%s != %s", l.getClass(), r.getClass());
+//
+//        if (l.equals(r))
+//            return flags; // no change
+//
+//        return setFieldChanged(field, flags);
+//    }
+//
+//    private static int setFieldChanged(CommandChange.Fields field, int 
oldFlags)
+//    {
+//        return oldFlags | (0x10000 << field.ordinal());
+//    }
+//
+//    @VisibleForTesting
+//    public static boolean getFieldChanged(CommandChange.Fields field, int 
oldFlags)
+//    {
+//        return (oldFlags & (0x10000 << field.ordinal())) != 0;
+//    }
+//
+//    static int toIterableSetFields(int flags)
+//    {
+//        return flags >>> 16;
+//    }
+//
+//    static CommandChange.Fields nextSetField(int iterable)
+//    {
+//        int i = 
Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable));
+//        return i == 32 ? null : FIELDS[i];
+//    }
+//
+//    static int unsetIterableFields(CommandChange.Fields field, int iterable)
+//    {
+//        return iterable & ~(1 << field.ordinal());
+//    }
+//
+//    @VisibleForTesting
+//    static boolean getFieldIsNull(CommandChange.Fields field, int oldFlags)
+//    {
+//        return (oldFlags & (1 << field.ordinal())) != 0;
+//    }
+//
+//    private static int setFieldIsNull(CommandChange.Fields field, int 
oldFlags)
+//    {
+//        return oldFlags | (1 << field.ordinal());
+//    }
+//
+//    public enum Load
+//    {
+//        ALL(0),
+//        PURGEABLE(SAVE_STATUS, PARTICIPANTS, DURABILITY, EXECUTE_AT, WRITES),
+//        MINIMAL(SAVE_STATUS, PARTICIPANTS, EXECUTE_AT);
+//
+//        final int mask;
+//
+//        Load(int mask)
+//        {
+//            this.mask = mask;
+//        }
+//
+//        Load(Fields ... fields)
+//        {
+//            int mask = -1;
+//            for (Fields field : fields)
+//                mask &= ~(1<< field.ordinal());
+//            this.mask = mask;
+//        }
+//    }
+//
+//    /**
+//     * Helper class for reconstructing Command from a sequence of logged 
CommandChanges
+//     */
+//    public static class CommandBuilder
+//    {
+//        final int mask;
+//        int flags;
+//
+//        TxnId txnId;
+//
+//        Timestamp executeAt;
+//        Timestamp executeAtLeast;
+//        SaveStatus saveStatus;
+//        Status.Durability durability;
+//
+//        Ballot acceptedOrCommitted;
+//        Ballot promised;
+//
+//        StoreParticipants participants;
+//        PartialTxn partialTxn;
+//        PartialDeps partialDeps;
+//
+//        byte[] waitingOnBytes;
+//        CommandChange.WaitingOnProvider waitingOn;
+//        Writes writes;
+//        Result result;
+//        Cleanup cleanup;
+//
+//        boolean nextCalled;
+//        int count;
+//
+//        public CommandBuilder(TxnId txnId, CommandChange.Load load)
+//        {
+//            this.mask = load.mask;
+//            init(txnId);
+//        }
+//
+//        public CommandBuilder(TxnId txnId)
+//        {
+//            this(txnId, ALL);
+//        }
+//
+//        public CommandBuilder(CommandChange.Load load)
+//        {
+//            this.mask = load.mask;
+//        }
+//
+//        public CommandBuilder()
+//        {
+//            this(ALL);
+//        }
+//
+//        public TxnId txnId()
+//        {
+//            return txnId;
+//        }
+//
+//        public Timestamp executeAt()
+//        {
+//            return executeAt;
+//        }
+//
+//        public Timestamp executeAtLeast()
+//        {
+//            return executeAtLeast;
+//        }
+//
+//        public SaveStatus saveStatus()
+//        {
+//            return saveStatus;
+//        }
+//
+//        public Status.Durability durability()
+//        {
+//            return durability;
+//        }
+//
+//        public Ballot acceptedOrCommitted()
+//        {
+//            return acceptedOrCommitted;
+//        }
+//
+//        public Ballot promised()
+//        {
+//            return promised;
+//        }
+//
+//        public StoreParticipants participants()
+//        {
+//            return participants;
+//        }
+//
+//        public PartialTxn partialTxn()
+//        {
+//            return partialTxn;
+//        }
+//
+//        public PartialDeps partialDeps()
+//        {
+//            return partialDeps;
+//        }
+//
+//        public CommandChange.WaitingOnProvider waitingOn()
+//        {
+//            return waitingOn;
+//        }
+//
+//        public Writes writes()
+//        {
+//            return writes;
+//        }
+//
+//        public Result result()
+//        {
+//            return result;
+//        }
+//
+//        public void clear()
+//        {
+//            flags = 0;
+//            txnId = null;
+//
+//            executeAt = null;
+//            executeAtLeast = null;
+//            saveStatus = null;
+//            durability = null;
+//
+//            acceptedOrCommitted = null;
+//            promised = null;
+//
+//            participants = null;
+//            partialTxn = null;
+//            partialDeps = null;
+//
+//            waitingOnBytes = null;
+//            waitingOn = null;
+//            writes = null;
+//            result = null;
+//            cleanup = null;
+//
+//            nextCalled = false;
+//            count = 0;
+//        }
+//
+//        public void reset(TxnId txnId)
+//        {
+//            clear();
+//            init(txnId);
+//        }
+//
+//        public void init(TxnId txnId)
+//        {
+//            this.txnId = txnId;
+//            durability = NotDurable;
+//            acceptedOrCommitted = promised = Ballot.ZERO;
+//            waitingOn = (txn, deps) -> null;
+//            // TODO
+//            //result = CommandSerializers.APPLIED;
+//        }
+//
+//        public boolean isEmpty()
+//        {
+//            return !nextCalled;
+//        }
+//
+//        public int count()
+//        {
+//            return count;
+//        }
+//
+//        public Cleanup shouldCleanup(Agent agent, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
+//        {
+//            if (!nextCalled)
+//                return NO;
+//
+//            if (saveStatus == null || participants == null)
+//                return Cleanup.NO;
+//
+//            Cleanup cleanup = Cleanup.shouldCleanupPartial(agent, txnId, 
saveStatus, durability, participants, redundantBefore, durableBefore);
+//            if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
+//                cleanup = this.cleanup;
+//            return cleanup;
+//        }
+//
+//        // TODO (expected): avoid allocating new builder
+//        public CommandBuilder maybeCleanup(Cleanup cleanup)
+//        {
+//            if (saveStatus() == null)
+//                return this;
+//
+//            switch (cleanup)
+//            {
+//                case EXPUNGE:
+//                case ERASE:
+//                    return null;
+//
+//                case EXPUNGE_PARTIAL:
+//                    return expungePartial(cleanup, saveStatus, true);
+//
+//                case VESTIGIAL:
+//                case INVALIDATE:
+//                    return saveStatusOnly();
+//
+//                case TRUNCATE_WITH_OUTCOME:
+//                case TRUNCATE:
+//                    return expungePartial(cleanup, cleanup.appliesIfNot, 
cleanup == TRUNCATE_WITH_OUTCOME);
+//
+//                case NO:
+//                    return this;
+//                default:
+//                    throw new IllegalStateException("Unknown cleanup: " + 
cleanup);}
+//        }
+//
+//        public CommandBuilder expungePartial(Cleanup cleanup, SaveStatus 
saveStatus, boolean includeOutcome)
+//        {
+//            Invariants.checkState(txnId != null);
+//            CommandBuilder builder = new CommandBuilder(txnId, ALL);
+//
+//            builder.count++;
+//            builder.nextCalled = true;
+//
+//            Invariants.checkState(saveStatus != null);
+//            builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
+//            builder.saveStatus = saveStatus;
+//            builder.flags = setFieldChanged(CLEANUP, builder.flags);
+//            builder.cleanup = cleanup;
+//            if (executeAt != null)
+//            {
+//                builder.flags = setFieldChanged(EXECUTE_AT, builder.flags);
+//                builder.executeAt = executeAt;
+//            }
+//            if (durability != null)
+//            {
+//                builder.flags = setFieldChanged(DURABILITY, builder.flags);
+//                builder.durability = durability;
+//            }
+//            if (participants != null)
+//            {
+//                builder.flags = setFieldChanged(PARTICIPANTS, builder.flags);
+//                builder.participants = participants;
+//            }
+//            if (includeOutcome && builder.writes != null)
+//            {
+//                builder.flags = setFieldChanged(WRITES, builder.flags);
+//                builder.writes = writes;
+//            }
+//
+//            return builder;
+//        }
+//
+//        public CommandBuilder saveStatusOnly()
+//        {
+//            Invariants.checkState(txnId != null);
+//            CommandBuilder builder = new CommandBuilder(txnId, ALL);
+//
+//            builder.count++;
+//            builder.nextCalled = true;
+//
+//            // TODO: these accesses can be abstracted away
+//            if (saveStatus != null)
+//            {
+//                builder.flags = setFieldChanged(SAVE_STATUS, builder.flags);
+//                builder.saveStatus = saveStatus;
+//            }
+//
+//            return builder;
+//        }
+//
+//        public MinimalCommand asMinimal()
+//        {
+//            return new MinimalCommand(txnId, saveStatus, participants, 
durability, executeAt, writes);
+//        }
+//
+//        @VisibleForTesting
+//        public void forceResult(Result newValue)
+//        {
+//            this.result = newValue;
+//        }
+//
+//        public Command construct()
+//        {
+//            if (!nextCalled)
+//                return null;
+//
+//            Invariants.checkState(txnId != null);
+//            CommonAttributes.Mutable attrs = new 
CommonAttributes.Mutable(txnId);
+//            if (partialTxn != null)
+//                attrs.partialTxn(partialTxn);
+//            if (durability != null)
+//                attrs.durability(durability);
+//            if (participants != null)
+//                attrs.setParticipants(participants);
+//            if (partialDeps != null &&
+//                (saveStatus.known.deps != NoDeps &&
+//                 saveStatus.known.deps != DepsErased &&
+//                 saveStatus.known.deps != DepsUnknown))
+//                attrs.partialDeps(partialDeps);
+//
+//            Command.WaitingOn waitingOn = null;
+//            if (this.waitingOn != null)
+//                waitingOn = this.waitingOn.provide(txnId, partialDeps);
+//
+//            switch (saveStatus.status)
+//            {
+//                case NotDefined:
+//                    return saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
+//                                                                  : 
Command.NotDefined.notDefined(attrs, promised);
+//                case PreAccepted:
+//                    return Command.PreAccepted.preAccepted(attrs, executeAt, 
promised);
+//                case AcceptedInvalidate:
+//                case Accepted:
+//                case PreCommitted:
+//                    if (saveStatus == SaveStatus.AcceptedInvalidate)
+//                        return 
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, 
acceptedOrCommitted);
+//                    else
+//                        return Command.Accepted.accepted(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted);
+//                case Committed:
+//                case Stable:
+//                    return Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
+//                case PreApplied:
+//                case Applied:
+//                    return Command.Executed.executed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
+//                case Truncated:
+//                case Invalidated:
+//                    return truncated(attrs, saveStatus, executeAt, 
executeAtLeast, writes, result);
+//                default:
+//                    throw new IllegalStateException();
+//            }
+//        }
+//
+//        private static Command.Truncated truncated(CommonAttributes.Mutable 
attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, 
Writes writes, Result result)
+//        {
+//            switch (status)
+//            {
+//                default:
+//                    throw illegalState("Unhandled SaveStatus: " + status);
+//                case TruncatedApplyWithOutcome:
+//                case TruncatedApplyWithDeps:
+//                case TruncatedApply:
+//                    if (status != TruncatedApplyWithOutcome)
+//                        result = null;
+//                    if (attrs.txnId().kind().awaitsOnlyDeps())
+//                        return Command.Truncated.truncatedApply(attrs, 
status, executeAt, writes, result, executesAtLeast);
+//                    return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, result, null);
+//                case ErasedOrVestigial:
+//                    return 
Command.Truncated.erasedOrInvalidOrVestigial(attrs.txnId(), attrs.durability(), 
attrs.participants());
+//                case Erased:
+//                    return Command.Truncated.erased(attrs.txnId(), 
attrs.durability(), attrs.participants());
+//                case Invalidated:
+//                    return Command.Truncated.invalidated(attrs.txnId());
+//            }
+//        }
+//
+//        public String toString()
+//        {
+//            return "Diff {" +
+//                   "txnId=" + txnId +
+//                   ", executeAt=" + executeAt +
+//                   ", saveStatus=" + saveStatus +
+//                   ", durability=" + durability +
+//                   ", acceptedOrCommitted=" + acceptedOrCommitted +
+//                   ", promised=" + promised +
+//                   ", participants=" + participants +
+//                   ", partialTxn=" + partialTxn +
+//                   ", partialDeps=" + partialDeps +
+//                   ", waitingOn=" + waitingOn +
+//                   ", writes=" + writes +
+//                   '}';
+//        }
+//    }
+//
+//    public interface WaitingOnProvider
+//    {
+//        Command.WaitingOn provide(TxnId txnId, PartialDeps deps);
+//    }
+//}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java 
b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
index bd370b93..ada18fc9 100644
--- a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
+++ b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
@@ -36,6 +36,7 @@ import accord.api.Scheduler;
 import accord.coordinate.CoordinateGloballyDurable;
 import accord.coordinate.CoordinationFailed;
 import accord.coordinate.ExecuteSyncPoint.SyncPointErased;
+import accord.coordinate.Timeout;
 import accord.local.Node;
 import accord.local.ShardDistributor;
 import accord.primitives.FullRoute;
@@ -313,7 +314,10 @@ public class DurabilityScheduling implements 
ConfigurationService.Listener
                                 index *= 2;
                                 numberOfSplits *= 2;
                             }
-                            logger.warn("{}: Exception coordinating 
ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + 
numberOfSplits, syncId, ranges, fail);
+                            if (fail instanceof Timeout)
+                                logger.trace("{}: Exception coordinating 
ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + 
numberOfSplits, syncId, ranges, fail);
+                            else
+                                logger.debug("{}: Exception coordinating 
ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + 
numberOfSplits, syncId, ranges, fail);
                         }
                     }
                     else
@@ -335,7 +339,10 @@ public class DurabilityScheduling implements 
ConfigurationService.Listener
                     .addCallback((success, fail) -> {
                         if (fail != null && fail.getClass() != 
SyncPointErased.class)
                         {
-                            logger.debug("Exception coordinating shard 
durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail);
+                            if (fail instanceof Timeout)
+                                logger.trace("Exception coordinating shard 
durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail);
+                            else
+                                logger.debug("Exception coordinating shard 
durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail);
                             retryCoordinateDurability(node, 
exclusiveSyncPoint, nextIndex);
                         }
                         else
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index e50998b4..2322186c 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -44,6 +44,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.RoutingKey;
 import accord.impl.progresslog.DefaultProgressLog;
@@ -130,10 +131,12 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     protected Timestamp maxRedundant = Timestamp.NONE;
 
     private InMemorySafeStore current;
+    private final Journal.Loader loader;
 
     public InMemoryCommandStore(int id, NodeCommandStoreService time, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
     {
         super(id, time, agent, store, progressLogFactory, listenersFactory, 
epochUpdateHolder);
+        this.loader = new CommandLoader(this);
     }
 
     protected boolean canExposeUnloaded()
@@ -1357,88 +1360,107 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         unsafeSetRejectBefore(new RejectBefore());
     }
 
-    public interface Loader
+    public Journal.Loader loader()
     {
-        void load(Command next);
-        void apply(Command next);
+        return loader;
     }
 
-    public Loader loader()
+    private static class CommandLoader implements Journal.Loader
     {
-        return new Loader()
+        private final InMemoryCommandStore commandStore;
+
+        private CommandLoader(InMemoryCommandStore commandStore)
         {
-            private PreLoadContext context(Command command, KeyHistory 
keyHistory)
-            {
-                TxnId txnId = command.txnId();
-                AbstractUnseekableKeys keys = null;
+            this.commandStore = commandStore;
+        }
 
-                if (CommandsForKey.manages(txnId))
-                    keys = (AbstractUnseekableKeys) 
command.participants().hasTouched();
-                else if (!CommandsForKey.managesExecution(txnId) && 
command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated))
-                    keys = command.asCommitted().waitingOn.keys;
+        private PreLoadContext context(Command command, KeyHistory keyHistory)
+        {
+            TxnId txnId = command.txnId();
+            AbstractUnseekableKeys keys = null;
 
-                if (keys != null)
-                {
-                    return PreLoadContext.contextFor(txnId, keys, keyHistory);
-                }
+            if (CommandsForKey.manages(txnId))
+                keys = (AbstractUnseekableKeys) 
command.participants().hasTouched();
+            else if (!CommandsForKey.managesExecution(txnId) && 
command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated))
+                keys = command.asCommitted().waitingOn.keys;
 
-                return PreLoadContext.contextFor(txnId);
+            if (keys != null)
+            {
+                return PreLoadContext.contextFor(txnId, keys, keyHistory);
             }
 
-            public void load(Command command)
-            {
-                TxnId txnId = command.txnId();
-
-                executeInContext(InMemoryCommandStore.this,
-                                 context(command, ASYNC),
-                                 safeStore -> {
-                                     Command local = command;
-                                     if (local.status() != Truncated && 
local.status() != Invalidated)
-                                     {
-                                         Cleanup cleanup = 
Cleanup.shouldCleanup(safeStore, local, local.participants());
-                                         switch (cleanup)
-                                         {
-                                             case NO:
-                                                 break;
-                                             case INVALIDATE:
-                                             case TRUNCATE_WITH_OUTCOME:
-                                             case TRUNCATE:
-                                             case ERASE:
-                                                 local = Commands.purge(local, 
local.participants(), cleanup);
-                                         }
-                                     }
-
-                                     local = 
safeStore.unsafeGet(txnId).update(safeStore, local);
-                                     if (local.status() == Truncated)
-                                         
safeStore.progressLog().clear(local.txnId());
-                                     return local;
-                                 });
+            return PreLoadContext.contextFor(txnId);
+        }
 
+        @Override
+        public void load(Command command, Journal.OnDone onDone)
+        {
+            TxnId txnId = command.txnId();
 
+            try
+            {
+                commandStore.executeInContext(commandStore,
+                                              context(command, ASYNC),
+                                              safeStore -> {
+                                                  Command local = command;
+                                                  if (local.status() != 
Truncated && local.status() != Invalidated)
+                                                  {
+                                                      Cleanup cleanup = 
Cleanup.shouldCleanup(safeStore, local, local.participants());
+                                                      switch (cleanup)
+                                                      {
+                                                          case NO:
+                                                              break;
+                                                          case INVALIDATE:
+                                                          case 
TRUNCATE_WITH_OUTCOME:
+                                                          case TRUNCATE:
+                                                          case ERASE:
+                                                              local = 
Commands.purge(local, local.participants(), cleanup);
+                                                      }
+                                                  }
+
+                                                  local = 
safeStore.unsafeGet(txnId).update(safeStore, local);
+                                                  if (local.status() == 
Truncated)
+                                                      
safeStore.progressLog().clear(local.txnId());
+                                                  return local;
+                                              });
+                onDone.success();
             }
-
-            public void apply(Command command)
+            catch (Throwable t)
             {
-                TxnId txnId = command.txnId();
+                onDone.failure(t);
+            }
+        }
+
+        @Override
+        public void apply(Command command, Journal.OnDone onDone)
+        {
+            TxnId txnId = command.txnId();
 
+            try
+            {
                 PreLoadContext context = context(command, 
KeyHistory.TIMESTAMPS);
-                executeInContext(InMemoryCommandStore.this,
-                                 context,
-                                 safeStore -> {
-                                     SafeCommand safeCommand = 
safeStore.unsafeGet(txnId);
-                                     Command local = safeCommand.current();
-                                     if (local.is(Stable) || 
local.is(PreApplied))
-                                     {
-                                         Commands.maybeExecute(safeStore, 
safeCommand, local, true, true);
-                                     }
-                                     else if 
(local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated))
-                                     {
-                                         unsafeApplyWrites(safeStore, 
safeCommand, local);
-                                     }
-                                     return null;
-                                 });
+                commandStore.executeInContext(commandStore,
+                                              context,
+                                              safeStore -> {
+                                                  SafeCommand safeCommand = 
safeStore.unsafeGet(txnId);
+                                                  Command local = 
safeCommand.current();
+                                                  if (local.is(Stable) || 
local.is(PreApplied))
+                                                  {
+                                                      
Commands.maybeExecute(safeStore, safeCommand, local, true, true);
+                                                  }
+                                                  else if 
(local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated))
+                                                  {
+                                                      
unsafeApplyWrites(safeStore, safeCommand, local);
+                                                  }
+                                                  return null;
+                                              });
+                onDone.success();
             }
-        };
+            catch (Throwable t)
+            {
+                onDone.failure(t);
+            }
+        }
     }
 
     public static void unsafeApplyWrites(SafeCommandStore safeStore, 
SafeCommand safeCommand, Command command)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 6fb4ae04..f7eb397b 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -18,6 +18,7 @@
 
 package accord.local;
 
+import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.DataStore;
@@ -207,6 +208,8 @@ public abstract class CommandStore implements AgentExecutor
         return id;
     }
 
+    public abstract Journal.Loader loader(); // each concrete store supplies its own; NOTE(review): presumably used when replaying the journal into this store - confirm against accord.api.Journal
+
     @Override
     public Agent agent()
     {
diff --git 
a/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java 
b/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
index e86a1805..1ee926df 100644
--- a/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
@@ -75,7 +75,8 @@ public class ReadEphemeralTxnData extends ReadData
     private ReadEphemeralTxnData(TxnId txnId, Participants<?> readScope, 
Route<?> scope, long executeAtEpoch, @Nonnull Txn txn, @Nonnull Deps deps, 
@Nonnull FullRoute<?> route)
     {
         super(txnId, readScope.intersecting(scope), executeAtEpoch);
-        Invariants.checkState(executeAtEpoch == txnId.epoch());
+        Invariants.checkState(executeAtEpoch == txnId.epoch(),
+                              "Epoch for transaction %s (%d) did not match 
expected %d", txn, txnId.epoch(), executeAtEpoch);
         this.route = route;
         this.partialTxn = txn.intersecting(scope, false);
         this.partialDeps = deps.intersecting(scope);
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index bfbb7c7b..740b705d 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -52,6 +52,7 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Journal;
 import accord.api.Key;
 import accord.burn.random.FrequentLargeRange;
 import accord.impl.MessageListener;
@@ -59,6 +60,7 @@ import accord.impl.PrefixedIntHashKey;
 import accord.impl.TopologyFactory;
 import accord.impl.basic.Cluster;
 import accord.impl.basic.Cluster.Stats;
+import accord.impl.basic.InMemoryJournal;
 import accord.impl.basic.Packet;
 import accord.impl.basic.PendingQueue;
 import accord.impl.basic.PendingRunnable;
@@ -295,14 +297,14 @@ public class BurnTest
         random2.setSeed(seed);
         ExecutorService exec = Executors.newFixedThreadPool(2);
         RandomDelayQueue.ReconcilingQueueFactory factory = new 
RandomDelayQueue.ReconcilingQueueFactory(seed);
-        Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, 
clients, nodes, keyCount, operations, concurrency, factory.get(true)));
-        Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, 
clients, nodes, keyCount, operations, concurrency, factory.get(false)));
+        Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, 
clients, nodes, keyCount, operations, concurrency, factory.get(true), 
InMemoryJournal::new));
+        Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, 
clients, nodes, keyCount, operations, concurrency, factory.get(false), 
InMemoryJournal::new));
         exec.shutdown();
         f1.get();
         f2.get();
     }
 
-    static void burn(RandomSource random, TopologyFactory topologyFactory, 
List<Id> clients, List<Id> nodes, int keyCount, int operations, int 
concurrency, PendingQueue pendingQueue)
+    static void burn(RandomSource random, TopologyFactory topologyFactory, 
List<Id> clients, List<Id> nodes, int keyCount, int operations, int 
concurrency, PendingQueue pendingQueue, Function<Id, Journal> journalFactory)
     {
         List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
         AtomicLong progress = new AtomicLong();
@@ -462,8 +464,8 @@ public class BurnTest
                                           responseSink, random::fork, 
nowSupplier,
                                           topologyFactory, 
initialRequests::poll,
                                           onSubmitted::set,
-                                          ignore -> {}
-            );
+                                          ignore -> {},
+                                          journalFactory);
             for (Verifier verifier : validators.values())
                 verifier.close();
         }
@@ -622,12 +624,13 @@ public class BurnTest
             List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
 
             burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START, 
HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 
3))),
-                    clients,
-                    nodes,
-                    5 + random.nextInt(15),
-                    operations,
-                    10 + random.nextInt(30),
-                 new Factory(random).get());
+                 clients,
+                 nodes,
+                 5 + random.nextInt(15),
+                 operations,
+                 10 + random.nextInt(30),
+                 new Factory(random).get(),
+                 InMemoryJournal::new);
         }
         catch (Throwable t)
         {
diff --git 
a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java 
b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
index 0688fa5a..dc8b553a 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
@@ -18,6 +18,7 @@
 
 package accord.coordinate;
 
+import java.time.Duration;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -67,7 +68,6 @@ import accord.utils.async.AsyncResult;
 import static accord.Utils.id;
 import static accord.Utils.ids;
 import static accord.Utils.ranges;
-import static accord.Utils.spinUntilSuccess;
 import static accord.Utils.writeTxn;
 import static accord.impl.IntKey.key;
 import static accord.impl.IntKey.keys;
@@ -87,6 +87,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
 
 public class CoordinateTransactionTest
 {
@@ -416,4 +418,19 @@ public class CoordinateTransactionTest
     {
         return keys.toRoute(keys.get(0).toUnseekable());
     }
+
+    public static void spinUntilSuccess(ThrowingRunnable runnable) // convenience overload using a default timeout
+    {
+        spinUntilSuccess(runnable, 10); // second argument is the timeout in seconds
+    }
+
+    /**
+     * Re-runs {@code runnable} until it completes without throwing, giving up once
+     * {@code timeoutInSeconds} seconds have elapsed.
+     * The first attempt is made immediately and subsequent attempts are made every 100ms;
+     * exceptions thrown by intermediate attempts are ignored.
+     */
+    public static void spinUntilSuccess(ThrowingRunnable runnable, int timeoutInSeconds)
+    {
+        Awaitility.await()
+                  .pollInterval(Duration.ofMillis(100))
+                  .pollDelay(Duration.ZERO)
+                  .atMost(Duration.ofSeconds(timeoutInSeconds))
+                  .ignoreExceptions()
+                  .untilAsserted(runnable);
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java 
b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
index 7f1f81b8..9ea735c4 100644
--- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
 
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.ProgressLog;
 import accord.api.RemoteListeners;
 import accord.api.RemoteListeners.Registration;
@@ -394,6 +395,12 @@ public class RemoteListenersTest
             this.storeId = id;
         }
 
+        @Override
+        public Journal.Loader loader()
+        {
+            throw new UnsupportedOperationException(); // this test store provides no journal loader
+        }
+
         @Override public boolean inStore() { return false; }
         @Override public AsyncChain<Void> execute(PreLoadContext context, 
Consumer<? super SafeCommandStore> consumer) { return null; }
         @Override public <T> AsyncChain<T> submit(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply) { return null; }
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java 
b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 50b2b97c..1123ab73 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import accord.api.Agent;
 import accord.api.Data;
 import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.ProgressLog;
 import accord.api.ProgressLog.BlockedUntil;
 import accord.api.Query;
@@ -949,6 +950,11 @@ public class CommandsForKeyTest
             return true;
         }
 
+        // This test store provides no journal loader; any call fails fast.
+        @Override
+        public Journal.Loader loader()
+        {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public Agent agent()
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to