This is an automated email from the ASF dual-hosted git repository. konstantinov pushed a commit to branch fixes-260226 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 720a89d8fa49b849390f0fcf5a9be24ba294c9e4 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Mon Jan 12 17:33:51 2026 +0000 - Do not invoke slowReplicaDelay or slowCoordinatorDelay during restore - Do not fail if slowReplicaDelay or slowCoordinatorDelay are invoked without knowing the transaction - DefaultRemoteListeners not correctly synchronised - Topology.cloneEquivalentWithEpoch should also share nodeLookup and ranges, especially to accelerate computeWaitForEpoch/computeScope --- .../java/accord/impl/DefaultRemoteListeners.java | 75 +++++++++++++--------- .../java/accord/impl/InMemoryCommandStore.java | 2 +- .../impl/progresslog/DefaultProgressLog.java | 35 +++++++--- .../src/main/java/accord/local/CommandStore.java | 3 +- .../src/main/java/accord/local/Commands.java | 23 ++++--- .../java/accord/messages/ParticipantsRequest.java | 1 - .../src/main/java/accord/topology/Topology.java | 2 +- .../java/accord/utils/IntrusivePriorityHeap.java | 14 ++++ .../test/java/accord/impl/RemoteListenersTest.java | 2 +- .../java/accord/impl/basic/InMemoryJournal.java | 4 +- .../java/accord/local/cfk/CommandsForKeyTest.java | 3 +- 11 files changed, 107 insertions(+), 57 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java b/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java index 4bf35380..33d0a8da 100644 --- a/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java +++ b/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java @@ -297,7 +297,7 @@ public class DefaultRemoteListeners implements RemoteListeners this.end = 1; } - Listeners merge(Listeners that) + synchronized Listeners merge(Listeners that) { int count = 0; int i = this.start, j = that.start; @@ -344,6 +344,41 @@ public class DefaultRemoteListeners implements RemoteListeners end = count; return this; } + + synchronized Listeners notify(NotifySink notifySink, SafeCommandStore safeStore, SafeCommand safeCommand, Command prev) + { + int storeId = safeStore.commandStore().id(); + SaveStatus newStatus = safeCommand.current().saveStatus(); + Durability newDurability = safeCommand.current().durability(); + + StatusListeners[] listeners = statusListeners; + for (int i = start ; i < end ; ++i) + { + StatusListeners listener = listeners[i]; + if (awaitSaveStatus(listener.await).compareTo(newStatus) > 0) + return this; + + if (awaitDurability(listener.await).compareTo(newDurability.decisionOrOutcome()) > 0) + continue; + + listener.removeWaitingOn(storeId); + if (listener.waitingOnCount == 0) + { + // if we get invalidated we don't save the route, so we take the combined route of before and after the new status + Route<?> route = Route.merge(safeCommand.current().route(), prev == null ? null : (Route)prev.route()); + notifySink.notify(safeCommand.txnId(), newStatus, route, listener.listeners, listener.listenerCount); + if (i != start) + System.arraycopy(listeners, start, listeners, start + 1, i - start); + listeners[start] = null; + ++start; + } + } + + if (start == end) + return null; + + return this; + } } class Register implements Registration @@ -396,7 +431,7 @@ public class DefaultRemoteListeners implements RemoteListeners } @Override - public int done() + public synchronized int done() { if (count == 0) return 0; @@ -435,36 +470,14 @@ public class DefaultRemoteListeners implements RemoteListeners public void notify(SafeCommandStore safeStore, SafeCommand safeCommand, Command prev) { TxnId txnId = safeCommand.txnId(); - Listeners entry = this.listeners.get(txnId); - if (entry == null) - return; - - int storeId = safeStore.commandStore().id(); - SaveStatus newStatus = safeCommand.current().saveStatus(); - Durability newDurability = safeCommand.current().durability(); - - int start = entry.start, end = entry.end; - StatusListeners[] listeners = entry.statusListeners; - for (int i = start ; i < end ; ++i) + Listeners entry = listeners.get(txnId); + if (entry != null && null == entry.notify(notifySink, safeStore, safeCommand, prev)) { - StatusListeners listener = listeners[i]; - if (awaitSaveStatus(listener.await).compareTo(newStatus) > 0) - return; - - if (awaitDurability(listener.await).compareTo(newDurability.decisionOrOutcome()) > 0) - continue; - - listener.removeWaitingOn(storeId); - if (listener.waitingOnCount == 0) - { - // if we get invalidated we don't save the route, so we take the combined route of before and after the new status - Route<?> route = Route.merge(safeCommand.current().route(), prev == null ? null : (Route)prev.route()); - notifySink.notify(txnId, newStatus, route, listener.listeners, listener.listenerCount); - if (i != start) - System.arraycopy(listeners, start, listeners, start + 1, i - start); - listeners[start] = null; - start = ++entry.start; - } + listeners.compute(txnId, (ignore, e) -> { + if (e == entry && e.start == e.end) + return null; + return e; + }); } } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index f0eb6611..be4a9f59 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -1234,7 +1234,7 @@ public abstract class InMemoryCommandStore extends CommandStore hasResumedBootstraps = false; } - public Journal.Replayer replayer() + public Journal.Replayer replayer(AbstractReplayer.Mode mode) { return new CommandReplayer(this); } diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java index fa0f9466..ec85d716 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java +++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java @@ -915,27 +915,42 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor public void setMode(ModeFlag flag) { commandStore.execute((PreLoadContext.Empty)() -> "Set ProgressLog ModeFlag", safeStore -> { - modeFlags |= TinyEnumSet.encode(flag); - if (flag == ModeFlag.HOME_EXPECTS_LOCALLY_APPLIED) + setModeExclusive(safeStore, flag); + }); + } + + public boolean setModeExclusive(SafeCommandStore safeStore, ModeFlag flag) + { + int encoded = TinyEnumSet.encode(flag); + if ((modeFlags & encoded) == encoded) + return false; + + modeFlags |= TinyEnumSet.encode(flag); + if (flag == ModeFlag.HOME_EXPECTS_LOCALLY_APPLIED) + { + for (TxnState state : BTree.<TxnState>iterable(stateMap)) { - for (TxnState state : BTree.<TxnState>iterable(stateMap)) - { - // clear the home state and let normal processing decide what to do - if (state.homePhase() == Done) - state.set(safeStore, this, Undecided, Queued); - } + // clear the home state and let normal processing decide what to do + if (state.homePhase() == Done) + state.set(safeStore, this, Undecided, Queued); } - }); + } + return true; } @VisibleForImplementation public void unsetMode(ModeFlag flag) { commandStore.executeMaybeImmediately(() -> { - modeFlags &= ~TinyEnumSet.encode(flag); + unsetModeExclusive(flag); }); } + public void unsetModeExclusive(ModeFlag flag) + { + modeFlags &= ~TinyEnumSet.encode(flag); + } + boolean isCatchingUp() { return TinyEnumSet.contains(modeFlags, ModeFlag.CATCH_UP); diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 94f0742b..c543b622 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -35,6 +35,7 @@ import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.Nullable; +import accord.impl.AbstractReplayer; import accord.primitives.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSortedMap; @@ -222,7 +223,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA return id; } - public abstract Journal.Replayer replayer(); + public abstract Journal.Replayer replayer(AbstractReplayer.Mode mode); // expected to invoke safeStore.upsertRedundantBefore at some future point, when the commandStore state is durably persisted protected abstract void ensureDurable(Ranges ranges, RedundantBefore onCommandStoreDurable); diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index e7be67e6..f5348aa9 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -1236,17 +1236,22 @@ public class Commands boolean acceptInternal(SafeCommandStore safeStore) { SafeCommand waitingSafe = safeStore.get(waitingId); + PartialDeps partialDeps; + { + Command waiting = waitingSafe.current(); + if (waiting.saveStatus().compareTo(Applying) >= 0) + return false; + + partialDeps = waiting.partialDeps(); + Invariants.require(partialDeps != null, "Trying to execute command without partialDeps: %s", waiting); + } + SafeCommand depSafe = null; if (loadDepId != null) { depSafe = safeStore.ifInitialised(loadDepId); if (depSafe == null) // TODO (required): slice to waiting.participants().waitsOn? can simplify method - { - Command waiting = waitingSafe.current(); - if (waiting.saveStatus().compareTo(Applying) >= 0) - return false; // nothing to do - depSafe = initialiseOrRemoveDependency(safeStore, waitingSafe, loadDepId, waiting.partialDeps().participants(loadDepId)); - } + depSafe = initialiseOrRemoveDependency(safeStore, waitingSafe, loadDepId, partialDeps.participants(loadDepId)); } while (true) @@ -1257,7 +1262,7 @@ public class Commands if (depSafe == null) { - WaitingOn waitingOn = waiting.asCommitted().waitingOn(); + WaitingOn waitingOn = waiting.waitingOn(); TxnId directlyBlockedOn = waitingOn.nextWaitingOn(); if (directlyBlockedOn == null) { @@ -1300,7 +1305,7 @@ public class Commands if (depExecution.compareTo(WaitingToExecute) < 0 && dep.participants().owns().isEmpty()) { // TODO (desired): slightly costly to invert a large partialDeps collection - participants = waiting.partialDeps().participants(dep.txnId()); + participants = partialDeps.participants(dep.txnId()); Participants<?> waitsOn = participants.intersecting(waiting.participants().stillWaitsOn(), Minimal); depSafe = maybeCleanupRedundantDependency(safeStore, waitingSafe, depSafe, waitsOn); @@ -1352,7 +1357,7 @@ public class Commands case CleaningUp: updateDependencyAndMaybeExecute(safeStore, waitingSafe, depSafe, false); waiting = waitingSafe.current(); - Invariants.require(waiting.saveStatus().compareTo(Applying) >= 0 || !waiting.asCommitted().waitingOn().isWaitingOn(dep.txnId())); + Invariants.require(waiting.saveStatus().compareTo(Applying) >= 0 || !waiting.waitingOn().isWaitingOn(dep.txnId())); depSafe = null; } } diff --git a/accord-core/src/main/java/accord/messages/ParticipantsRequest.java b/accord-core/src/main/java/accord/messages/ParticipantsRequest.java index acd6d3d1..7d3f240e 100644 --- a/accord-core/src/main/java/accord/messages/ParticipantsRequest.java +++ b/accord-core/src/main/java/accord/messages/ParticipantsRequest.java @@ -131,7 +131,6 @@ public abstract class ParticipantsRequest<P extends Participants<?>, R extends R if (i == mi) return topologies.oldestEpoch(); - Ranges latest; { Topology mostRecent = topologies.get(i - 1); diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index f57f339e..2b675625 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -269,7 +269,7 @@ public class Topology public Topology cloneEquivalentWithEpoch(long epoch) { - return new Topology(epoch, removed, hardRemoved, stale, shards); + return new Topology(null, epoch, shards, ranges, removed, hardRemoved, stale, nodes, nodeLookup, ranges, supersetIndexes); } @Override diff --git a/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java b/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java index 16c9c75c..d3672149 100644 --- a/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java +++ b/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java @@ -86,6 +86,15 @@ public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node return i >= 0 && i < size && heap[i] == node; } + protected boolean removeIfContains(N node) + { + int i = node.heapIndex; + if (i < 0 || i >= heap.length || heap[i] != node) + return false; + removeInternal(i, node); + return true; + } + /** * remove; can be used as a simple list */ @@ -93,6 +102,11 @@ public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node { int i = node.heapIndex; Invariants.requireArgument(i >= 0 && i < heap.length && heap[i] == node); + removeInternal(i, node); + } + + private void removeInternal(int i, N node) + { if (size > 1) { N tail = (N) heap[--size]; diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java index d1bceda8..17c785d8 100644 --- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java +++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java @@ -404,7 +404,7 @@ public class RemoteListenersTest } @Override - public Journal.Replayer replayer() + public Journal.Replayer replayer(AbstractReplayer.Mode mode) { throw new UnsupportedOperationException(); } diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index 5d087354..6ab9edaa 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import accord.api.Journal; import accord.api.Result; +import accord.impl.AbstractReplayer; import accord.impl.CommandChange; import accord.impl.InMemoryCommandStore; import accord.local.Cleanup; @@ -70,6 +71,7 @@ import org.agrona.collections.Int2ObjectHashMap; import static accord.api.Journal.Load.ALL; import static accord.api.Journal.Load.MINIMAL; import static accord.api.Journal.Load.MINIMAL_WITH_DEPS; +import static accord.impl.AbstractReplayer.Mode.PART_NON_DURABLE; import static accord.impl.CommandChange.Field; import static accord.impl.CommandChange.Field.ACCEPTED; import static accord.impl.CommandChange.Field.CLEANUP; @@ -632,7 +634,7 @@ public class InMemoryJournal implements Journal Map<TxnId, List<Diff>> diffs = new TreeMap<>(); InMemoryCommandStore commandStore = (InMemoryCommandStore) commandStores.forId(commandStoreId); - Replayer replayer = commandStore.replayer(); + Replayer replayer = commandStore.replayer(PART_NON_DURABLE); for (Map.Entry<TxnId, Diffs> e : diffEntry.getValue().entrySet()) diffs.put(e.getKey(), e.getValue().sorted(true)); diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index d4917930..55d566c5 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -49,6 +49,7 @@ import accord.api.Read; import accord.api.Result; import accord.api.RoutingKey; import accord.api.Update; +import accord.impl.AbstractReplayer; import accord.impl.DefaultLocalListeners; import accord.impl.DefaultLocalListeners.DefaultNotifySink; import accord.impl.DefaultRemoteListeners; @@ -993,7 +994,7 @@ public class CommandsForKeyTest return true; } - public Journal.Replayer replayer() + public Journal.Replayer replayer(AbstractReplayer.Mode mode) { throw new UnsupportedOperationException(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
