This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e587fc4c Standardise Replay logic to ensure SafeCommandStore.update is 
called Also Fix:  - Initialise home state when calling waiting() if not already 
initialised  - Don't reportClosed/reportRetired for epochs that are already 
closed/retired
e587fc4c is described below

commit e587fc4c090aeb74a363af43295e0f7c7825445a
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Jul 5 09:57:13 2025 +0100

    Standardise Replay logic to ensure SafeCommandStore.update is called
    Also Fix:
     - Initialise home state when calling waiting() if not already initialised
     - Don't reportClosed/reportRetired for epochs that are already 
closed/retired
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20750
---
 accord-core/src/main/java/accord/api/Journal.java  |  2 +-
 .../src/main/java/accord/impl/AbstractLoader.java  | 36 +++++----
 .../java/accord/impl/InMemoryCommandStore.java     | 43 +++++-----
 .../impl/progresslog/DefaultProgressLog.java       | 91 +++++++++++++---------
 .../src/main/java/accord/local/Commands.java       |  4 +-
 .../src/main/java/accord/local/SafeCommand.java    |  2 +-
 .../main/java/accord/topology/TopologyManager.java | 28 ++++---
 .../java/accord/impl/basic/InMemoryJournal.java    |  2 +-
 8 files changed, 113 insertions(+), 95 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index 99806e2d..f491aafd 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -175,7 +175,7 @@ public interface Journal
      */
     interface Loader
     {
-        AsyncChain<Command> load(TxnId txnId);
+        AsyncChain<?> load(TxnId txnId);
     }
 
 
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java 
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
index 9020b96a..3e0828d7 100644
--- a/accord-core/src/main/java/accord/impl/AbstractLoader.java
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -18,18 +18,16 @@
 
 package accord.impl;
 
-import java.util.function.BiConsumer;
-
 import accord.api.Journal;
-import accord.local.Cleanup;
 import accord.local.Command;
+import accord.local.CommandStore;
 import accord.local.Commands;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
+import accord.primitives.Participants;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
 
-import static accord.local.Cleanup.Input.FULL;
 import static accord.primitives.SaveStatus.PreApplied;
 import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
@@ -37,27 +35,31 @@ import static accord.primitives.Txn.Kind.Write;
 
 public abstract class AbstractLoader implements Journal.Loader
 {
-    protected Command loadInternal(Command command, SafeCommandStore safeStore)
-    {
-        TxnId txnId = command.txnId();
-        Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command, 
command.participants());
-        if (cleanup != Cleanup.NO)
-            command = Commands.purge(safeStore, command, cleanup);
-
-        return safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
-    }
-
-    protected void maybeApplyWrites(TxnId txnId, SafeCommandStore safeStore, 
BiConsumer<SafeCommand, Command> apply)
+    protected void maybeApplyWrites(SafeCommandStore safeStore, TxnId txnId)
     {
         SafeCommand safeCommand = safeStore.unsafeGet(txnId);
         Command command = safeCommand.current();
         if (command.is(Stable) || command.saveStatus() == PreApplied)
         {
-            Commands.maybeExecute(safeStore, safeCommand, command, true, true);
+            if (Commands.maybeExecute(safeStore, safeCommand, command, true, 
true))
+                return;
         }
         else if (command.txnId().is(Write) && 
command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && 
!command.hasBeen(Truncated))
         {
-            apply.accept(safeCommand, command);
+            CommandStore unsafeStore = safeStore.commandStore();
+            Command.Executed executed = command.asExecuted();
+            Participants<?> executes = executed.participants().stillExecutes();
+            if (!executes.isEmpty())
+            {
+                command.writes()
+                       .apply(safeStore, executes, command.partialTxn())
+                       .invoke(() -> unsafeStore.build(txnId, ss -> {
+                           Commands.postApply(ss, txnId, -1, true);
+                       }))
+                       .begin(safeStore.agent());
+                return;
+            }
         }
+        safeCommand.update(safeStore, command, true);
     }
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 31c7dfbf..08d59d83 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -49,13 +49,13 @@ import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
-import accord.api.Write;
 import accord.impl.progresslog.DefaultProgressLog;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.CommandSummaries;
