This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 756f2a1f (Accord) Fix: - Attempt to fix CommandsForKey StackOverflowError (presumed to be reachable via SaveStatus->InternalStatus map returning null) - Bound recursion with SafeCommandStore.tryRecurse() - IndexOutOfBoundsException in CINTIA checkpoint list encoding bounds logic - Maintain MaxDecidedRX to save majority of work when deciding RX that are newer than those we have previously agreed - Introduce IntervalBTree and use in CommandsForRanges to limit time spent in cri [...] 756f2a1f is described below commit 756f2a1f88f079e74c34fd8e0f3bb5aa98760bef Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Sun Jun 22 12:14:55 2025 +0100 (Accord) Fix: - Attempt to fix CommandsForKey StackOverflowError (presumed to be reachable via SaveStatus->InternalStatus map returning null) - Bound recursion with SafeCommandStore.tryRecurse() - IndexOutOfBoundsException in CINTIA checkpoint list encoding bounds logic - Maintain MaxDecidedRX to save majority of work when deciding RX that are newer than those we have previously agreed - Introduce IntervalBTree and use in CommandsForRanges to limit time spent in critical section when there are millions of RX patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20727 --- .../accord/coordinate/CoordinateTransaction.java | 12 +-- .../accord/impl/AbstractConfigurationService.java | 6 +- .../java/accord/impl/DefaultLocalListeners.java | 6 +- .../java/accord/impl/InMemoryCommandStore.java | 1 - .../src/main/java/accord/local/CommandStore.java | 20 ++++- .../main/java/accord/local/CommandSummaries.java | 8 +- .../src/main/java/accord/local/MaxDecidedRX.java | 66 ++++++++++++++++ .../main/java/accord/local/SafeCommandStore.java | 22 ++++++ .../main/java/accord/local/cfk/CommandsForKey.java | 91 +++++++++++++--------- .../src/main/java/accord/local/cfk/NotifySink.java | 17 +++- .../main/java/accord/local/cfk/PostProcess.java | 23 ++++-- 
.../src/main/java/accord/local/cfk/Updating.java | 12 ++- .../main/java/accord/topology/TopologyManager.java | 2 +- .../java/accord/utils/BTreeReducingRangeMap.java | 31 ++++++++ .../java/accord/utils/CheckpointIntervalArray.java | 15 +++- .../utils/CheckpointIntervalArrayBuilder.java | 19 +++-- 16 files changed, 274 insertions(+), 77 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java index 70b4884a..696f487c 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java @@ -195,15 +195,17 @@ public class CoordinateTransaction extends CoordinatePreAccept<Result> case PENDING: this.cancel = cancel; this.timeout = timeout; - return; + break; case TIMEOUT: - timeout = null; + if (cancel != null) + cancel.cancel(); + break; case SUCCESS: - cancel = null; + if (timeout != null) + timeout.cancel(); + break; } } - if (cancel != null) cancel.cancel(); - if (timeout != null) timeout.cancel(); } @Override diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java index 72b8119d..0c4d0e56 100644 --- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -345,9 +345,9 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo if (epochs.wasTruncated(ready.epoch)) return; - ready.metadata.invokeIfSuccess(() -> epochs.acknowledge(ready)); - ready.coordinate.invokeIfSuccess(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync)); - ready.reads.invokeIfSuccess(() -> localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology)); + ready.metadata.invokeIfSuccess(() -> epochs.acknowledge(ready)).begin(agent); + 
ready.coordinate.invokeIfSuccess(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync)).begin(agent); + ready.reads.invokeIfSuccess(() -> localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology)).begin(agent); } protected void topologyUpdatePostListenerNotify(Topology topology) {} diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java index 25e8b6ea..af7a2b02 100644 --- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java +++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java @@ -85,7 +85,11 @@ public class DefaultLocalListeners implements LocalListeners public void notify(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId listenerId) { SafeCommand listener = safeStore.ifLoadedAndInitialised(listenerId); - if (listener != null) Commands.listenerUpdate(safeStore, listener, safeCommand); + if (listener != null && safeStore.tryRecurse()) + { + try { Commands.listenerUpdate(safeStore, listener, safeCommand); } + finally { safeStore.unrecurse(); } + } else { //noinspection SillyAssignment,ConstantConditions diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 5cd5174e..950a7e3f 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -739,7 +739,6 @@ public abstract class InMemoryCommandStore extends CommandStore final Kinds kinds = new Kinds(Read, ExclusiveSyncPoint); return commandsForRanges = new ByTxnIdSnapshot() { - @Override public boolean mayContainAny(Txn.Kind kind) { return kinds.test(kind); } @Override public NavigableMap<Timestamp, Summary> byTxnId() { return summaries; } }; } diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 
d02f0828..96830653 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -146,6 +146,7 @@ public abstract class CommandStore implements SequentialAsyncExecutor private transient NavigableMap<TxnId, Ranges> bootstrapBeganAt = emptyBootstrapBeganAt(); // additive (i.e. once inserted, rolled-over until invalidated, and the floor entry contains additions) private RedundantBefore redundantBefore = RedundantBefore.EMPTY; private MaxConflicts maxConflicts = MaxConflicts.EMPTY; + private MaxDecidedRX maxDecidedRX = MaxDecidedRX.EMPTY; private int maxConflictsUpdates = 0; protected RangesForEpoch rangesForEpoch; @@ -245,6 +246,11 @@ public abstract class CommandStore implements SequentialAsyncExecutor return rangesForEpoch; } + public MaxDecidedRX unsafeGetMaxDecidedRX() + { + return maxDecidedRX; + } + final void unsafeSetRangesForEpoch(RangesForEpoch newRangesForEpoch) { rangesForEpoch = nonNull(newRangesForEpoch); @@ -309,6 +315,11 @@ public abstract class CommandStore implements SequentialAsyncExecutor protected abstract void registerTransitive(SafeCommandStore safeStore, RangeDeps deps); + protected void unsafeSetMaxDecidedRX(MaxDecidedRX newMaxDecidedRX) + { + this.maxDecidedRX = newMaxDecidedRX; + } + protected void unsafeSetRejectBefore(RejectBefore newRejectBefore) { this.rejectBefore = newRejectBefore; @@ -453,7 +464,7 @@ public abstract class CommandStore implements SequentialAsyncExecutor setMaxConflicts(updatedMaxConflicts); } - public final void markExclusiveSyncPoint(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) + final void markExclusiveSyncPoint(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) { // TODO (desired): narrow ranges to those that are owned Invariants.requireArgument(txnId.is(ExclusiveSyncPoint)); @@ -462,7 +473,12 @@ public abstract class CommandStore implements SequentialAsyncExecutor unsafeSetRejectBefore(newRejectBefore); } - public final void 
markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) + final void markExclusiveSyncPointDecided(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) + { + unsafeSetMaxDecidedRX(maxDecidedRX.update(ranges, txnId)); + } + + final void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) { // TODO (desired): narrow ranges to those that are owned Invariants.requireArgument(txnId.is(ExclusiveSyncPoint)); diff --git a/accord-core/src/main/java/accord/local/CommandSummaries.java b/accord-core/src/main/java/accord/local/CommandSummaries.java index 24afd6e6..f9724155 100644 --- a/accord-core/src/main/java/accord/local/CommandSummaries.java +++ b/accord-core/src/main/java/accord/local/CommandSummaries.java @@ -31,7 +31,6 @@ import accord.primitives.Ranges; import accord.primitives.SaveStatus; import accord.primitives.Status; import accord.primitives.Timestamp; -import accord.primitives.Txn; import accord.primitives.Txn.Kind.Kinds; import accord.primitives.TxnId; import accord.primitives.Unseekable; @@ -107,7 +106,7 @@ public interface CommandSummaries { public interface Factory<L extends Loader> { - L create(Unseekables<?> searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep); + L create(@Nullable TxnId primaryTxnId, Unseekables<?> searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep); } protected final Unseekables<?> searchKeysOrRanges; @@ -135,10 +134,10 @@ public interface CommandSummaries Timestamp maxTxnId = primaryTxnId == null || keyHistory == KeyHistory.RECOVER || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : primaryTxnId; TxnId findAsDep = primaryTxnId != null && keyHistory == KeyHistory.RECOVER ? primaryTxnId : null; Kinds kinds = primaryTxnId == null ? 
AnyGloballyVisible : primaryTxnId.witnesses().or(keyHistory == KeyHistory.RECOVER ? primaryTxnId.witnessedBy() : Nothing); - return factory.create(keysOrRanges, redundantBefore, kinds, minTxnId, maxTxnId, findAsDep); + return factory.create(primaryTxnId, keysOrRanges, redundantBefore, kinds, minTxnId, maxTxnId, findAsDep); } - public Loader(Unseekables<?> searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep) + public Loader(@Nullable TxnId primaryTxnId, Unseekables<?> searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep) { this.searchKeysOrRanges = searchKeysOrRanges; this.redundantBefore = redundantBefore; @@ -274,7 +273,6 @@ public interface CommandSummaries interface ByTxnIdSnapshot extends CommandSummaries { NavigableMap<Timestamp, Summary> byTxnId(); - default boolean mayContainAny(Txn.Kind kind) { return true; } default boolean visit(Unseekables<?> keysOrRanges, TxnId testTxnId, diff --git a/accord-core/src/main/java/accord/local/MaxDecidedRX.java b/accord-core/src/main/java/accord/local/MaxDecidedRX.java new file mode 100644 index 00000000..616df546 --- /dev/null +++ b/accord-core/src/main/java/accord/local/MaxDecidedRX.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package accord.local; + +import accord.api.RoutingKey; +import accord.primitives.Routables; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; +import accord.utils.BTreeReducingRangeMap; + +public class MaxDecidedRX extends BTreeReducingRangeMap<TxnId> +{ + public static final MaxDecidedRX EMPTY = new MaxDecidedRX(); + + private MaxDecidedRX() + { + super(); + } + + private MaxDecidedRX(boolean inclusiveEnds, Object[] tree) + { + super(inclusiveEnds, tree); + } + + TxnId get(Routables<?> keysOrRanges) + { + return foldl(keysOrRanges, TxnId::max, TxnId.NONE); + } + + public MaxDecidedRX update(Unseekables<?> keysOrRanges, TxnId maxId) + { + // note: we use mergeMax to ensure we take the maximum epoch and hlc independently from any conflict + // this is particularly essential for propagating unique HLCs, so that bootstrap recipients don't + // begin serving reads too early + return update(this, keysOrRanges, maxId, TxnId::max, MaxDecidedRX::new, Builder::new); + } + + private static class Builder extends AbstractBoundariesBuilder<RoutingKey, TxnId, MaxDecidedRX> + { + protected Builder(boolean inclusiveEnds, int capacity) + { + super(inclusiveEnds, capacity); + } + + @Override + protected MaxDecidedRX buildInternal(Object[] tree) + { + return new MaxDecidedRX(inclusiveEnds, tree); + } + } +} diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 86fc58cd..8543e472 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -58,6 +58,7 @@ import static accord.local.cfk.UpdateUnmanagedMode.REGISTER; import static accord.primitives.Routable.Domain.Range; import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.SaveStatus.Applied; +import static accord.primitives.SaveStatus.Committed; import static accord.primitives.SaveStatus.Uninitialised; import static accord.primitives.Timestamp.Flag.SHARD_BOUND; import static accord.utils.Invariants.illegalArgument; @@ -71,6 +72,21 @@ import static accord.utils.Invariants.illegalState; */ public abstract class SafeCommandStore implements RangesForEpochSupplier, RedundantBeforeSupplier, CommandSummaries { + private static final int MAX_REENTRANCY = 100; + private int reentrancyCounter; + public boolean tryRecurse() + { + if (reentrancyCounter == MAX_REENTRANCY) + return false; + ++reentrancyCounter; + return true; + } + public void unrecurse() + { + --reentrancyCounter; + Invariants.require(reentrancyCounter >= 0); + } + /** * If the transaction exists (with some associated data) in the CommandStore, return it. Otherwise return null. 
* @@ -259,6 +275,12 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund commandStore().markExclusiveSyncPoint(this, updated.txnId(), ranges); } + if (newSaveStatus.compareTo(Committed) >= 0 && oldSaveStatus.compareTo(Committed) < 0 && !newSaveStatus.hasBeen(Status.Truncated)) + { + Ranges ranges = updated.participants().owns().toRanges(); + commandStore().markExclusiveSyncPointDecided(this, updated.txnId(), ranges); + } + if (newSaveStatus == Applied && (force || oldSaveStatus != Applied)) { Ranges ranges = updated.participants().touches().toRanges(); diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index 191c554c..5963902b 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -19,7 +19,6 @@ package accord.local.cfk; import java.util.Arrays; -import java.util.EnumMap; import java.util.Iterator; import java.util.Objects; import java.util.function.Predicate; @@ -307,11 +306,11 @@ public class CommandsForKey extends CommandsForKeyUpdate public static boolean needsUpdate(SafeCommandStore safeStore, Command prev, Command updated) { - InternalStatus newStatus = from(safeStore, updated); + InternalStatus newStatus = from(safeStore, updated, false); if (newStatus == null) return updated.known().is(ApplyAtKnown) && updated.executeAt().hasDistinctHlcAndUniqueHlc(); - InternalStatus prevStatus = from(safeStore, prev); + InternalStatus prevStatus = from(safeStore, prev, false); if (prevStatus != newStatus) return true; @@ -857,48 +856,64 @@ public class CommandsForKey extends CommandsForKeyUpdate APPLIED_NOT_EXECUTED /*(reserved)*/ (APPLIED, true, true, false, true), INVALIDATED (SummaryStatus.INVALIDATED, false,false,false, false), ERASED (SummaryStatus.NONE, false,false,false, false), - PRUNED (SummaryStatus.NONE, false,false,false, false) + PRUNED 
(SummaryStatus.NONE, false,false,false, false), ; - static final EnumMap<SaveStatus, InternalStatus> FROM_SAVE_STATUS = new EnumMap<>(SaveStatus.class); + static final InternalStatus[] FROM_SAVE_STATUS = new InternalStatus[SaveStatus.values().length]; + static final InternalStatus[] FROM_SAVE_STATUS_FILTERED = new InternalStatus[SaveStatus.values().length]; static final InternalStatus[] VALUES = values(); static final SummaryStatus[] TO_SUMMARY_STATUS; + private static void putFromSaveStatus(SaveStatus from, InternalStatus to) + { + putFromSaveStatus(from, to, false); + } + + private static void putFromSaveStatus(SaveStatus from, InternalStatus to, boolean intermediate) + { + FROM_SAVE_STATUS[from.ordinal()] = to; + if (!intermediate) + FROM_SAVE_STATUS_FILTERED[from.ordinal()] = to; + } + static { - FROM_SAVE_STATUS.put(SaveStatus.PreAccepted, PREACCEPTED_WITHOUT_DEPS); - FROM_SAVE_STATUS.put(SaveStatus.PreAcceptedWithVote, PREACCEPTED_WITHOUT_DEPS); - FROM_SAVE_STATUS.put(SaveStatus.PreAcceptedWithDeps, PREACCEPTED_WITH_DEPS); - FROM_SAVE_STATUS.put(SaveStatus.AcceptedInvalidate, NOTACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.AcceptedInvalidateWithDefinition, NOTACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.AcceptedMedium, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.AcceptedMediumWithDefinition, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.AcceptedMediumWithDefAndVote, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.AcceptedSlow, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.AcceptedSlowWithDefinition, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.AcceptedSlowWithDefAndVote, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.PreCommitted, PREACCEPTED_WITHOUT_DEPS); - FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithDefinition, PREACCEPTED_WITHOUT_DEPS); - FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithDeps, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithFixedDeps, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithDefAndDeps, ACCEPTED); - 
FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithDefAndFixedDeps, ACCEPTED); - FROM_SAVE_STATUS.put(SaveStatus.Committed, COMMITTED); - FROM_SAVE_STATUS.put(SaveStatus.Stable, STABLE); - FROM_SAVE_STATUS.put(SaveStatus.ReadyToExecute, STABLE); - FROM_SAVE_STATUS.put(SaveStatus.PreApplied, STABLE); - // SaveStatus.Applying is a transient state; let PreApplied and Applied handle updates - FROM_SAVE_STATUS.put(SaveStatus.Applied, APPLIED_NOT_DURABLE); - // We don't map TruncatedApplyX or Erased as we want to retain them as APPLIED + putFromSaveStatus(SaveStatus.PreAccepted, PREACCEPTED_WITHOUT_DEPS); + putFromSaveStatus(SaveStatus.PreAcceptedWithVote, PREACCEPTED_WITHOUT_DEPS); + putFromSaveStatus(SaveStatus.PreAcceptedWithDeps, PREACCEPTED_WITH_DEPS); + putFromSaveStatus(SaveStatus.AcceptedInvalidate, NOTACCEPTED); + putFromSaveStatus(SaveStatus.AcceptedInvalidateWithDefinition, NOTACCEPTED); + putFromSaveStatus(SaveStatus.AcceptedMedium, ACCEPTED); + putFromSaveStatus(SaveStatus.AcceptedMediumWithDefinition, ACCEPTED); + putFromSaveStatus(SaveStatus.AcceptedMediumWithDefAndVote, ACCEPTED); + putFromSaveStatus(SaveStatus.AcceptedSlow, ACCEPTED); + putFromSaveStatus(SaveStatus.AcceptedSlowWithDefinition, ACCEPTED); + putFromSaveStatus(SaveStatus.AcceptedSlowWithDefAndVote, ACCEPTED); + putFromSaveStatus(SaveStatus.PreCommitted, PREACCEPTED_WITHOUT_DEPS); + putFromSaveStatus(SaveStatus.PreCommittedWithDefinition, PREACCEPTED_WITHOUT_DEPS); + putFromSaveStatus(SaveStatus.PreCommittedWithDeps, ACCEPTED); + putFromSaveStatus(SaveStatus.PreCommittedWithFixedDeps, ACCEPTED); + putFromSaveStatus(SaveStatus.PreCommittedWithDefAndDeps, ACCEPTED); + putFromSaveStatus(SaveStatus.PreCommittedWithDefAndFixedDeps, ACCEPTED); + putFromSaveStatus(SaveStatus.Committed, COMMITTED); + putFromSaveStatus(SaveStatus.Stable, STABLE); + putFromSaveStatus(SaveStatus.ReadyToExecute, STABLE); + putFromSaveStatus(SaveStatus.PreApplied, STABLE); + putFromSaveStatus(SaveStatus.Applying, STABLE); + 
putFromSaveStatus(SaveStatus.Applied, APPLIED_NOT_DURABLE); + putFromSaveStatus(SaveStatus.TruncatedApplyWithOutcome, APPLIED_DURABLE, true); + putFromSaveStatus(SaveStatus.TruncatedApply, APPLIED_DURABLE, true); + putFromSaveStatus(SaveStatus.TruncatedUnapplied, APPLIED_DURABLE, true); + putFromSaveStatus(SaveStatus.Erased, ERASED, true); // esp. to support pruning where we expect the prunedBefore entr*ies* to be APPLIED // Note importantly that we have multiple logical pruned befores - the last APPLIED // write per epoch is retained to cleanly support // TODO (desired): can we improve our semantics here to at least PRUNE truncated commands if there's a superseding APPLIED command? // TODO (desired): move all truncated to ERASED, but we have to handle the case we erase our prunedBefore boundary - FROM_SAVE_STATUS.put(SaveStatus.Vestigial, ERASED); - FROM_SAVE_STATUS.put(SaveStatus.Invalidated, INVALIDATED); + putFromSaveStatus(SaveStatus.Vestigial, ERASED); + putFromSaveStatus(SaveStatus.Invalidated, INVALIDATED); for (SaveStatus saveStatus : SaveStatus.values()) - Invariants.require(FROM_SAVE_STATUS.get(saveStatus) != null || saveStatus.is(Status.Truncated) || saveStatus.is(Status.NotDefined) || saveStatus == SaveStatus.Applying); + Invariants.require(FROM_SAVE_STATUS[saveStatus.ordinal()] != null || saveStatus.is(Status.NotDefined)); SummaryStatus[] summaryStatuses = SummaryStatus.values(); TO_SUMMARY_STATUS = Arrays.copyOf(summaryStatuses, summaryStatuses.length + 1); @@ -964,7 +979,7 @@ public class CommandsForKey extends CommandsForKeyUpdate @VisibleForTesting public static InternalStatus from(SaveStatus status) { - return FROM_SAVE_STATUS.get(status); + return FROM_SAVE_STATUS[status.ordinal()]; } @VisibleForTesting @@ -984,9 +999,10 @@ public class CommandsForKey extends CommandsForKeyUpdate return result; } - public static InternalStatus from(SafeCommandStore safeStore, Command command) + public static InternalStatus from(SafeCommandStore safeStore, 
Command command, boolean includeIntermediate) { - InternalStatus status = from(command.saveStatus()); + SaveStatus saveStatus = command.saveStatus(); + InternalStatus status = (includeIntermediate ? FROM_SAVE_STATUS : FROM_SAVE_STATUS_FILTERED)[saveStatus.ordinal()]; if (status == APPLIED_NOT_DURABLE && command.durability().isDurable()) status = APPLIED_DURABLE; if (status == PREACCEPTED_WITH_DEPS && command.txnId().node.equals(safeStore.node().id()) && !Ballot.ZERO.equals(command.promised())) @@ -1345,7 +1361,6 @@ public class CommandsForKey extends CommandsForKeyUpdate { visitor.visitMaxAppliedHlc(maxUniqueHlc); - TxnId prunedBefore = prunedBefore(); int end = insertPos(startedBefore); // We only filter durable transactions less than BOTH the txnId and executeAt of our max preceding write. // This is to avoid the following pre-bootstrap edge case, so this filtering can be made stricter in future: @@ -1546,12 +1561,14 @@ public class CommandsForKey extends CommandsForKeyUpdate private CommandsForKeyUpdate update(SafeCommandStore safeStore, Command update, @Nullable LoadingPruned loading) { Invariants.require(manages(update.txnId())); - InternalStatus newStatus = from(safeStore, update); + InternalStatus newStatus = from(safeStore, update, true); if (newStatus == null) { if (loading == null) { // handle replay of TruncatedApply with uniqueHlc + // note: this is no longer needed because we will replay TruncatedApply entries, + // but we keep the logic in case we modify that behaviour later if (update.known().is(ApplyAtKnown)) return updateUniqueHlc(update.executeAt().uniqueHlc()); return this; diff --git a/accord-core/src/main/java/accord/local/cfk/NotifySink.java b/accord-core/src/main/java/accord/local/cfk/NotifySink.java index 3e929168..8a4df5a6 100644 --- a/accord-core/src/main/java/accord/local/cfk/NotifySink.java +++ b/accord-core/src/main/java/accord/local/cfk/NotifySink.java @@ -51,7 +51,11 @@ interface NotifySink public void notWaiting(SafeCommandStore 
safeStore, TxnId txnId, RoutingKey key, long uniqueHlc) { SafeCommand safeCommand = safeStore.ifLoadedAndInitialised(txnId); - if (safeCommand != null) notWaiting(safeStore, safeCommand, key, uniqueHlc); + if (safeCommand != null && safeStore.tryRecurse()) + { + try { notWaiting(safeStore, safeCommand, key, uniqueHlc); } + finally { safeStore.unrecurse(); } + } else { safeStore.commandStore().execute(txnId, safeStore0 -> { @@ -70,7 +74,11 @@ interface NotifySink { TxnId txnId = notify.plainTxnId(); - if (safeStore.canExecuteWith(txnId)) doNotifyWaitingOn(safeStore, txnId, key, waitingOnStatus, blockedUntil, notifyCfk); + if (safeStore.canExecuteWith(txnId) && safeStore.tryRecurse()) + { + try { doNotifyWaitingOn(safeStore, txnId, key, waitingOnStatus, blockedUntil, notifyCfk); } + finally { safeStore.unrecurse(); } + } else safeStore.commandStore().execute(txnId, safeStore0 -> { doNotifyWaitingOn(safeStore0, txnId, key, waitingOnStatus, blockedUntil, notifyCfk); }, safeStore.agent()); @@ -113,9 +121,10 @@ interface NotifySink private void doNotifyAlreadyReady(SafeCommandStore safeStore, TxnId txnId, RoutingKey key) { SafeCommandsForKey update = safeStore.ifLoadedAndInitialised(key); - if (update != null) + if (update != null && safeStore.tryRecurse()) { - update.callback(safeStore, safeStore.unsafeGet(txnId).current()); + try { update.callback(safeStore, safeStore.unsafeGet(txnId).current()); } + finally { safeStore.unrecurse(); } } else { diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java b/accord-core/src/main/java/accord/local/cfk/PostProcess.java index 4651bd4c..ec3046b6 100644 --- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java +++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java @@ -97,7 +97,11 @@ abstract class PostProcess { safeStore = safeStore; // make it unsafe for use in lambda SafeCommand safeCommand = safeStore.ifLoadedAndInitialised(txnId); - if (safeCommand != null) load(safeStore, safeCommand, 
safeCfk, notifySink); + if (safeCommand != null && safeStore.tryRecurse()) + { + try { load(safeStore, safeCommand, safeCfk, notifySink); } + finally { safeStore.unrecurse(); } + } else safeStore.commandStore().execute(contextFor(txnId, RoutingKeys.of(key), SYNC), safeStore0 -> { load(safeStore0, safeStore0.unsafeGet(txnId), safeStore0.get(key), notifySink); }, safeStore.agent()); @@ -205,13 +209,20 @@ abstract class PostProcess for (TxnId txnId : notify) { SafeCommand safeCommand = safeStore.ifLoadedAndInitialised(txnId); - if (safeCommand != null) + if (safeCommand != null && safeStore.tryRecurse()) { - CommandsForKeyUpdate update = updateUnmanaged(cfk, safeCommand, UPDATE, addUnmanageds); - if (update != cfk) + try + { + CommandsForKeyUpdate update = updateUnmanaged(cfk, safeCommand, UPDATE, addUnmanageds); + if (update != cfk) + { + Invariants.require(update.cfk() == cfk); + nestedNotify.add(update.postProcess()); + } + } + finally { - Invariants.require(update.cfk() == cfk); - nestedNotify.add(update.postProcess()); + safeStore.unrecurse(); } } else diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java b/accord-core/src/main/java/accord/local/cfk/Updating.java index d93f2735..fd3b0a43 100644 --- a/accord-core/src/main/java/accord/local/cfk/Updating.java +++ b/accord-core/src/main/java/accord/local/cfk/Updating.java @@ -287,20 +287,24 @@ class Updating return newMaxAppliedPreBootstrapWriteById; } - static Object computeInfoAndAdditions(CommandsForKey cfk, int insertPos, int updatePos, TxnId txnId, InternalStatus newStatus, boolean mayExecute, Command command) + static Object computeInfoAndAdditions(CommandsForKey cfk, int insertPos, int updatePos, TxnId plainTxnId, InternalStatus newStatus, boolean mayExecute, Command command) { Invariants.require(newStatus.hasDeps); Timestamp executeAt = command.executeAt(); - if (!newStatus.hasExecuteAt || executeAt.equals(txnId)) executeAt = txnId; + if (!newStatus.hasExecuteAt || 
executeAt.equals(plainTxnId)) executeAt = plainTxnId; Ballot ballot = Ballot.ZERO; if (newStatus.hasBallot) ballot = command.acceptedOrCommitted(); - Timestamp depsKnownBefore = newStatus.depsKnownBefore(txnId, executeAt); + // TODO (desired): is this the best place to encode this behaviour? + if (command.saveStatus().is(Status.Truncated)) + return TxnInfo.create(plainTxnId, newStatus, mayExecute, executeAt, NO_TXNIDS, ballot); + + Timestamp depsKnownBefore = newStatus.depsKnownBefore(plainTxnId, executeAt); MergeCursor<TxnId, DepList> deps = command.partialDeps().txnIds(cfk.key()); deps.find(cfk.redundantBefore()); - return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, txnId, newStatus, mayExecute, ballot, executeAt, cfk.prunedBefore(), depsKnownBefore, deps); + return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, plainTxnId, newStatus, mayExecute, ballot, executeAt, cfk.prunedBefore(), depsKnownBefore, deps); } /** diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index bad57fe2..4e055eb9 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -884,7 +884,7 @@ public class TopologyManager { Epochs epochs = this.epochs; // No epochs known to Accord - if (epochs.firstNonEmptyEpoch == -1) + if (epochs.firstNonEmptyEpoch == -1 || minEpoch > epochs.currentEpoch) return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch, epochs.firstNonEmptyEpoch, Collections.emptyList()); minEpoch = Math.max(minEpoch, epochs.minEpoch()); diff --git a/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java index 159124fb..5729bbb6 100644 --- a/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java +++ b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java @@ -156,6 +156,37 @@ 
public class BTreeReducingRangeMap<V> extends BTreeReducingIntervalMap<RoutingKe return accumulator; } + public <V2, P1, P2> V2 foldl(RoutingKey start, RoutingKey end, QuadFunction<V, V2, P1, P2, V2> fold, V2 accumulator, P1 p1, P2 p2) + { + if (isEmpty()) + return accumulator; + + int treeSize = BTree.size(tree); + int startIdx = findIndex(start); + int startPos = startIdx < 0 ? (-1 - startIdx) : startIdx; + if (startIdx == treeSize - 1 || startPos == treeSize) + return accumulator; // is last or out of bounds -> we are done + if (startIdx < 0) startPos = Math.max(0, startPos - 1); // inclusive + + int endIdx = findIndex(end); + int endPos = endIdx < 0 ? (-1 - endIdx) : endIdx; + if (endPos == 0) + return accumulator; + + endPos = Math.min(endPos - 1, treeSize - 2); // inclusive + + Iterator<Entry<RoutingKey,V>> iterator = BTree.iterator(tree, startPos, endPos, BTree.Dir.ASC); + + while (iterator.hasNext()) + { + Entry<RoutingKey, V> entry = iterator.next(); + if (entry.hasValue() && entry.value() != null) + accumulator = fold.apply(entry.value(), accumulator, p1, p2); + } + + return accumulator; + } + public int findIndex(RoutableKey key) { return BTree.findIndex(tree, EntryComparator.instance(), key); diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java index 7767ca12..92805dc3 100644 --- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java +++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java @@ -172,13 +172,24 @@ public class CheckpointIntervalArray<Ranges, Range, Key> } else if ((ri & BIT29) != 0) { + // TODO (expected): remove this logic in near future; have disabled this optimisation in the builder subStart = ri & 0x1fffffff; - subEnd = Integer.MAX_VALUE; + subEnd = checkpointStart; + for (int prevCheckpoint = checkpoint - 1; prevCheckpoint >= 0; --prevCheckpoint) + { + int prevHeaderBaseIndex = (prevCheckpoint / 4) * 5; + 
int prevHeaderSubIndex = prevCheckpoint & 3; + int prevHeaderListIndex = prevHeaderBaseIndex + 1 + prevHeaderSubIndex; + int prevCheckpointStart = checkpointLists[prevHeaderListIndex]; + if (prevCheckpointStart <= subStart) + break; + subEnd = prevCheckpointStart; + } } else { int length = ri & 0x1fffffff; - subStart = checkpointLists[++i]; + subStart = checkpointLists[++i] & 0x7fffffff; subEnd = subStart + length; } diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java index d8df85c0..6f5aa394 100644 --- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java +++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java @@ -85,6 +85,7 @@ public class CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> private static final int BIT31 = 0x80000000; private static final int BIT30 = 0x40000000; private static final int BIT29 = 0x20000000; + private static final int BIT28 = 0x10000000; static final int MIN_INDIRECT_LINK_LENGTH = 2; final Accessor<Ranges, Range, RoutingKey> accessor; @@ -448,18 +449,23 @@ public class CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> { int index = t.index & ~BIT31; int length = t.linkLength & ~BIT31; + Invariants.require(length < BIT30); if (length <= 0xff && index <= 0xfffff) { lists[listCount++] = BIT31 | BIT30 | (length << 20) | index; } - else if (t.linkLength >= 0 && length < BIT30) - { - lists[listCount++] = BIT31 | BIT29 | index; - } +// This logic has been disabled. +// It is possible for some pathological cases to have the checkpoint lists run into one another. 
+// So that the search is not guaranteed to stop based on comparison, and an end index can only be computed +// by scanning the checkpoint list array, which is fine but not worth the complexity or saving 4 bytes +// else if (t.linkLength >= 0) +// { +// lists[listCount++] = BIT31 | BIT29 | index; +// } else { lists[listCount++] = BIT31 | length; - lists[listCount++] = BIT31 | pending.count(); + lists[listCount++] = BIT31 | index; } } } @@ -1053,7 +1059,8 @@ public class CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> // we may want to reference this list from there // so track count and position of first one to make a determination ++openDirectCount; - if (firstOpenDirect < 0) firstOpenDirect = count; + if (firstOpenDirect < 0) + firstOpenDirect = count; } else hasClosedDirect = true; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org