This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch fixes-260226
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 720a89d8fa49b849390f0fcf5a9be24ba294c9e4
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Jan 12 17:33:51 2026 +0000

     - Do not invoke slowReplicaDelay or slowCoordinatorDelay during restore
     - Do not fail if slowReplicaDelay or slowCoordinatorDelay are invoked without knowing the transaction
     - Fix DefaultRemoteListeners not being correctly synchronised
     - Topology.cloneEquivalentWithEpoch should also share nodeLookup and ranges, especially to accelerate computeWaitForEpoch/computeScope
---
 .../java/accord/impl/DefaultRemoteListeners.java   | 75 +++++++++++++---------
 .../java/accord/impl/InMemoryCommandStore.java     |  2 +-
 .../impl/progresslog/DefaultProgressLog.java       | 35 +++++++---
 .../src/main/java/accord/local/CommandStore.java   |  3 +-
 .../src/main/java/accord/local/Commands.java       | 23 ++++---
 .../java/accord/messages/ParticipantsRequest.java  |  1 -
 .../src/main/java/accord/topology/Topology.java    |  2 +-
 .../java/accord/utils/IntrusivePriorityHeap.java   | 14 ++++
 .../test/java/accord/impl/RemoteListenersTest.java |  2 +-
 .../java/accord/impl/basic/InMemoryJournal.java    |  4 +-
 .../java/accord/local/cfk/CommandsForKeyTest.java  |  3 +-
 11 files changed, 107 insertions(+), 57 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java 
b/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
index 4bf35380..33d0a8da 100644
--- a/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
@@ -297,7 +297,7 @@ public class DefaultRemoteListeners implements 
RemoteListeners
             this.end = 1;
         }
 
-        Listeners merge(Listeners that)
+        synchronized Listeners merge(Listeners that)
         {
             int count = 0;
             int i = this.start, j = that.start;
@@ -344,6 +344,41 @@ public class DefaultRemoteListeners implements 
RemoteListeners
             end = count;
             return this;
         }
+
+        synchronized Listeners notify(NotifySink notifySink, SafeCommandStore 
safeStore, SafeCommand safeCommand, Command prev)
+        {
+            int storeId = safeStore.commandStore().id();
+            SaveStatus newStatus = safeCommand.current().saveStatus();
+            Durability newDurability = safeCommand.current().durability();
+
+            StatusListeners[] listeners = statusListeners;
+            for (int i = start ; i < end ; ++i)
+            {
+                StatusListeners listener = listeners[i];
+                if (awaitSaveStatus(listener.await).compareTo(newStatus) > 0)
+                    return this;
+
+                if 
(awaitDurability(listener.await).compareTo(newDurability.decisionOrOutcome()) > 
0)
+                    continue;
+
+                listener.removeWaitingOn(storeId);
+                if (listener.waitingOnCount == 0)
+                {
+                    // if we get invalidated we don't save the route, so we 
take the combined route of before and after the new status
+                    Route<?> route = 
Route.merge(safeCommand.current().route(), prev == null ? null : 
(Route)prev.route());
+                    notifySink.notify(safeCommand.txnId(), newStatus, route, 
listener.listeners, listener.listenerCount);
+                    if (i != start)
+                        System.arraycopy(listeners, start, listeners, start + 
1, i - start);
+                    listeners[start] = null;
+                    ++start;
+                }
+            }
+
+            if (start == end)
+                return null;
+
+            return this;
+        }
     }
 
     class Register implements Registration
@@ -396,7 +431,7 @@ public class DefaultRemoteListeners implements 
RemoteListeners
         }
 
         @Override