+import accord.local.Commands;
 import accord.local.KeyHistory;
 import accord.local.NodeCommandStoreService;
 import accord.local.PreLoadContext;
@@ -84,7 +84,6 @@ import accord.primitives.Unseekables;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
 import accord.utils.async.Cancellable;
 import org.agrona.collections.ObjectHashSet;
 
@@ -1169,46 +1168,42 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             return txnId;
         }
 
-        private AsyncChain<Command> load(Command command)
+        protected TxnId loadInternal(Command command, SafeCommandStore 
safeStore)
+        {
+            TxnId txnId = command.txnId();
+            Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command, 
command.participants());
+            if (cleanup != Cleanup.NO)
+                command = Commands.purge(safeStore, command, cleanup);
+
+            safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
+            return txnId;
+        }
+
+        private AsyncChain<TxnId> load(Command command)
         {
             return 
AsyncChains.success(commandStore.executeInContext(commandStore,
                                                                      
context(command, ASYNC),
                                                                      
(SafeCommandStore safeStore) -> loadInternal(command, safeStore)));
         }
 
-        private AsyncChain<Command> apply(Command command)
+        private AsyncChain<Void> apply(TxnId txnId)
         {
             return 
AsyncChains.success(commandStore.executeInContext(commandStore,
-                                                                     
context(command, SYNC),
+                                                                     txnId,
                                                                      
(SafeCommandStore safeStore) -> {
-                                                                         
maybeApplyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
-                                                                             
applyWritesSync(safeStore, safeCommand, cmd);
-                                                                         });
-                                                                         
return command;
+                                                                         
maybeApplyWrites(safeStore, txnId);
+                                                                         
return null;
                                                                      }));
         }
 
-        private void applyWritesSync(SafeCommandStore safeStore, SafeCommand 
safeCommand, Command command)
-        {
-            Command.Executed executed = command.asExecuted();
-            Participants<?> executes = executed.participants().stillExecutes();
-            if (!executes.isEmpty())
-            {
-                AsyncResult<Void> result = 
command.writes().apply(Write.InMemoryWrite::applySync, 
safeStore.commandStore(), executes, command.partialTxn()).beginAsResult();
-                Invariants.require(result.isDone());
-                safeCommand.applied(safeStore);
-                safeStore.notifyListeners(safeCommand, command);
-            }
-        }
-
         @Override
-        public AsyncChain<Command> load(TxnId txnId)
+        public AsyncChain<Void> load(TxnId txnId)
         {
             // TODO (required): consider this race condition some more:
             //      - can we avoid double-applying?
             //      - is this definitely safe?
             if (commandStore.hasCommand(txnId))
-                return apply(commandStore.command(txnId).value());
+                return apply(txnId);
 
             Command command = 
commandStore.journal.loadCommand(commandStore.id, txnId, 
commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
             if (command == null)
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java 
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 469f551d..f349e8e0 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -195,41 +195,8 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         TxnState state = null;
         Route<?> beforeRoute = before.route();
         Route<?> afterRoute = after.route();
-        if (afterRoute != null && (beforeRoute == null || force))
-        {
-            RoutingKey homeKey = afterRoute.homeKey();
-            Ranges coordinateRanges = safeStore.coordinateRanges(txnId);
-            boolean isHome = coordinateRanges.contains(homeKey);
-            state = get(txnId);
-            if (isHome)
-            {
-                if (state == null)
-                    state = insert(txnId);
-
-                if (after.durability().isDurableOrInvalidated())
-                {
-                    state.setHomeDoneAndMaybeRemove(this);
-                    state = maybeFetch(safeStore, txnId, after, state);
-                }
-                else
-                {
-                    state.set(safeStore, this, Undecided, Queued);
-                }
-            }
-            else if (state != null)
-            {
-                // not home shard
-                state.setHomeDone(this);
-            }
-        }
-        else if (after.durability().isDurableOrInvalidated() && (force || 
!before.durability().isDurableOrInvalidated()))
-        {
-            state = get(txnId);
-            if (state != null)
-                state.setHomeDoneAndMaybeRemove(this);
-
-            state = maybeFetch(safeStore, txnId, after, state);
-        }
+        if (force || (afterRoute != null && beforeRoute == null) || 
(after.durability().isDurableOrInvalidated() && 
!before.durability().isDurableOrInvalidated()))
+            state = updateHomeState(safeStore, after, get(txnId));
 
         SaveStatus beforeSaveStatus = before.saveStatus();
         SaveStatus afterSaveStatus = after.saveStatus();
@@ -260,7 +227,51 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         }
     }
 
