This is an automated email from the ASF dual-hosted git repository. belliottsmith pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 66cab49f1c9af471795339a5fdf9522e6e73b691 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Thu Jun 4 19:20:56 2026 +0100 Expunged records may be resurrected: CommandChanges.shouldCleanup short-circuits to NO if there is no data, but this is incorrect as Cleanup.EXPUNGE may have dropped the data and the record must receive cleanup EXPUNGE and be reported as ERASED. Also Fix: - ActiveEpochs.withNewEpochs should handle transition from 0 -> more than 1 - RedundantBefore.minGcBefore should be NONE if empty - Update RangesForEpoch directly, so that we cannot have race conditions where the ownership is unknown - Avoid reentrancy on local callbacks - Ensure ReadCoordinator callbacks are invoked on owning thread - Avoid deadlock when notifying ComplexListener(s) - Release IntrusivePriorityHeap memory from large capacity heaps when empty - Prevent SynchronousRecoverAwait reentrancy when invoking onDone (by exposing and invoking invokeOnDone that first sets isDone) - maybeExecute must invoke either notWaiting or notifyWaiting to ensure tryExecuteListening terminates patch by Benedict; reviewed by Alan Wang and Alex Petrov for CASSANDRA-21440 --- .../accord/coordinate/AbstractCoordination.java | 9 +- .../main/java/accord/coordinate/ExecuteTxn.java | 16 +- .../java/accord/coordinate/ReadCoordinator.java | 45 ++- .../accord/coordinate/SynchronousRecoverAwait.java | 4 +- .../src/main/java/accord/impl/CommandChange.java | 27 +- .../java/accord/impl/DefaultLocalListeners.java | 133 ++++--- .../java/accord/impl/InMemoryCommandStore.java | 21 +- .../src/main/java/accord/local/CommandBuilder.java | 2 + .../src/main/java/accord/local/CommandStore.java | 97 +---- .../src/main/java/accord/local/CommandStores.java | 66 ++-- .../src/main/java/accord/local/Commands.java | 4 +- .../main/java/accord/local/RedundantBefore.java | 15 +- .../src/main/java/accord/messages/Callback.java | 46 ++- .../main/java/accord/topology/ActiveEpochs.java | 9 +- 
.../java/accord/utils/IntrusivePriorityHeap.java | 23 +- .../java/accord/utils/ReducingIntervalMap.java | 1 + .../main/java/accord/utils/ReducingRangeMap.java | 5 + .../test/java/accord/impl/RemoteListenersTest.java | 2 +- .../src/test/java/accord/impl/basic/Cluster.java | 16 +- .../accord/impl/basic/DelayedCommandStores.java | 30 +- .../java/accord/impl/basic/InMemoryJournal.java | 35 +- .../java/accord/local/MaybeExecuteAdapterTest.java | 408 +++++++++++++++++++++ .../java/accord/local/cfk/CommandsForKeyTest.java | 5 +- .../src/test/java/accord/utils/AccordGens.java | 5 +- 24 files changed, 719 insertions(+), 305 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java index 59cbf720..626c64be 100644 --- a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java +++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java @@ -65,6 +65,7 @@ public abstract class AbstractCoordination<P extends Participants<?>, Result, Re private BiConsumer<? super Result, Throwable> callback; private Object[] replyState; private int replyCount; + private boolean unsafeToReplyImmediately; protected AbstractCoordination(Node node, SequentialAsyncExecutor executor, TxnId txnId, P scope, SortedArrayList<Node.Id> nodes, BiConsumer<? 
super Result, Throwable> callback) { @@ -197,6 +198,7 @@ public abstract class AbstractCoordination<P extends Participants<?>, Result, Re void contact(Function<Node.Id, Request> request, @Nullable Predicate<Node.Id> include) { executor.executeMaybeImmediately(() -> { + unsafeToReplyImmediately = true; AbstractTracker<?> tracker = tracker(); Topologies topologies = tracker.topologies(); if (tracing != null) @@ -227,6 +229,7 @@ public abstract class AbstractCoordination<P extends Participants<?>, Result, Re } } } + unsafeToReplyImmediately = false; }); } @@ -242,19 +245,19 @@ public abstract class AbstractCoordination<P extends Participants<?>, Result, Re @Override public final void onSuccess(Node.Id from, Reply reply) { - CallbackExclusive.onSuccess(executor, this, from, reply); + CallbackExclusive.onSuccess(executor, unsafeToReplyImmediately, this, from, reply); } @Override public final void onSlow(Node.Id from) { - CallbackExclusive.onSlow(executor, this, from); + CallbackExclusive.onSlow(executor, unsafeToReplyImmediately, this, from); } @Override public final void onFailure(Node.Id from, Throwable failure) { - CallbackExclusive.onFailure(executor, this, from, failure); + CallbackExclusive.onFailure(executor, unsafeToReplyImmediately, this, from, failure); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java index 9f813631..b042af07 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java @@ -86,9 +86,7 @@ import static accord.coordinate.CoordinationAdapter.Factory.Kind.Standard; import static accord.coordinate.ExecuteFlag.READY_TO_EXECUTE; import static accord.coordinate.ExecutePath.EPHEMERAL; import static accord.coordinate.ExecutePath.FAST; -import static accord.coordinate.ExecutePath.MEDIUM; import static accord.coordinate.ExecutePath.RECOVER; -import static 
accord.coordinate.ExecutePath.SLOW; import static accord.coordinate.ReadCoordinator.Action.Approve; import static accord.coordinate.ReadCoordinator.Action.ApprovePartial; import static accord.local.CommandSummaries.SummaryStatus.STABLE; @@ -124,19 +122,19 @@ public class ExecuteTxn extends ReadCoordinator<Result, ReadReply> @Override public final void onSuccess(Node.Id from, ReadReply reply) { - CallbackExclusive.onSuccess(executor, this, from, reply); + CallbackExclusive.onSuccess(executor, unsafeToReplyImmediately, this, from, reply); } @Override public final void onSlow(Node.Id from) { - CallbackExclusive.onSlow(executor, this, from); + CallbackExclusive.onSlow(executor, unsafeToReplyImmediately, this, from); } @Override public final void onFailure(Node.Id from, Throwable failure) { - CallbackExclusive.onFailure(executor, this, from, failure); + CallbackExclusive.onFailure(executor, unsafeToReplyImmediately, this, from, failure); } @Override @@ -494,18 +492,18 @@ public class ExecuteTxn extends ReadCoordinator<Result, ReadReply> } @Override - public void exclusiveOnSlowResponse(Id from) + public void onSlowExclusive(Id from) { // send stable messages to everyone not yet contacted, and then inform decided, to avoid unnecessary recoveries stable.maybeInformStable(); - super.exclusiveOnSlowResponse(from); + super.onSlowExclusive(from); } @Override - public void exclusiveOnFailure(Id from, Throwable failure) + public void onFailureExclusive(Id from, Throwable failure) { if (isPrivilegedVoteCommitting && from.id == node.id().id && !isDone()) finishWithFailure(failure); - else super.exclusiveOnFailure(from, failure); + else super.onFailureExclusive(from, failure); } protected CoordinationAdapter<Result> adapter() diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java index 458ed2d9..f1783b33 100644 --- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java +++ 
b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java @@ -33,6 +33,7 @@ import accord.local.Node; import accord.local.Node.Id; import accord.local.SequentialAsyncExecutor; import accord.messages.Callback; +import accord.messages.Callback.CallbackExclusive; import accord.primitives.Participants; import accord.primitives.Route; import accord.primitives.WithQuorum; @@ -53,7 +54,7 @@ import static accord.utils.Invariants.debug; import static accord.utils.Invariants.illegalState; // TODO (expected): configure the number of initial requests we send -public abstract class ReadCoordinator<Result, Reply extends accord.messages.Reply> extends ReadTracker implements Callback<Reply>, Coordination +public abstract class ReadCoordinator<Result, Reply extends accord.messages.Reply> extends ReadTracker implements Callback<Reply>, CallbackExclusive<Reply>, Coordination { public enum Action { @@ -119,6 +120,7 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl private BiConsumer<? super Result, Throwable> callback; private boolean isDone; private Throwable failure; + boolean unsafeToReplyImmediately; protected ReadCoordinator(Node node, SequentialAsyncExecutor executor, Topologies topologies, TxnId txnId, Participants<?> participants, BiConsumer<? 
super Result, Throwable> callback) { @@ -142,24 +144,19 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl @Override public final void onSuccess(Node.Id from, Reply reply) { - executor.executeMaybeImmediately(() -> { - try { onSuccessExclusive(from, reply); } - catch (Throwable t) { exclusiveOnCallbackFailure(from, t); } - }); + CallbackExclusive.onSuccess(executor, unsafeToReplyImmediately, this, from, reply); } @Override public final void onSlow(Node.Id from) { - try { exclusiveOnSlowResponse(from); } - catch (Throwable t) { exclusiveOnCallbackFailure(from, t); } + CallbackExclusive.onSlow(executor, unsafeToReplyImmediately, this, from); } @Override public final void onFailure(Node.Id from, Throwable failure) { - try { exclusiveOnFailure(from, failure); } - catch (Throwable t) { exclusiveOnCallbackFailure(from, t); } + CallbackExclusive.onFailure(executor, unsafeToReplyImmediately, this, from, failure); } @Override @@ -169,7 +166,8 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl return true; } - protected void onSuccessExclusive(Id from, Reply reply) + @Override + public void onSuccessExclusive(Id from, Reply reply) { if (isDone) return; @@ -184,7 +182,7 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl { default: throw new UnhandledEnum(action); case Aborted: - setDone(); + trySetDone(); case None: break; @@ -212,7 +210,8 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl } } - protected void exclusiveOnSlowResponse(Id from) + @Override + public void onSlowExclusive(Id from) { if (isDone) return; @@ -225,11 +224,12 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl } catch (Throwable t) { - exclusiveOnCallbackFailure(from, t); + onCallbackFailureExclusive(from, t); } } - protected void exclusiveOnFailure(Id from, Throwable failure) + @Override + public void onFailureExclusive(Id from, Throwable 
failure) { if (isDone) return; @@ -250,11 +250,12 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl } catch (Throwable t) { - exclusiveOnCallbackFailure(from, t); + onCallbackFailureExclusive(from, t); } } - protected void exclusiveOnCallbackFailure(Id from, Throwable failure) + @Override + public void onCallbackFailureExclusive(Id from, Throwable failure) { try { @@ -319,12 +320,12 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl finishOnFailure(); } - boolean isDone() + final boolean isDone() { return isDone; } - void setDone() + final void setDone() { Invariants.require(!isDone); isDone = true; @@ -333,7 +334,7 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl Coordination.traceStop(tracing, this); } - boolean trySetDone() + final boolean trySetDone() { if (isDone) return false; @@ -341,7 +342,7 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl return true; } - private void invokeOnDone(Success success, Throwable failure) + final void invokeOnDone(Success success, Throwable failure) { setDone(); try { onDone(success, failure); } @@ -374,7 +375,9 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl protected void start(List<Id> to) { + unsafeToReplyImmediately = true; to.forEach(this::contact); + unsafeToReplyImmediately = false; } public final void start() @@ -407,7 +410,9 @@ public abstract class ReadCoordinator<Result, Reply extends accord.messages.Repl RequestStatus status = trySendMore(List::add, contact); if (tracing != null) tracing.trace(null, "contacting %s", contact); + unsafeToReplyImmediately = true; contact.forEach(this::contact); + unsafeToReplyImmediately = false; return status; } diff --git a/accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java b/accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java index 2a07ced9..33e23c73 100644 --- 
a/accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java +++ b/accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java @@ -97,7 +97,7 @@ public class SynchronousRecoverAwait extends ReadCoordinator<InferredFastPath, R case Reject: outcome = Reject; - onDone(null, null); + invokeOnDone(null, null); return Action.Aborted; case Accept: @@ -105,7 +105,7 @@ public class SynchronousRecoverAwait extends ReadCoordinator<InferredFastPath, R if (waitingOn.isEmpty()) { outcome = Accept; - onDone(null, null); + invokeOnDone(null, null); return Action.Aborted; } return Action.Approve; diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java index a3a05960..8729eedb 100644 --- a/accord-core/src/main/java/accord/impl/CommandChange.java +++ b/accord-core/src/main/java/accord/impl/CommandChange.java @@ -357,11 +357,6 @@ public class CommandChange public Cleanup shouldCleanup(Input input, RedundantBefore redundantBefore, DurableBefore durableBefore) { - // Early return: No cleanup needed when no updates have been made to this command - if (!hasUpdate) - return NO; - - // Honor previously set cleanup requirements - cleanup levels are ordered by aggressiveness if (cleanup != null) { switch (cleanup) @@ -381,16 +376,22 @@ public class CommandChange Durability durability = this.durability; if (durability == null) durability = NotDurable; StoreParticipants participants = this.participants; + // TODO (expected): we need to filter participants to correctly compute doesStillExecute in Cleanup.shouldCleanup; // would be better to break this dependency, or otherwise encode it better. // In particular it would be nice to avoid doing this twice for each command on load, as we also do this in SafeCommandStore. 
// Perhaps we can special-case loading, and simply update the participants here so we can avoid doing it again on access + SaveStatus saveStatus = this.saveStatus; if (input == Input.FULL) { // During full compaction, commands without save status can be completely expunged if (saveStatus == null) - return EXPUNGE; - + { + if (hasUpdate) + return EXPUNGE; + saveStatus = SaveStatus.NotDefined; + } + if (participants != null) participants = participants.filter(LOAD, redundantBefore, txnId, saveStatus.known.isExecuteAtKnown() ? executeAt : null); } @@ -401,6 +402,7 @@ public class CommandChange return cleanup; } + @SuppressWarnings("UnusedReturnValue") // used by implementation public Cleanup maybeCleanup(boolean clearFields, Input input, RedundantBefore redundantBefore, DurableBefore durableBefore) { Cleanup cleanup = shouldCleanup(input, redundantBefore, durableBefore); @@ -509,6 +511,7 @@ public class CommandChange // returns true if we made a material update to the Builder; // that is, if we cleared a non-null field or if we are already mask-only + @SuppressWarnings("UnusedReturnValue") // return value is used by implementation public boolean clearSuperseded(boolean clearFields, Builder superseding) { int unset = flags & setFieldsMask(superseding.flags & ~setChanged(CLEANUP)); @@ -597,9 +600,7 @@ public class CommandChange // TODO (expected): we shouldn't need to filter participants here, we will do it anyway before using in SafeCommandStore public Command construct(RedundantBefore redundantBefore) { - if (!hasUpdate) - return null; - + // we cannot short-circuit !hasUpdate, as we might have been expunged and should return Erased Invariants.require(txnId != null); if (cleanup != null) { @@ -610,7 +611,7 @@ public class CommandChange { default: throw new UnhandledEnum(cleanup); case NO: break; - case EXPUNGE: return null; + case EXPUNGE: case ERASE: return Command.Truncated.erased(txnId); case INVALIDATE: return Command.Truncated.invalidated(txnId, participants); 
case VESTIGIAL: return Command.Truncated.vestigial(txnId, participants); @@ -625,6 +626,9 @@ public class CommandChange } } + if (!hasUpdate) + return null; + // TODO (expected): bitset of expected known fields for cheap and comprehensive expunge check if (executeAt == null && saveStatus != null && saveStatus.known.isExecuteAtKnown()) return null; @@ -680,7 +684,6 @@ public class CommandChange case Vestigial: return vestigial(txnId, participants); case Erased: - // TODO (expected): why are we saving Durability here for erased commands? return erased(txnId); case Invalidated: return invalidated(txnId, participants); diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java index ef5fd853..02bd617b 100644 --- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java +++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java @@ -42,6 +42,7 @@ import accord.local.SafeCommand; import accord.local.SafeCommandStore; import accord.primitives.SaveStatus; import accord.primitives.TxnId; +import accord.utils.ArrayBuffers.BufferList; import accord.utils.AsymmetricComparator; import accord.utils.Invariants; import accord.utils.btree.BTree; @@ -292,7 +293,6 @@ public class DefaultLocalListeners implements LocalListeners static final RegisteredComplexListener[] NO_LISTENERS = new RegisteredComplexListener[0]; RegisteredComplexListener[] listeners = NO_LISTENERS; int count, length; - boolean notifying; /** * Append to the end of the list; if we aren't reentering from notify then if the next position @@ -307,10 +307,11 @@ public class DefaultLocalListeners implements LocalListeners Invariants.require(listeners[index] == remove); listeners[index] = null; remove.index = -1; - // we don't decrement length even if count==length so as to simplify reentry - --count; - if (Invariants.isParanoid() && !notifying) checkIntegrity(); - return count > 0 || notifying ? 
this : null; + if (--count == 0) + return null; + + if (Invariants.isParanoid()) checkIntegrity(); + return this; } /** @@ -323,10 +324,10 @@ public class DefaultLocalListeners implements LocalListeners if (listeners.length == length) { RegisteredComplexListener[] oldListeners = listeners; - if (length >= count / 2 || notifying) + if (length >= count / 2) listeners = new RegisteredComplexListener[Math.max(2, length * 2)]; - if (count == length || notifying) + if (count == length) { // copy to same positions System.arraycopy(oldListeners, 0, listeners, 0, length); @@ -353,7 +354,7 @@ public class DefaultLocalListeners implements LocalListeners add.index = length; length++; count++; - if (Invariants.isParanoid() && !notifying) checkIntegrity(); + if (Invariants.isParanoid()) checkIntegrity(); } /** @@ -362,61 +363,15 @@ public class DefaultLocalListeners implements LocalListeners * listeners that were present when we started. We compact the listener collection as we go, though given * reentry there is no guarantee the list at exit is compacted. 
*/ - RegisteredComplexListeners notify(SafeCommandStore safeStore, SafeCommand safeCommand, NotifySink notifySink) + void collect(List<RegisteredComplexListener> notify) { - int count = 0; - int length = this.length; - - notifying = true; for (int i = 0 ; i < length ; ++i) { RegisteredComplexListener next = listeners[i]; if (next == null) continue; Invariants.require(next.index == i); - if (!notifySink.notify(safeStore, safeCommand, listeners[i].listener)) - { - if (next.index >= 0) - --this.count; - next.index = -1; - } - else if (next.index >= 0) // can be cancelled by notify, without notify return false - { - Invariants.requireArgument(next.index == i); - if (i != count) - { - listeners[count] = next; - next.index = count; - } - ++count; - } - else Invariants.require(listeners[i] == null); - } - notifying = false; - - if (length != this.length) - { - // we have had some concurrent insertions (concurrent removals do not alter length) - // we also have some empty slots, so compact the new entries - for (int i = length ; i < this.length ; ++i) - { - RegisteredComplexListener next = listeners[i]; - if (next == null) - continue; - - Invariants.require(next.index == i); - listeners[count] = next; - next.index = count; - count++; - } - Invariants.require(this.count <= count); // we could have already removed some items from the compacted section - length = this.length; + notify.add(listeners[i]); } - - Arrays.fill(listeners, count, length, null); - this.length = count; - - if (Invariants.isParanoid()) checkIntegrity(); - return count == 0 ? 
null : this; } private void checkIntegrity() @@ -520,13 +475,59 @@ public class DefaultLocalListeners implements LocalListeners this.txnListeners = txnListeners; } - private void notifyComplexListeners(SafeCommandStore safeStore, SafeCommand safeCommand) + static class NotifyComplex extends BufferList<RegisteredComplexListener> implements BiFunction<TxnId, RegisteredComplexListeners, RegisteredComplexListeners> { - complexListeners.compute(safeCommand.txnId(), (id, cur) -> { + boolean remove; + @Override + public RegisteredComplexListeners apply(TxnId txnId, RegisteredComplexListeners cur) + { if (cur == null) return null; - return cur.notify(safeStore, safeCommand, notifySink); - }); + + if (remove) + { + for (RegisteredComplexListener listener : this) + { + if (listener != null && null == cur.remove(listener)) + return null; // can only return early if remaining listeners already removed + } + } + else + { + cur.collect(this); + } + + return cur; + } + } + + private void notifyComplexListeners(SafeCommandStore safeStore, SafeCommand safeCommand) + { + try (NotifyComplex notify = new NotifyComplex()) + { + complexListeners.compute(safeCommand.txnId(), notify); + + int size = notify.size(), count = size; + for (int i = 0 ; i < size ; ++i) + { + RegisteredComplexListener registered = notify.get(i); + if (registered.index < 0) continue; + boolean noChange = notifySink.notify(safeStore, safeCommand, registered.listener) + || registered.index < 0; // check we haven't removed ourselves during notify, to avoid wasted work + + if (noChange) + { + notify.set(i, null); + --count; + } + } + + if (count > 0) + { + notify.remove = true; + complexListeners.compute(safeCommand.txnId(), notify); + } + } } @Override @@ -555,9 +556,9 @@ public class DefaultLocalListeners implements LocalListeners complexListeners.forEach((key, value) -> { // the listener registration needs to be invalidated so that a caller does not try to cancel it RegisteredComplexListeners listeners = 
complexListeners.remove(key); - for (int i = 0 ; i < listeners.length ; i++) + if (listeners != null) { - if (listeners.listeners[i] != null) + for (int i = 0 ; i < listeners.count ; i++) listeners.listeners[i].index = -1; } }); @@ -716,13 +717,7 @@ public class DefaultLocalListeners implements LocalListeners maxBufferCount = bufferCount; } - int count = 0; - for (int i = 0 ; i < cur.length ; ++i) - { - if (cur.listeners[i] != null) - buffer[count++] = cur.listeners[i]; - } - Invariants.expect(count == bufferCount); + System.arraycopy(cur.listeners, 0, buffer, 0, bufferCount); return cur; }); diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index daa2c9f5..41be863d 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -41,6 +41,7 @@ import java.util.function.Predicate; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.local.CommandStores.RangesForEpoch; import accord.local.cfk.NotifySink; import accord.primitives.*; import com.google.common.annotations.VisibleForTesting; @@ -216,9 +217,9 @@ public abstract class InMemoryCommandStore extends CommandStore private InMemorySafeStore current; private final Journal journal; - public InMemoryCommandStore(int id, NodeCommandStoreService node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal) + public InMemoryCommandStore(int id, NodeCommandStoreService node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, RangesForEpoch rangesForEpoch, Journal journal) { - super(id, node, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); + super(id, node, agent, store, progressLogFactory, listenersFactory, rangesForEpoch); 
this.journal = journal; this.commandsForRanges = new InMemoryRangeSummaryIndex(); progressLog.unsafeStart(); @@ -475,9 +476,7 @@ public abstract class InMemoryCommandStore extends CommandStore { if (current != null) throw illegalState("Another operation is in progress or it's store was not cleared"); - current = createSafeStore(context, cfrLoad); - updateRangesForEpoch(current); - return current; + return current = createSafeStore(context, cfrLoad); } public void completeOperation(SafeCommandStore store) @@ -951,9 +950,9 @@ public abstract class InMemoryCommandStore extends CommandStore Thread activeThread; final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); - public Synchronized(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal) + public Synchronized(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, RangesForEpoch rangesForEpoch, Journal journal) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, journal); + super(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, journal); } private synchronized void maybeRun() @@ -1026,9 +1025,9 @@ public abstract class InMemoryCommandStore extends CommandStore private Thread thread; // when run in the executor this will be non-null, null implies not running in this store private final ExecutorService executor; - public SingleThread(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal) + public SingleThread(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, 
RangesForEpoch rangesForEpoch, Journal journal) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, journal); + super(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, journal); this.executor = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r); thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']'); @@ -1110,9 +1109,9 @@ public abstract class InMemoryCommandStore extends CommandStore } } - public Debug(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal) + public Debug(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, RangesForEpoch rangesForEpoch, Journal journal) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, journal); + super(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, journal); } @Override diff --git a/accord-core/src/main/java/accord/local/CommandBuilder.java b/accord-core/src/main/java/accord/local/CommandBuilder.java index ecfc8cdd..44c71ac3 100644 --- a/accord-core/src/main/java/accord/local/CommandBuilder.java +++ b/accord-core/src/main/java/accord/local/CommandBuilder.java @@ -229,6 +229,8 @@ public class CommandBuilder case TruncatedApplyWithOutcome: case TruncatedApply: case TruncatedUnapplied: + if (txnId.awaitsOnlyDeps()) + return Command.Truncated.truncated(txnId, saveStatus, durability, participants, executeAt, partialDeps, writes, result, null); return Command.Truncated.truncated(txnId, saveStatus, durability, participants, executeAt, partialDeps, writes, result); case Erased: return Command.Truncated.erased(txnId); diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java index d6a99a42..a07a6559 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -29,7 +29,6 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -104,43 +103,6 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA { private static final Logger logger = LoggerFactory.getLogger(CommandStore.class); - public static class EpochUpdate - { - public final RangesForEpoch newRangesForEpoch; - public final RedundantBefore addRedundantBefore; - - EpochUpdate(RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore) - { - this.newRangesForEpoch = newRangesForEpoch; - this.addRedundantBefore = addRedundantBefore; - } - } - - // TODO (required): we only REMOVE ranges now, so it should be possible to simplify this - public static class EpochUpdateHolder extends AtomicReference<EpochUpdate> - { - // TODO (desired): can better encapsulate by accepting only the newRangesForEpoch and deriving the add/remove ranges - public void add(long epoch, RangesForEpoch newRangesForEpoch, Ranges addRanges) - { - RedundantBefore addRedundantBefore = RedundantBefore.create(addRanges, epoch, Long.MAX_VALUE, TxnId.minForEpoch(epoch), UNREADY_ONLY); - update(newRangesForEpoch, addRedundantBefore); - } - - public void remove(long epoch, RangesForEpoch newRangesForEpoch, Ranges removeRanges) - { - RedundantBefore addRedundantBefore = RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, SomeStatus.NONE); - update(newRangesForEpoch, addRedundantBefore); - } - - private void update(RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore) - { - EpochUpdate baseUpdate 
= new EpochUpdate(newRangesForEpoch, addRedundantBefore); - EpochUpdate cur = get(); - if (cur == null || !compareAndSet(cur, new EpochUpdate(newRangesForEpoch, RedundantBefore.merge(cur.addRedundantBefore, addRedundantBefore)))) - set(baseUpdate); - } - } - public interface Factory { CommandStore create(int id, @@ -149,7 +111,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, - EpochUpdateHolder rangesForEpoch, + RangesForEpoch rangesForEpoch, Journal journal); } @@ -159,7 +121,6 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA protected final DataStore dataStore; protected final ProgressLog progressLog; protected final LocalListeners listeners; - protected final EpochUpdateHolder epochUpdateHolder; // Used in markShardStale to make sure the staleness includes in progress bootstraps // TODO (desired): migrate to BTree @@ -214,7 +175,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA DataStore dataStore, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, - EpochUpdateHolder epochUpdateHolder) + RangesForEpoch rangesForEpoch) { this.id = id; this.node = node; @@ -222,7 +183,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA this.dataStore = dataStore; this.progressLog = progressLogFactory.create(this); this.listeners = listenersFactory.create(this); - this.epochUpdateHolder = epochUpdateHolder; + loadRangesForEpoch(rangesForEpoch); } public final int id() @@ -255,34 +216,6 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA waitingOnVisibility.clear(); } - public void updateRangesForEpoch(SafeCommandStore safeStore) - { - EpochUpdate update = epochUpdateHolder.get(); - if (update == null) - return; - - update = epochUpdateHolder.getAndSet(null); - if 
(update.addRedundantBefore.size() > 0) - safeStore.upsertRedundantBefore(update.addRedundantBefore); - if (update.newRangesForEpoch != null) - safeStore.setRangesForEpoch(update.newRangesForEpoch); - - safeStore.persistFieldUpdates(); - } - - @VisibleForTesting - public void unsafeUpdateRangesForEpoch() - { - EpochUpdate update = epochUpdateHolder.getAndSet(null); - if (update == null) - return; - - if (update.addRedundantBefore.size() > 0) - unsafeUpsertRedundantBefore(update.addRedundantBefore); - if (update.newRangesForEpoch != null) - unsafeSetRangesForEpoch(update.newRangesForEpoch); - } - public RangesForEpoch unsafeGetRangesForEpoch() { return rangesForEpoch; @@ -304,15 +237,15 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA rangesForEpoch = nonNull(newRangesForEpoch); } - protected final void unsafeClearRangesForEpoch() - { - rangesForEpoch = null; - } - protected void loadRangesForEpoch(RangesForEpoch newRangesForEpoch) { - Invariants.require(this.rangesForEpoch == null); + Invariants.require(this.rangesForEpoch == null || rangesForEpoch.isPrefixOf(newRangesForEpoch)); unsafeSetRangesForEpoch(newRangesForEpoch); + if (redundantBefore.isEmpty() && newRangesForEpoch.size() > 0) + { + long minEpoch = rangesForEpoch.epochAtIndex(0); + loadRedundantBefore(RedundantBefore.create(rangesForEpoch.all(), minEpoch, Long.MAX_VALUE, TxnId.minForEpoch(minEpoch), UNREADY_ONLY)); + } } protected final void unsafeClearPermanentlyUnsafeToRead() @@ -390,8 +323,8 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA protected void loadRedundantBefore(RedundantBefore newRedundantBefore) { - Invariants.require(redundantBefore == null || redundantBefore.equals(RedundantBefore.EMPTY)); - Invariants.require(newRedundantBefore != null); + Invariants.require(redundantBefore == null || redundantBefore.foldl((b, v) -> v && b.bounds.length == 1 && b.status(0) == 0 && b.status(1) == UNREADY_ONLY.encoded, true)); + 
Invariants.require(newRedundantBefore != null && (redundantBefore == null || newRedundantBefore.isAtLeast(redundantBefore))); unsafeSetRedundantBefore(newRedundantBefore); } @@ -850,16 +783,20 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA }); } - Supplier<EpochReady> unbootstrap(long epoch, Ranges removedRanges) + Supplier<EpochReady> unbootstrap(long epoch, RangesForEpoch newRangesForEpoch, Ranges removeRanges) { return () -> { AsyncResult<Void> done = submit((Empty) () -> "Unbootstrap", safeStore -> { for (Bootstrap prev : bootstraps) { - Ranges abort = prev.allValid.slice(removedRanges, Minimal); + Ranges abort = prev.allValid.slice(removeRanges, Minimal); if (!abort.isEmpty()) prev.invalidate(abort); } + Invariants.require(rangesForEpoch.isPrefixOf(newRangesForEpoch)); + RedundantBefore addRedundantBefore = RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, SomeStatus.NONE); + safeStore.setRangesForEpoch(newRangesForEpoch); + safeStore.upsertRedundantBefore(addRedundantBefore); return null; }); diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index dd2e353d..168b7df4 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -48,7 +48,6 @@ import accord.api.Journal; import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.RoutingKey; -import accord.local.CommandStore.EpochUpdateHolder; import accord.primitives.AbstractRanges; import accord.primitives.AbstractUnseekableKeys; import accord.primitives.EpochSupplier; @@ -127,7 +126,7 @@ public abstract class CommandStores implements AsyncExecutorFactory */ Ranges ranges(ShardHolder shard); - default @Nullable long minEpoch() { return -1L; }; + default long minEpoch() { return -1L; }; } public interface UnrestrictedStoreSelector extends StoreSelector @@ -354,7 +353,7 @@ 
public abstract class CommandStores implements AsyncExecutorFactory this.journal = journal; } - CommandStore create(int id, EpochUpdateHolder rangesForEpoch) + CommandStore create(int id, RangesForEpoch rangesForEpoch) { return shardFactory.create(id, node, agent, this.store, progressLogFactory, listenersFactory, rangesForEpoch, journal); } @@ -721,6 +720,20 @@ public abstract class CommandStores implements AsyncExecutorFactory } return removed; } + + public boolean isPrefixOf(RangesForEpoch that) + { + if (this.size() > that.size()) + return false; + + for (int i = 0 ; i < size() ; ++i) + { + if (this.epochs[i] != that.epochs[i] || !this.ranges[i].equals(that.ranges[i])) + return false; + } + + return true; + } } protected void loadSnapshot(Snapshot toLoad) @@ -927,9 +940,7 @@ public abstract class CommandStores implements AsyncExecutorFactory { // TODO (required): This is updating the a non-volatile field in the previous Snapshot, why modify it at all, even with volatile the guaranteed visibility is weak even with mutual exclusion shard.ranges = shard.ranges().withRanges(newTopology.epoch(), current.without(subtracted)); - shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges); - - bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); + bootstrapUpdates.add(shard.store.unbootstrap(epoch, shard.ranges, removeRanges)); } Ranges regainedRanges = shard.ranges().all().slice(added, Minimal); @@ -953,11 +964,12 @@ public abstract class CommandStores implements AsyncExecutorFactory logger.info("Epoch {} adding {} to local command stores", epoch, added); for (Ranges addRanges : shardDistributor.split(added)) { - EpochUpdateHolder updateHolder = new EpochUpdateHolder(); RangesForEpoch rangesForEpoch = new RangesForEpoch(epoch, addRanges); - updateHolder.add(epoch, rangesForEpoch, addRanges); - ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder), previouslyOwned.regains(addRanges)); + ShardHolder shard = new 
ShardHolder(supplier.create(nextId++, rangesForEpoch), previouslyOwned.regains(addRanges)); shard.ranges = rangesForEpoch; + bootstrapUpdates.add(() -> EpochReady.all(epoch, shard.store.execute((PreLoadContext.Empty)() -> "Saving RangesForEpoch to journal for " + shard.store, safeStore -> { + safeStore.setRangesForEpoch(rangesForEpoch); // to persist it + }))); Map<BootstrapRangeAction, Ranges> partitioned = addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range), BootstrapRangeAction.class); for (Map.Entry<BootstrapRangeAction, Ranges> entry : partitioned.entrySet()) @@ -1176,11 +1188,7 @@ public abstract class CommandStores implements AsyncExecutorFactory { RangesForEpoch rfe = e.getValue(); Invariants.require(rfe != null); - EpochUpdateHolder holder = new EpochUpdateHolder(); - ShardHolder shard = new ShardHolder(supplier.create(e.getKey(), holder), rfe, update.previouslyOwned.regains(rfe.currentRanges())); - // TODO (required): if the add is necessary (highly unlikely) it needs to be done once journal is writeable so we NEED to move this - if (!shard.ranges.equals(shard.store.rangesForEpoch)) - holder.add(1, e.getValue(), rfe.all()); + ShardHolder shard = new ShardHolder(supplier.create(e.getKey(), rfe), rfe, update.previouslyOwned.regains(rfe.currentRanges())); maxId = Math.max(maxId, e.getKey()); shards[i++] = shard; } @@ -1202,35 +1210,7 @@ public abstract class CommandStores implements AsyncExecutorFactory RangesForEpoch rfe = e.getValue(); Invariants.require(rfe != null); ShardHolder shard = new ShardHolder(current.byId(storeId), rfe, update.previouslyOwned.regains(rfe.all())); - EpochUpdateHolder holder = shard.store.epochUpdateHolder; - rfe.forEach(new BiConsumer<>() - { - RangesForEpoch accumulator = null; - Ranges prev = null; - public void accept(Long epoch, Ranges ranges) - { - if (accumulator == null) - accumulator = new RangesForEpoch(epoch, ranges); - else - accumulator = accumulator.withRanges(epoch, ranges); 
- - Ranges additions = ranges; - Ranges removals = Ranges.EMPTY; - if (prev != null) - { - additions = ranges.without(prev); - removals = prev.without(ranges); - } - - if (!additions.isEmpty()) - holder.add(epoch, accumulator, additions); - if (!removals.isEmpty()) - holder.remove(epoch, accumulator, removals); - shard.store.unsafeUpdateRangesForEpoch(); - prev = ranges; - } - }); - + shard.store.unsafeSetRangesForEpoch(rfe); shards[storeId] = shard; maxId = Math.max(maxId, storeId); } diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 3bfc31a9..78fa9f5f 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -838,13 +838,15 @@ public class Commands WaitingOn waitingOn = command.waitingOn(); if (waitingOn.isWaiting()) { - if (!removeRedundantDependencies(safeStore, safeCommand) || safeCommand.current().waitingOn().isWaiting()) + if (!removeRedundantDependencies(safeStore, safeCommand) || (waitingOn = safeCommand.current().waitingOn()).isWaiting()) { if (alwaysNotifyListeners) safeStore.notifyListeners(safeCommand, command); if (notifyWaitingOn && waitingOn.isWaitingOnCommand()) adapter.notifyWaiting(safeStore, safeCommand); + else + adapter.notWaiting(safeStore); return false; } diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index 56f35923..3cc16d08 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -87,6 +87,7 @@ import static accord.primitives.Timestamp.Flag.SHARD_BOUND; import static accord.utils.ArrayBuffers.cachedAny; import static accord.utils.ArrayBuffers.cachedInts; import static accord.utils.Functions.alwaysFalse; +import static accord.utils.Functions.alwaysTrue; import static accord.utils.Invariants.illegalState; import static 
accord.utils.Invariants.require; import static accord.utils.Invariants.requireStrictlyOrdered; @@ -874,7 +875,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> { staleRanges = lostRanges = Ranges.EMPTY; maxStale = maxShardAppliedBefore = maxGcBefore = TxnId.NONE; - minShardAndLocallyAppliedBefore = minGcBefore = TxnId.MAX; + minShardAndLocallyAppliedBefore = minGcBefore = TxnId.NONE; minGcHlcBefore = 0L; maxStartEpoch = 0; minEndEpoch = Long.MAX_VALUE; @@ -920,6 +921,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> if (bounds.endEpoch < minEndEpoch) minEndEpoch = bounds.endEpoch; } + + Invariants.require(minGcHlcBefore < Long.MAX_VALUE); this.maxStale = maxUnready; this.maxShardAppliedBefore = maxShardAppliedBefore; this.maxGcBefore = maxGcBefore; @@ -1129,8 +1132,14 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> public boolean isAtLeast(RedundantBefore atLeast) { + if (!ranges().containsAll(atLeast.ranges())) + return false; + return foldl((b, v, al, e) -> { return al.foldl(Ranges.of(b.range), (lb, v2, ub) -> { + if (!v2 || ub.endEpoch > lb.endEpoch || ub.startEpoch != lb.startEpoch) + return false; + int j = 0; for (int i = 0 ; i < lb.bounds.length ; ++i) { @@ -1142,9 +1151,9 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> || (lb.status(i*2+1) & ~ub.status(j*2+1)) != 0) return false; } - return v2; + return true; }, v, b); - }, true, atLeast, null, Functions.alwaysFalse()); + }, true, atLeast, null, i -> !i); } /** diff --git a/accord-core/src/main/java/accord/messages/Callback.java b/accord-core/src/main/java/accord/messages/Callback.java index dd30288f..ca9b5230 100644 --- a/accord-core/src/main/java/accord/messages/Callback.java +++ b/accord-core/src/main/java/accord/messages/Callback.java @@ -39,17 +39,29 @@ public interface Callback<R> interface CallbackExclusive<R> { - static <R> void onSuccess(AsyncExecutor executor, 
CallbackExclusive<R> callback, Node.Id from, R success) + private static void replyMaybeImmediately(AsyncExecutor executor, boolean doNotReplyImmediately, Runnable run) { - executor.executeMaybeImmediately(() -> { + if (doNotReplyImmediately || !executor.tryExecuteImmediately(run)) + executor.execute(run); + } + + static <R> Runnable runOnSuccess(CallbackExclusive<R> callback, Node.Id from, R success) + { + return () -> { try { callback.onSuccessExclusive(from, success); } catch (Throwable t) { callback.onCallbackFailureExclusive(from, t); } - }); + }; } - static <R> void onFailure(AsyncExecutor executor, CallbackExclusive<R> callback, Node.Id from, Throwable failure) + static <R> void onSuccess(AsyncExecutor executor, boolean doNotReplyImmediately, CallbackExclusive<R> callback, Node.Id from, R success) + { + replyMaybeImmediately(executor, doNotReplyImmediately, runOnSuccess(callback, from, success)); + } + + + static <R> Runnable runOnFailure(CallbackExclusive<R> callback, Node.Id from, Throwable failure) { - executor.executeMaybeImmediately(() -> { + return () -> { try { callback.onFailureExclusive(from, failure); } catch (Throwable t) { @@ -60,15 +72,25 @@ public interface Callback<R> } callback.onCallbackFailureExclusive(from, t); } - }); + }; } - static <R> void onSlow(AsyncExecutor executor, CallbackExclusive<R> callback, Node.Id from) + static <R> void onFailure(AsyncExecutor executor, boolean doNotReplyImmediately, CallbackExclusive<R> callback, Node.Id from, Throwable failure) { - executor.executeMaybeImmediately(() -> { + replyMaybeImmediately(executor, doNotReplyImmediately, runOnFailure(callback, from, failure)); + } + + static <R> Runnable runOnSlow(CallbackExclusive<R> callback, Node.Id from) + { + return () -> { try { callback.onSlowExclusive(from); } catch (Throwable t) { callback.onCallbackFailureExclusive(from, t); } - }); + }; + } + + static <R> void onSlow(AsyncExecutor executor, boolean unsafeToReplyImmediately, CallbackExclusive<R> callback, 
Node.Id from) + { + replyMaybeImmediately(executor, unsafeToReplyImmediately, runOnSlow(callback, from)); } void onSuccessExclusive(Node.Id from, R reply); @@ -89,19 +111,19 @@ public interface Callback<R> @Override public final void onSuccess(Node.Id from, R reply) { - CallbackExclusive.onSuccess(executor, this, from, reply); + CallbackExclusive.onSuccess(executor, true, this, from, reply); } @Override public final void onSlow(Node.Id from) { - CallbackExclusive.onSlow(executor, this, from); + CallbackExclusive.onSlow(executor, true, this, from); } @Override public final void onFailure(Node.Id from, Throwable failure) { - CallbackExclusive.onFailure(executor, this, from, failure); + CallbackExclusive.onFailure(executor, true, this, from, failure); } } } diff --git a/accord-core/src/main/java/accord/topology/ActiveEpochs.java b/accord-core/src/main/java/accord/topology/ActiveEpochs.java index 228c7f9b..0831536d 100644 --- a/accord-core/src/main/java/accord/topology/ActiveEpochs.java +++ b/accord-core/src/main/java/accord/topology/ActiveEpochs.java @@ -79,10 +79,13 @@ public final class ActiveEpochs implements Iterable<ActiveEpoch> ActiveEpochs withNewEpochs(ActiveEpoch[] epochs) { long firstNonEmptyEpoch = this.firstNonEmptyEpoch; - if (firstNonEmptyEpoch == -1 && epochs.length > 0 && !epochs[0].all().isEmpty()) + if (firstNonEmptyEpoch == -1) { - Invariants.require(epochs.length == 1); - firstNonEmptyEpoch = epochs[0].epoch(); + for (int i = epochs.length - 1; firstNonEmptyEpoch == -1 && i >= 0 ; --i) + { + if (!epochs[i].all().isEmpty()) + firstNonEmptyEpoch = epochs[i].epoch(); + } } return new ActiveEpochs(manager, epochs, firstNonEmptyEpoch); } diff --git a/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java b/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java index 30ce2973..de000b48 100644 --- a/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java +++ 
b/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java @@ -33,6 +33,7 @@ import java.util.stream.Stream; public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node> implements Comparator<N> { private static final int NORMAL_MIN_SIZE = 8; + private static final int MAX_EMPTY_SIZE = 1024; private static final Node[] EMPTY = new Node[0]; private static final Node[] TINY_EMPTY = new Node[0]; @@ -146,7 +147,11 @@ public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node tail.setHeapIndex(i); } } - else size = heapifiedSize = 0; + else + { + size = heapifiedSize = 0; + maybeShrink(); + } heap[size] = null; node.setHeapIndex(-1); @@ -187,7 +192,8 @@ public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node --heapifiedSize; if (size == 0) { - heap[0] = null; + if (!maybeShrink()) + heap[0] = null; return; } @@ -196,6 +202,15 @@ public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node siftDown(siftDown, 0); } + private boolean maybeShrink() + { + if (heap.length <= MAX_EMPTY_SIZE) + return false; + + heap = new Node[MAX_EMPTY_SIZE]; + return true; + } + /** * {@code i} is a free position in the heap, siftDown must be safely inserted at a position >= i */ @@ -303,6 +318,7 @@ public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node { Arrays.fill(heap, 0, size, null); heapifiedSize = size = 0; + maybeShrink(); } protected <P> void drain(P param, BiConsumer<P, N> consumer) @@ -315,6 +331,7 @@ public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node } Arrays.fill(heap, 0, size, null); heapifiedSize = size = 0; + maybeShrink(); } /** @@ -343,6 +360,8 @@ public abstract class IntrusivePriorityHeap<N extends IntrusivePriorityHeap.Node { Arrays.fill(heap, size - removedCount, size, null); size -= removedCount; + if (size == 0) + maybeShrink(); } } diff --git a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java 
b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java index 9bf506ff..1cc17d39 100644 --- a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java +++ b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java @@ -70,6 +70,7 @@ public class ReducingIntervalMap<K extends Comparable<? super K>, V> ReducingIntervalMap(K[] starts, V[] values) { Invariants.requireArgument(starts.length == values.length + 1 || (starts.length + values.length) == 0); + Invariants.require(values.length != 1 || values[0] != null); this.starts = starts; this.values = values; } diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java index 3ae1ab0a..7b3135d5 100644 --- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java +++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java @@ -502,6 +502,11 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> } } + public Ranges ranges() + { + return ranges(Objects::nonNull); + } + public Ranges ranges(Predicate<V> include) { Range[] ranges = new Range[values.length]; diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java index 17c785d8..85ccedc3 100644 --- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java +++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java @@ -399,7 +399,7 @@ public class RemoteListenersTest ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(null, new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultLocalListeners.DefaultNotifySink.INSTANCE), - new EpochUpdateHolder()); + CommandStores.RangesForEpoch.EMPTY); this.storeId = id; } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index ce54760e..43cf74c8 100644 --- 
a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -441,7 +441,7 @@ public class Cluster return stats; } - static class RandomLoader + public static class RandomLoader { private final BooleanSupplier cacheEmptyChance; private final BooleanSupplier cacheFullChance; @@ -451,22 +451,27 @@ public class Cluster final BooleanSupplier cmdCheckChance; final BooleanSupplier cfkCheckChance; + + public static float CMD_BASE_CHECK_CHANCE = 0.01f; static int cmdCounter, cfkCounter; RandomLoader(RandomSource random) { - this(random.nextBoolean() ? 1.0f : random.nextFloat(), random); + this(random.nextBoolean() ? 1.0f : random.nextFloat(), + random.nextFloat() * CMD_BASE_CHECK_CHANCE, + random.nextFloat() * 0.2f, + random); } - RandomLoader(float presentChance, RandomSource random) + RandomLoader(float presentChance, float cmdCheckChance, float cfkCheckChance, RandomSource random) { this(Gens.supplier(Gens.bools().mixedDistribution().next(random), random), Gens.supplier(Gens.bools().mixedDistribution().next(random), random), random.biasedUniformBools(presentChance), random.biasedUniformBools(presentChance), random.biasedUniformBools(presentChance), - Invariants.testParanoia(LINEAR, LINEAR, HIGH) ? 
Gens.supplier(Gens.bools().mixedDistribution().next(random), random) : () -> random.decide(0.001f), - () -> random.decide(0.1f) + random.biasedUniformBools(cmdCheckChance), + random.biasedUniformBools(cfkCheckChance) ); } @@ -578,7 +583,6 @@ public class Cluster } }; } - } public static Map<MessageType, Stats> run(Id[] nodes, int[] prefixes, MessageListener messageListener, Supplier<PendingQueue> queueSupplier, diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index b80b9824..bdc52ee0 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -107,6 +107,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread ranges, Arrays.toString(shards)) .restore(); Invariants.require(previouslyOwned.regains(ranges.currentRanges()).equals(regainingRanges)); + commandStore.loadRangesForEpoch(ranges); ShardHolder shard = new ShardHolder(commandStore, ranges, previouslyOwned.regains(ranges.currentRanges())); shards[i++] = shard; } @@ -224,26 +225,21 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread private Task<?> active; private Thread activeThread; - public DelayedCommandStore(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, CacheLoading cacheLoading, Journal journal) + public DelayedCommandStore(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, RangesForEpoch rangesForEpoch, SimulatedDelayedExecutorService executor, CacheLoading cacheLoading, Journal journal) { - super(id, time, agent, store, progressLogFactory, listenersFactory, 
epochUpdateHolder, journal); + super(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, journal); this.executor = executor; this.cacheLoading = cacheLoading; this.journal = journal; restore(); } - protected void loadRedundantBefore(RedundantBefore redundantBefore) + protected void loadRedundantBefore(RedundantBefore newRedundantBefore) { - if (redundantBefore == null) - { - Invariants.require(unsafeGetRedundantBefore().size() == 0); - } - else - { - unsafeClearRedundantBefore(); - super.loadRedundantBefore(redundantBefore); - } + if (newRedundantBefore == null || newRedundantBefore.isEmpty()) + return; + + super.loadRedundantBefore(newRedundantBefore); } protected void loadBootstrapBeganAt(NavigableMap<TxnId, Ranges> bootstrapBeganAt) @@ -263,12 +259,10 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread @Override protected void loadRangesForEpoch(RangesForEpoch newRangesForEpoch) { - if (newRangesForEpoch == null) Invariants.require(super.rangesForEpoch == null); - else - { - unsafeClearRangesForEpoch(); - super.loadRangesForEpoch(newRangesForEpoch); - } + if (newRangesForEpoch == null) + return; + + super.loadRangesForEpoch(newRangesForEpoch); } @Override diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index a4aa44f1..394cd12e 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -217,9 +217,6 @@ public class InMemoryJournal implements Journal private Builder reconstruct(Diffs files, Load load) { - if (files == null) - return null; - Builder builder = null; List<Diff> saved = files.sorted(false); for (int i = saved.size() - 1; i >= 0; i--) @@ -500,7 +497,19 @@ public class InMemoryJournal implements Journal for (Map.Entry<TxnId, Diffs> e2 : localJournal.entrySet()) { Diffs diffs = e2.getValue(); - if (diffs.isEmpty()) 
continue; + if (diffs.isEmpty()) + { + if (diffs.flushed instanceof FinalList) + { + Builder builder = new Builder(e2.getKey(), ALL); + Cleanup cleanup; + try { cleanup = builder.shouldCleanup(FULL, store.unsafeGetRedundantBefore(), store.durableBefore()); } + catch (LogUnavailableException ignore) { continue; } + + Invariants.require(cleanup.compareTo(((FinalList) diffs.flushed).cleanup()) >= 0); + } + continue; + } Diffs subset = diffs; { @@ -562,7 +571,7 @@ public class InMemoryJournal implements Journal ++counter; Cleanup cleanup; - try {cleanup = builder.shouldCleanup(input, store.unsafeGetRedundantBefore(), store.durableBefore()); } + try { cleanup = builder.shouldCleanup(input, store.unsafeGetRedundantBefore(), store.durableBefore()); } catch (LogUnavailableException ignore) {cleanup = ERASE; } cleanup = builder.maybeCleanup(true, cleanup); @@ -682,7 +691,7 @@ public class InMemoryJournal implements Journal private static abstract class FinalList extends AbstractList<Diff> { - + abstract Cleanup cleanup(); } private static class ErasedList extends FinalList @@ -728,10 +737,18 @@ public class InMemoryJournal implements Journal } return super.set(index, diff); } + + @Override + Cleanup cleanup() + { + return ERASE; + } } private static class PurgedList extends FinalList { + PurgedList() {} + @Override public Diff get(int index) { @@ -752,6 +769,12 @@ public class InMemoryJournal implements Journal return false; throw illegalState(); } + + @Override + Cleanup cleanup() + { + return EXPUNGE; + } } private static Diff toDiff(@Nonnull CommandUpdate update) diff --git a/accord-core/src/test/java/accord/local/MaybeExecuteAdapterTest.java b/accord-core/src/test/java/accord/local/MaybeExecuteAdapterTest.java new file mode 100644 index 00000000..b0ea9bd3 --- /dev/null +++ b/accord-core/src/test/java/accord/local/MaybeExecuteAdapterTest.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.local; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import accord.api.Agent; +import accord.api.ProgressLog.NoOpProgressLog; +import accord.api.RoutingKey; +import accord.api.Scheduler; +import accord.coordinate.CoordinationAdapter; +import accord.impl.DefaultLocalListeners; +import accord.impl.DefaultRemoteListeners; +import accord.impl.DefaultTimeouts; +import accord.impl.InMemoryCommandStore; +import accord.impl.InMemoryCommandStores; +import accord.impl.IntKey; +import accord.impl.SizeOfIntersectionSorter; +import accord.impl.TestAgent; +import accord.impl.TopologyFactory; +import accord.impl.basic.InMemoryJournal; +import accord.impl.mock.MockCluster; +import accord.impl.mock.MockStore; +import accord.impl.mock.MockTopologyService; +import accord.local.Node.Id; +import accord.local.UniqueTimeService.AtomicUniqueTime; +import accord.primitives.Ballot; +import accord.primitives.Deps; +import accord.primitives.FullKeyRoute; +import accord.primitives.Keys; +import accord.primitives.Range; +import accord.primitives.RangeDeps; +import accord.primitives.Ranges; +import accord.primitives.RoutingKeys; 
+import accord.primitives.SaveStatus; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.topology.Topology; +import accord.utils.DefaultRandom; +import accord.utils.ImmutableBitSet; +import accord.utils.LargeBitSet; +import accord.utils.RandomSource; +import accord.utils.async.AsyncChainUtils; + +import static accord.Utils.id; +import static accord.Utils.writeTxn; +import static accord.primitives.Status.Durability.NotDurable; + +/** + * Locks down the {@link Commands.MaybeExecuteAdapter} contract: + * + * <em>Every invocation of + * {@link Commands#maybeExecute(SafeCommandStore, SafeCommand, Command, boolean, boolean, Commands.MaybeExecuteAdapter)} + * must invoke exactly one of {@code adapter.notifyWaiting} or {@code adapter.notWaiting} before + * returning.</em> + * + * <p>Some adapters (notably {@code NotifyWaitingOnPlus.adapter(continuation,...)} used by + * {@link CommandStore#tryToExecuteListeningTxns(boolean)}) chain a continuation off of + * {@code notWaiting}; a return path that fires neither callback silently abandons the + * continuation and can deadlock the {@code AsyncResult} returned by + * {@code tryToExecuteListeningTxns}. That bug used to manifest on startup of nodes with a large + * backlog of registered listeners whose target txns were {@code Stable}/{@code PreApplied} with + * only outstanding *key* waits (no command waits), because + * {@code waitingOn.isWaitingOnCommand()} was the only condition under which the adapter was + * contacted on that return path. + * + * <p>This test intentionally bypasses the + * {@link SafeCommand#update(SafeCommandStore, Command, boolean)} machinery (and the + * {@code CommandsForKey}, progress log, etc. side effects it triggers) by installing the + * synthetic command directly into the underlying {@link InMemoryCommandStore.GlobalCommand}. + * That keeps the test scoped to the branching inside {@code Commands.maybeExecute}. 
+ */ +public class MaybeExecuteAdapterTest +{ + // Topology / route / key plumbing. + private static final Id ID1 = id(1); + private static final Id ID2 = id(2); + private static final Id ID3 = id(3); + private static final List<Id> IDS = Lists.newArrayList(ID1, ID2, ID3); + private static final Range FULL_RANGE = IntKey.range(0, 100); + private static final Ranges FULL_RANGES = Ranges.single(FULL_RANGE); + private static final Topology TOPOLOGY = TopologyFactory.toTopology(IDS, 3, FULL_RANGE); + private static final IntKey.Raw KEY = IntKey.key(10); + private static final RoutingKey HOME_KEY = KEY.toUnseekable(); + private static final FullKeyRoute ROUTE = RoutingKeys.of(HOME_KEY).toRoute(HOME_KEY); + private static final Keys KEYS = Keys.of(KEY); + + /** Counts adapter callbacks, ignoring their inputs. We are only verifying the contract. */ + private static class Recorder implements Commands.MaybeExecuteAdapter + { + int notifyWaiting; + int notWaiting; + + @Override public void notifyWaiting(SafeCommandStore safeStore, SafeCommand safeCommand) { ++notifyWaiting; } + @Override public void notWaiting(SafeCommandStore safeStore) { ++notWaiting; } + } + + + private static class CommandStoreSupport // bundles the Topology and MockStore that createStore wires into the Node + { + final AtomicReference<Topology> local = new AtomicReference<>(TOPOLOGY); + final MockStore data = new MockStore(); + } + + private static InMemoryCommandStore.Synchronized createStore(CommandStoreSupport support) + { + MockCluster.Clock clock = new MockCluster.Clock(100); + Agent agent = new TestAgent(clock); + RandomSource random = new DefaultRandom(); + Node node = new Node(ID1, null, new MockTopologyService(ignore -> null, support.local.get()), + clock, new AtomicUniqueTime(clock), + () -> support.data, + new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), + agent, random.fork(), Scheduler.NEVER_RUN_SCHEDULED, + SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, + time -> new DefaultTimeouts(time, Runnable::run), + ignore -> ignore2 -> 
new NoOpProgressLog(), + DefaultLocalListeners.Factory::new, + InMemoryCommandStores.Synchronized::new, + new CoordinationAdapter.DefaultFactory(), + DurableBefore.NOOP_PERSISTER, + new InMemoryJournal(ID1, random.fork())); + AsyncChainUtils.awaitUninterruptibly(node.unsafeStart().chain()); + return (InMemoryCommandStore.Synchronized) node.unsafeByIndex(0); // store index 0 of the started node; the cast assumes InMemoryCommandStores.Synchronized creates this store type + } + + /** + * Build a {@code Stable} {@link Command} with the supplied {@link Command.WaitingOn}. The + * {@code waitingOn} can be any shape (key-only, command-only, mixed, empty) regardless of + * whether it is semantically consistent with the rest of the command: {@link Command#validate} + * only requires {@code waitingOn != null} for stable/committed statuses, and we are testing + * pure {@code maybeExecute} branching here. + */ + private static Command buildStable(TxnId txnId, Command.WaitingOn waitingOn) + { + Txn txn = writeTxn(KEYS); + return new CommandBuilder(txnId) + .durability(NotDurable) + .participants(StoreParticipants.all(ROUTE)) + .promised(Ballot.ZERO) + .acceptedOrCommitted(Ballot.ZERO) + .partialTxn(txn.slice(FULL_RANGES, true)) + .partialDeps(Deps.NONE.intersecting(ROUTE)) + .executeAt(txnId) + .waitingOn(waitingOn) + .build(SaveStatus.Stable); + } + + private static Command buildPreAccepted(TxnId txnId) // unlike buildStable, sets no partialDeps/waitingOn; used for the non-Stable/PreApplied early-return path + { + Txn txn = writeTxn(KEYS); + return new CommandBuilder(txnId) + .durability(NotDurable) + .participants(StoreParticipants.all(ROUTE)) + .promised(Ballot.ZERO) + .acceptedOrCommitted(Ballot.ZERO) + .partialTxn(txn.slice(FULL_RANGES, true)) + .executeAt(txnId) + .build(SaveStatus.PreAccepted); + } + + private static Command.WaitingOn waitingOnKeysOnly(int numKeyBits) + { + // 0 txn waits + N key waits => bitset of size numKeyBits; all bits live in the key region + // (because 
txnIdCount() = directRangeDeps.txnIdCount() = 0). This is the data shape that + // tripped the old maybeExecute key-only branch. 
+ RoutingKey[] keys = new RoutingKey[numKeyBits]; + for (int i = 0; i < numKeyBits; ++i) + keys[i] = IntKey.routing(20 + i); + RoutingKeys waitKeys = RoutingKeys.of(keys); + LargeBitSet bits = new LargeBitSet(waitKeys.size()); + bits.setRange(0, waitKeys.size()); + return new Command.WaitingOn(waitKeys, RangeDeps.NONE, + new ImmutableBitSet(bits), + new ImmutableBitSet(0)); + } + + private static Command.WaitingOn waitingOnCommandsOnly(TxnId... depTxnIds) + { + Range[] ranges = new Range[]{ IntKey.range(0, 5) }; + // SerializerSupport's create() takes a flat int[] of [range0_end, range1_end, ..., dep0_idx, dep1_idx, ...] + // (per the existing AccordCommandStoreTryExecuteListeningTest helper). + int[] rangesToTxnIds = new int[ranges.length + depTxnIds.length]; + rangesToTxnIds[0] = rangesToTxnIds.length; // a single range covers all dep ids: its end offset is the array length (index 0 is hard-coded, so this assumes ranges.length == 1) + for (int i = 0; i < depTxnIds.length; ++i) + rangesToTxnIds[ranges.length + i] = i; + RangeDeps directRangeDeps = RangeDeps.SerializerSupport.create(ranges, depTxnIds, rangesToTxnIds, null); + + // bitset size = directRangeDeps.txnIdCount() + 0 keys = depTxnIds.length; set all command bits + LargeBitSet bits = new LargeBitSet(depTxnIds.length); + bits.setRange(0, depTxnIds.length); + return new Command.WaitingOn(RoutingKeys.EMPTY, directRangeDeps, + new ImmutableBitSet(bits), + new ImmutableBitSet(0)); + } + + private static Command.WaitingOn waitingOnMixed(TxnId depTxnId, RoutingKey... 
keys) + { + Range[] ranges = new Range[]{ IntKey.range(0, 5) }; + TxnId[] depTxnIds = new TxnId[]{ depTxnId }; + int[] rangesToTxnIds = new int[]{ 2, 0 }; // same flat layout as waitingOnCommandsOnly: range0_end = 2 (the array length), then dep index 0 + RangeDeps directRangeDeps = RangeDeps.SerializerSupport.create(ranges, depTxnIds, rangesToTxnIds, null); + + RoutingKeys waitKeys = RoutingKeys.of(keys); + int size = directRangeDeps.txnIdCount() + waitKeys.size(); + LargeBitSet bits = new LargeBitSet(size); + bits.setRange(0, size); + return new Command.WaitingOn(waitKeys, directRangeDeps, + new ImmutableBitSet(bits), + new ImmutableBitSet(0)); + } + + private static TxnId nextTxnId(MockCluster.Clock clock, int n) + { + return clock.idForNode(1, n); // NOTE(review): meaning of the (1, n) arguments is not visible here; callers pass different n for a target and its dependency -- confirm the resulting ids are distinct + } + + /** Directly inject {@code command} into the store as the current state of {@code txnId}. */ + private static void installCommand(InMemoryCommandStore.Synchronized commands, TxnId txnId, Command command) + { + commands.command(txnId).value(command); // NOTE(review): unlike runMaybeExecute, this runs outside execute/beginOperation -- presumably acceptable for this test store, confirm + } + + /** + * Open a SafeCommandStore against {@code txnId} and invoke + * {@link Commands#maybeExecute(SafeCommandStore, SafeCommand, Command, boolean, boolean, Commands.MaybeExecuteAdapter)} + * with the supplied command and recorder. 
+ */ + private static void runMaybeExecute(InMemoryCommandStore.Synchronized commands, + TxnId txnId, + Command command, + boolean alwaysNotifyListeners, + boolean notifyWaitingOn, + Recorder rec) + { + commands.execute(() -> { // NOTE(review): tests assert on rec immediately after this returns, so this assumes execute runs the task synchronously -- confirm for InMemoryCommandStore.Synchronized + SafeCommandStore safeStore = commands.beginOperation(PreLoadContext.contextFor(txnId, "Test"), null); + try + { + SafeCommand safeCommand = safeStore.unsafeGet(txnId); + Commands.maybeExecute(safeStore, safeCommand, command, alwaysNotifyListeners, notifyWaitingOn, rec); + } + finally + { + commands.completeOperation(safeStore); + } + }); + } + + // -------- The tests -------- + + /** + * Sanity: confirm the helpers produce {@code WaitingOn} instances with the bit-shape we + * intend to drive {@code maybeExecute} into. If this ever stops holding (e.g. 
because of an + * encoding change to {@code WaitingOn}), the dependent tests below would silently fail to + * exercise the buggy branch. + */ + @Test + void sanityHelpersProduceExpectedShapes() + { + Command.WaitingOn keyOnly = waitingOnKeysOnly(2); + Assertions.assertTrue(keyOnly.isWaiting(), "keyOnly should be waiting"); + Assertions.assertFalse(keyOnly.isWaitingOnCommand(),"keyOnly should NOT be waiting on a command"); + Assertions.assertTrue(keyOnly.isWaitingOnKey(), "keyOnly should be waiting on a key"); + + MockCluster.Clock clock = new MockCluster.Clock(100); + TxnId dep = nextTxnId(clock, 2); + Command.WaitingOn cmdOnly = waitingOnCommandsOnly(dep); + Assertions.assertTrue(cmdOnly.isWaiting()); + Assertions.assertTrue(cmdOnly.isWaitingOnCommand()); + Assertions.assertFalse(cmdOnly.isWaitingOnKey()); + + Command.WaitingOn mixed = waitingOnMixed(dep, IntKey.routing(20)); + Assertions.assertTrue(mixed.isWaiting()); + Assertions.assertTrue(mixed.isWaitingOnCommand()); + Assertions.assertTrue(mixed.isWaitingOnKey()); + } + + /** + * Regression test for the deadlock: a {@code Stable} target with only key waits used to fire + * neither adapter callback, silently dropping any continuation chained off of {@code notWaiting}. 
+ */ + @Test + void notWaitingFiresForStableWithOnlyKeyWaits() + { + InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport()); + MockCluster.Clock clock = new MockCluster.Clock(100); + TxnId txnId = nextTxnId(clock, 1); + + Command stable = buildStable(txnId, waitingOnKeysOnly(1)); + installCommand(commands, txnId, stable); + + Recorder rec = new Recorder(); + runMaybeExecute(commands, txnId, stable, false, true, rec); + + Assertions.assertEquals(0, rec.notifyWaiting, + "notifyWaiting must NOT be called when waitingOn has no command bits"); + Assertions.assertEquals(1, rec.notWaiting, + "notWaiting must be called exactly once when waitingOn has no command bits"); + } + + /** {@code Stable} with only command waits should report via {@code notifyWaiting}. */ + @Test + void notifyWaitingFiresForStableWithOnlyCommandWaits() + { + InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport()); + MockCluster.Clock clock = new MockCluster.Clock(100); + TxnId txnId = nextTxnId(clock, 1); + TxnId depTxnId = nextTxnId(clock, 2); + + Command stable = buildStable(txnId, waitingOnCommandsOnly(depTxnId)); + installCommand(commands, txnId, stable); + + Recorder rec = new Recorder(); + runMaybeExecute(commands, txnId, stable, false, true, rec); + + Assertions.assertEquals(1, rec.notifyWaiting, + "notifyWaiting must be called exactly once when waitingOn still has command bits"); + Assertions.assertEquals(0, rec.notWaiting, + "notWaiting must NOT be called when we are still waiting on a command"); + } + + /** {@code Stable} with both command and key waits is still "waiting on a command" overall. 
*/ + @Test + void notifyWaitingFiresForStableWithMixedWaits() + { + InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport()); + MockCluster.Clock clock = new MockCluster.Clock(100); + TxnId txnId = nextTxnId(clock, 1); + TxnId depTxnId = nextTxnId(clock, 2); + + Command stable = buildStable(txnId, waitingOnMixed(depTxnId, IntKey.routing(20))); + installCommand(commands, txnId, stable); + + Recorder rec = new Recorder(); + runMaybeExecute(commands, txnId, stable, false, true, rec); + + Assertions.assertEquals(1, rec.notifyWaiting); + Assertions.assertEquals(0, rec.notWaiting); + } + + /** + * When the caller asks for "do not propagate to dependencies" ({@code notifyWaitingOn = false}), + * we still owe the adapter a {@code notWaiting} signal on the waiting path; otherwise any + * adapter that chains a continuation off of {@code notWaiting} would silently lose it. + */ + @Test + void notWaitingFiresWhenNotifyWaitingOnIsFalse() + { + InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport()); + MockCluster.Clock clock = new MockCluster.Clock(100); + TxnId txnId = nextTxnId(clock, 1); + TxnId depTxnId = nextTxnId(clock, 2); + + // notifyWaitingOn = false suppresses notifyWaiting; the symmetric notWaiting still must fire. + Command stable = buildStable(txnId, waitingOnCommandsOnly(depTxnId)); + installCommand(commands, txnId, stable); + + Recorder rec = new Recorder(); + runMaybeExecute(commands, txnId, stable, false, false, rec); + + Assertions.assertEquals(0, rec.notifyWaiting); + Assertions.assertEquals(1, rec.notWaiting); + } + + /** + * A non-{@code Stable}/{@code PreApplied} target hits the early-return branch; that branch + * must also fire {@code notWaiting} (existing behaviour; protect against regression). 
+ */ + @Test + void notWaitingFiresForNonExecutingSaveStatus() + { + InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport()); + MockCluster.Clock clock = new MockCluster.Clock(100); + TxnId txnId = nextTxnId(clock, 1); + + Command preaccepted = buildPreAccepted(txnId); + installCommand(commands, txnId, preaccepted); + + Recorder rec = new Recorder(); + runMaybeExecute(commands, txnId, preaccepted, false, true, rec); + + Assertions.assertEquals(0, rec.notifyWaiting); + Assertions.assertEquals(1, rec.notWaiting, + "notWaiting must fire from the non-Stable/PreApplied early-return branch"); + } +} \ No newline at end of file diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index b48128e5..5836a44b 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -59,6 +59,7 @@ import accord.local.CommandBuilder; import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores; +import accord.local.CommandStores.RangesForEpoch; import accord.local.Node; import accord.local.NodeCommandStoreService; import accord.local.PreLoadContext; @@ -898,7 +899,7 @@ public class CommandsForKeyTest } @Override - public CommandStores.RangesForEpoch ranges() + public RangesForEpoch ranges() { throw new UnsupportedOperationException(); } @@ -980,7 +981,7 @@ public class CommandsForKeyTest null, ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(null, new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE), - new EpochUpdateHolder()); + RangesForEpoch.EMPTY); this.pruneInterval = pruneInterval; this.pruneHlcDelta = pruneHlcDelta; this.maxConflictsHlcDelta = maxConflictsHlcDelta; diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java 
b/accord-core/src/test/java/accord/utils/AccordGens.java index d4040a74..81cc93c1 100644 --- a/accord-core/src/test/java/accord/utils/AccordGens.java +++ b/accord-core/src/test/java/accord/utils/AccordGens.java @@ -151,9 +151,10 @@ public class AccordGens public static Gen<TxnId> txnIds(Gen.LongGen epochs, Gen.LongGen hlcs, Gen.IntGen nodes, Gen<Txn.Kind> kinds, Gen<Domain> domains, Gen<Cardinality> cardinalities) { return rs -> { - Domain domain = domains.next(rs); + Txn.Kind kind = kinds.next(rs); // drawn first so the domain can depend on it; NOTE: this changes the random draw order vs. the previous version, so existing seeds yield different ids + Domain domain = kind.isSyncPoint() ? Domain.Range : domains.next(rs); // sync-point kinds always get Domain.Range (and therefore Cardinality Any below); presumably sync points must be range-domain -- confirm Cardinality cardinality = domain == Domain.Range ? Any : cardinalities.next(rs); - return new TxnId(epochs.nextLong(rs), hlcs.nextLong(rs), 0, kinds.next(rs), domain, cardinality, new Node.Id(nodes.nextInt(rs))); + return new TxnId(epochs.nextLong(rs), hlcs.nextLong(rs), 0, kind, domain, cardinality, new Node.Id(nodes.nextInt(rs))); }; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