-        public int done()
+        public synchronized int done()
         {
             if (count == 0)
                 return 0;
@@ -435,36 +470,14 @@ public class DefaultRemoteListeners implements 
RemoteListeners
     public void notify(SafeCommandStore safeStore, SafeCommand safeCommand, 
Command prev)
     {
         TxnId txnId = safeCommand.txnId();
-        Listeners entry = this.listeners.get(txnId);
-        if (entry == null)
-            return;
-
-        int storeId = safeStore.commandStore().id();
-        SaveStatus newStatus = safeCommand.current().saveStatus();
-        Durability newDurability = safeCommand.current().durability();
-
-        int start = entry.start, end = entry.end;
-        StatusListeners[] listeners = entry.statusListeners;
-        for (int i = start ; i < end ; ++i)
+        Listeners entry = listeners.get(txnId);
+        if (entry != null && null == entry.notify(notifySink, safeStore, 
safeCommand, prev))
         {
-            StatusListeners listener = listeners[i];
-            if (awaitSaveStatus(listener.await).compareTo(newStatus) > 0)
-                return;
-
-            if 
(awaitDurability(listener.await).compareTo(newDurability.decisionOrOutcome()) > 
0)
-                continue;
-
-            listener.removeWaitingOn(storeId);
-            if (listener.waitingOnCount == 0)
-            {
-                // if we get invalidated we don't save the route, so we take 
the combined route of before and after the new status
-                Route<?> route = Route.merge(safeCommand.current().route(), 
prev == null ? null : (Route)prev.route());
-                notifySink.notify(txnId, newStatus, route, listener.listeners, 
listener.listenerCount);
-                if (i != start)
-                    System.arraycopy(listeners, start, listeners, start + 1, i 
- start);
-                listeners[start] = null;
-                start = ++entry.start;
-            }
+            listeners.compute(txnId, (ignore, e) -> {
+                if (e == entry && e.start == e.end)
+                    return null;
+                return e;
+            });
         }
     }
 
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index f0eb6611..be4a9f59 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -1234,7 +1234,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         hasResumedBootstraps = false;
     }
 
-    public Journal.Replayer replayer()
+    public Journal.Replayer replayer(AbstractReplayer.Mode mode)
     {
         return new CommandReplayer(this);
     }
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java 
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index fa0f9466..ec85d716 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -915,27 +915,42 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
     public void setMode(ModeFlag flag)
     {
         commandStore.execute((PreLoadContext.Empty)() -> "Set ProgressLog 
ModeFlag", safeStore -> {
-            modeFlags |= TinyEnumSet.encode(flag);
-            if (flag == ModeFlag.HOME_EXPECTS_LOCALLY_APPLIED)
+            setModeExclusive(safeStore, flag);
+        });
+    }
+
+    public boolean setModeExclusive(SafeCommandStore safeStore, ModeFlag flag)
+    {
+        int encoded = TinyEnumSet.encode(flag);
+        if ((modeFlags & encoded) == encoded)
+            return false;
+
+        modeFlags |= TinyEnumSet.encode(flag);
+        if (flag == ModeFlag.HOME_EXPECTS_LOCALLY_APPLIED)
+        {
+            for (TxnState state : BTree.<TxnState>iterable(stateMap))
             {
-                for (TxnState state : BTree.<TxnState>iterable(stateMap))
-                {
-                    // clear the home state and let normal processing decide 
what to do
-                    if (state.homePhase() == Done)
-                        state.set(safeStore, this, Undecided, Queued);
-                }
+                // clear the home state and let normal processing decide what 
to do
+                if (state.homePhase() == Done)
+                    state.set(safeStore, this, Undecided, Queued);
             }
-        });
+        }
+        return true;
     }
 
     @VisibleForImplementation
     public void unsetMode(ModeFlag flag)
     {
         commandStore.executeMaybeImmediately(() -> {
-            modeFlags &= ~TinyEnumSet.encode(flag);
+            unsetModeExclusive(flag);
         });
     }
 
+    public void unsetModeExclusive(ModeFlag flag)
+    {
+        modeFlags &= ~TinyEnumSet.encode(flag);
+    }
+
     boolean isCatchingUp()
     {
         return TinyEnumSet.contains(modeFlags, ModeFlag.CATCH_UP);
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 94f0742b..c543b622 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -35,6 +35,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
+import accord.impl.AbstractReplayer;
 import accord.primitives.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSortedMap;
@@ -222,7 +223,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         return id;
     }
 