-    private TxnState maybeFetch(SafeCommandStore safeStore, TxnId txnId, 
Command after, TxnState state)
+    private TxnState updateHomeState(SafeCommandStore safeStore, Command 
after, @Nullable TxnState state)
+    {
+        Route<?> route = after.route();
+        if (after.durability().isDurableOrInvalidated())
+        {
+            // command is durable, so we don't need to coordinate it - whether 
we're the home shard or not
+            if (state != null)
+                state.setHomeDone(this);
+
+            // ... and we should be able to fetch its outcome if we need it
+            state = maybeFetch(safeStore, after, state);
+
+            if (state != null && state.maybeRemove(this))
+                state = null;
+
+            return state;
+        }
+
+        if (route == null)
+            return state; // we don't know if we're the home shard
+
+        TxnId txnId = after.txnId();
+        RoutingKey homeKey = route.homeKey();
+        Ranges coordinateRanges = safeStore.coordinateRanges(txnId);
+        boolean isHome = coordinateRanges.contains(homeKey);
+        state = get(txnId);
+        if (isHome)
+        {
+            if (state == null)
+                state = insert(txnId);
+
+            Invariants.require(!after.durability().isDurableOrInvalidated());
+            if (!state.isHomeInitialised())
+                state.set(safeStore, this, Undecided, Queued); // initialise
+        }
+        else if (state != null)
+        {
+            // not home shard
+            state.setHomeDone(this);
+        }
+
+        return state;
+    }
+
+    private TxnState maybeFetch(SafeCommandStore safeStore, Command after, 
TxnState state)
     {
         if (after.hasBeen(PreApplied))
             return state;
@@ -273,7 +284,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         {
             // this command should be ready to apply locally, so fetch it
             if (state == null)
-                state = insert(txnId);
+                state = insert(after.txnId());
             state.waiting().setBlockedUntil(safeStore, this, CanApply);
         }
         return state;
@@ -388,6 +399,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         if (!blockedBy.txnId().isVisible())
             return;
 
+
         Command command = blockedBy.current();
         if (command == null) command = uninitialised(blockedBy.txnId());
         SaveStatus saveStatus = command.saveStatus();
@@ -419,7 +431,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         if (blockedOnStoreParticipants != null) updatedParticipants = 
updatedParticipants.supplementOrMerge(saveStatus, blockedOnStoreParticipants);
         if (blockedOnStoreParticipants2 != null) updatedParticipants = 
updatedParticipants.supplementOrMerge(saveStatus, blockedOnStoreParticipants2);
         if (participants != updatedParticipants)
-            update = update.updateParticipants(updatedParticipants);
+            update = command.updateParticipants(updatedParticipants);
 
         if (update != command)
             command = blockedBy.incidentalUpdate(update);
@@ -438,6 +450,9 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         //                             later topology that wasn't covered by 
its coordination
         TxnState state = ensure(blockedBy.txnId());
         state.waiting().setBlockedUntil(safeStore, this, blockedUntil);
+        // in case progress log hasn't been updated (e.g. bug on replay), 
force an update to the command's state since we're about to wait on it
+        if (!state.isHomeInitialised() && command.route() != null)
+            updateHomeState(safeStore, command, state);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index 7cbb938b..72bcd3d5 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -618,7 +618,7 @@ public class Commands
         }
     }
 
-    protected static void postApply(SafeCommandStore safeStore, TxnId txnId, 
long t0, boolean forceApply)
+    public static void postApply(SafeCommandStore safeStore, TxnId txnId, long 
t0, boolean forceApply)
     {
         SafeCommand safeCommand = safeStore.get(txnId);
         Command command = safeCommand.current();
@@ -662,7 +662,7 @@ public class Commands
     }
 
     @VisibleForImplementation
-    public static AsyncChain<Void> applyWrites(SafeCommandStore safeStore, 
PreLoadContext context, Command command)
+    public static AsyncChain<Void> replayWrites(SafeCommandStore safeStore, 
PreLoadContext context, Command command)
     {
         CommandStore unsafeStore = safeStore.commandStore();
         Command.Executed executed = command.asExecuted();
diff --git a/accord-core/src/main/java/accord/local/SafeCommand.java 
b/accord-core/src/main/java/accord/local/SafeCommand.java
index fe4024e1..dcccb4c3 100644
--- a/accord-core/src/main/java/accord/local/SafeCommand.java
+++ b/accord-core/src/main/java/accord/local/SafeCommand.java
@@ -64,7 +64,7 @@ public abstract class SafeCommand
         return update(safeStore, update, false);
     }
 
-    private <C extends Command> C update(SafeCommandStore safeStore, C update, 
boolean force)
+    public <C extends Command> C update(SafeCommandStore safeStore, C update, 
boolean force)
     {
         Command prev = current();
         if (prev == update)
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 31f57430..fc30ef31 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -193,24 +193,28 @@ public class TopologyManager
             }
         }
 
-        boolean recordClosed(Ranges ranges)
+        // returns those ranges that weren't already closed, so that they can 
be propagated to lower epochs
+        Ranges recordClosed(Ranges ranges)
         {
-            if (closed.containsAll(ranges))
-                return false;
+            ranges = ranges.without(closed);
+            if (ranges.isEmpty())
+                return ranges;
             closed = closed.union(MERGE_ADJACENT, ranges);
             Invariants.require(closed.mergeTouching() == closed);
-            return true;
+            return ranges.without(addedRanges);
         }
 
-        boolean recordRetired(Ranges ranges)
+        // returns those ranges that weren't already retired, so that they can 
be propagated to lower epochs
+        Ranges recordRetired(Ranges ranges)
         {
-            if (retired.containsAll(ranges))
-                return false;
+            ranges = ranges.without(retired);
+            if (ranges.isEmpty())
+                return ranges;
             closed = closed.union(MERGE_ADJACENT, ranges);
             retired = retired.union(MERGE_ADJACENT, ranges);
             Invariants.require(closed.mergeTouching() == closed);
             Invariants.require(retired.mergeTouching() == retired);
-            return true;
+            return ranges.without(addedRanges);
         }
 
         Topology global()
@@ -434,7 +438,9 @@ public class TopologyManager
                 Invariants.require(epoch < minEpoch(), "Could not find epoch 
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
                 return; // notification came for an already truncated epoch
             }
-            while (epochs[i].recordClosed(ranges) && ++i < epochs.length) {}
+
+            while (!ranges.isEmpty() && i < epochs.length)
+                ranges = epochs[i++].recordClosed(ranges);
         }
 
         /**
@@ -458,8 +464,8 @@ public class TopologyManager
                     return;
             }
 
-            for (int i = retiredIdx; i < epochs.length; i++)
-                epochs[i].recordRetired(ranges);
+            for (int i = retiredIdx; !ranges.isEmpty() && i < epochs.length; 
i++)
+                ranges = epochs[i].recordRetired(ranges);
         }
 
         private Notifications pending(long epoch)
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 220d61f4..8adf14ac 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -614,7 +614,7 @@ public class InMemoryJournal implements Journal
             {
                 if (e.getValue().isEmpty()) continue;
 
-                AsyncResult<Command> res = 
loader.load(e.getKey()).beginAsResult();
+                AsyncResult<?> res = loader.load(e.getKey()).beginAsResult();
                 AsyncChains.getUnchecked(res);
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to