-    public abstract Journal.Replayer replayer();
+    public abstract Journal.Replayer replayer(AbstractReplayer.Mode mode);
     // expected to invoke safeStore.upsertRedundantBefore at some future 
point, when the commandStore state is durably persisted
     protected abstract void ensureDurable(Ranges ranges, RedundantBefore 
onCommandStoreDurable);
 
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index e7be67e6..f5348aa9 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -1236,17 +1236,22 @@ public class Commands
         boolean acceptInternal(SafeCommandStore safeStore)
         {
             SafeCommand waitingSafe = safeStore.get(waitingId);
+            PartialDeps partialDeps;
+            {
+                Command waiting = waitingSafe.current();
+                if (waiting.saveStatus().compareTo(Applying) >= 0)
+                    return false;
+
+                partialDeps = waiting.partialDeps();
+                Invariants.require(partialDeps != null, "Trying to execute 
command without partialDeps: %s", waiting);
+            }
+
             SafeCommand depSafe = null;
             if (loadDepId != null)
             {
                 depSafe = safeStore.ifInitialised(loadDepId);
                 if (depSafe == null) // TODO (required): slice to 
waiting.participants().waitsOn? can simplify method
-                {
-                    Command waiting = waitingSafe.current();
-                    if (waiting.saveStatus().compareTo(Applying) >= 0)
-                        return false; // nothing to do
-                    depSafe = initialiseOrRemoveDependency(safeStore, 
waitingSafe, loadDepId, waiting.partialDeps().participants(loadDepId));
-                }
+                    depSafe = initialiseOrRemoveDependency(safeStore, 
waitingSafe, loadDepId, partialDeps.participants(loadDepId));
             }
 
             while (true)
@@ -1257,7 +1262,7 @@ public class Commands
 
                 if (depSafe == null)
                 {
-                    WaitingOn waitingOn = waiting.asCommitted().waitingOn();
+                    WaitingOn waitingOn = waiting.waitingOn();
                     TxnId directlyBlockedOn = waitingOn.nextWaitingOn();
                     if (directlyBlockedOn == null)
                     {
@@ -1300,7 +1305,7 @@ public class Commands
                     if (depExecution.compareTo(WaitingToExecute) < 0 && 
dep.participants().owns().isEmpty())
                     {
                         // TODO (desired): slightly costly to invert a large 
partialDeps collection
-                        participants = 
waiting.partialDeps().participants(dep.txnId());
+                        participants = partialDeps.participants(dep.txnId());
                         Participants<?> waitsOn = 
participants.intersecting(waiting.participants().stillWaitsOn(), Minimal);
 
                         depSafe = maybeCleanupRedundantDependency(safeStore, 
waitingSafe, depSafe, waitsOn);
@@ -1352,7 +1357,7 @@ public class Commands
                         case CleaningUp:
                             updateDependencyAndMaybeExecute(safeStore, 
waitingSafe, depSafe, false);
                             waiting = waitingSafe.current();
-                            
Invariants.require(waiting.saveStatus().compareTo(Applying) >= 0 || 
!waiting.asCommitted().waitingOn().isWaitingOn(dep.txnId()));
+                            
Invariants.require(waiting.saveStatus().compareTo(Applying) >= 0 || 
!waiting.waitingOn().isWaitingOn(dep.txnId()));
                             depSafe = null;
                     }
                 }
diff --git a/accord-core/src/main/java/accord/messages/ParticipantsRequest.java 
b/accord-core/src/main/java/accord/messages/ParticipantsRequest.java
index acd6d3d1..7d3f240e 100644
--- a/accord-core/src/main/java/accord/messages/ParticipantsRequest.java
+++ b/accord-core/src/main/java/accord/messages/ParticipantsRequest.java
@@ -131,7 +131,6 @@ public abstract class ParticipantsRequest<P extends 
Participants<?>, R extends R
         if (i == mi)
             return topologies.oldestEpoch();
 
-
         Ranges latest;
         {
             Topology mostRecent = topologies.get(i - 1);
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index f57f339e..2b675625 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -269,7 +269,7 @@ public class Topology
 
     public Topology cloneEquivalentWithEpoch(long epoch)
     {
-        return new Topology(epoch, removed, hardRemoved, stale, shards);
+        return new Topology(null, epoch, shards, ranges, removed, hardRemoved, 
stale, nodes, nodeLookup, ranges, supersetIndexes);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java 
b/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
index 16c9c75c..d3672149 100644
--- a/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
+++ b/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
@@ -86,6 +86,15 @@ public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node
         return i >= 0 && i < size && heap[i] == node;
     }
 
+    protected boolean removeIfContains(N node)
+    {
+        int i = node.heapIndex;
+        if (i < 0 || i >= heap.length || heap[i] != node)
+            return false;
+        removeInternal(i, node);
+        return true;
+    }
+
     /**
      * remove; can be used as a simple list
      */
@@ -93,6 +102,11 @@ public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node
     {
         int i = node.heapIndex;
         Invariants.requireArgument(i >= 0 && i < heap.length && heap[i] == 
node);
+        removeInternal(i, node);
+    }
+
+    private void removeInternal(int i, N node)
+    {
         if (size > 1)
         {
             N tail = (N) heap[--size];
diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java 
b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
index d1bceda8..17c785d8 100644
--- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
@@ -404,7 +404,7 @@ public class RemoteListenersTest
         }
 
         @Override
-        public Journal.Replayer replayer()
+        public Journal.Replayer replayer(AbstractReplayer.Mode mode)
         {
             throw new UnsupportedOperationException();
         }
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 5d087354..6ab9edaa 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import accord.api.Journal;
 import accord.api.Result;
+import accord.impl.AbstractReplayer;
 import accord.impl.CommandChange;
 import accord.impl.InMemoryCommandStore;
 import accord.local.Cleanup;
@@ -70,6 +71,7 @@ import org.agrona.collections.Int2ObjectHashMap;
 import static accord.api.Journal.Load.ALL;
 import static accord.api.Journal.Load.MINIMAL;
 import static accord.api.Journal.Load.MINIMAL_WITH_DEPS;
+import static accord.impl.AbstractReplayer.Mode.PART_NON_DURABLE;
 import static accord.impl.CommandChange.Field;
 import static accord.impl.CommandChange.Field.ACCEPTED;
 import static accord.impl.CommandChange.Field.CLEANUP;
@@ -632,7 +634,7 @@ public class InMemoryJournal implements Journal
             Map<TxnId, List<Diff>> diffs = new TreeMap<>();
 
             InMemoryCommandStore commandStore = (InMemoryCommandStore) 
commandStores.forId(commandStoreId);
-            Replayer replayer = commandStore.replayer();
+            Replayer replayer = commandStore.replayer(PART_NON_DURABLE);
 
             for (Map.Entry<TxnId, Diffs> e : diffEntry.getValue().entrySet())
                 diffs.put(e.getKey(), e.getValue().sorted(true));
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java 
b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index d4917930..55d566c5 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -49,6 +49,7 @@ import accord.api.Read;
 import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.api.Update;
+import accord.impl.AbstractReplayer;
 import accord.impl.DefaultLocalListeners;
 import accord.impl.DefaultLocalListeners.DefaultNotifySink;
 import accord.impl.DefaultRemoteListeners;
@@ -993,7 +994,7 @@ public class CommandsForKeyTest
             return true;
         }
 
-        public Journal.Replayer replayer()
+        public Journal.Replayer replayer(AbstractReplayer.Mode mode)
         {
             throw new UnsupportedOperationException();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to