This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch CASSANDRA-20112 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit cec31c0188383be231d87faa92f5b58b56ee6e8e Author: Benedict Elliott Smith <[email protected]> AuthorDate: Tue Nov 19 19:40:30 2024 +0000 fix PreacceptTrackerTest fixes to cleanup separate WAS_OWNED states --- .../coordinate/tracking/FastPathTracker.java | 5 -- .../src/main/java/accord/local/Cleanup.java | 23 ++++----- .../src/main/java/accord/local/Commands.java | 35 +++++++++++-- .../main/java/accord/local/RedundantBefore.java | 25 +++++++--- .../main/java/accord/local/RedundantStatus.java | 57 +++++++++++++++++++++- .../main/java/accord/local/cfk/PostProcess.java | 10 +++- .../src/main/java/accord/messages/Propagate.java | 6 +-- .../accord/coordinate/PreAcceptTrackerTest.java | 36 +++++++------- 8 files changed, 142 insertions(+), 55 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java index d02d8bb6..81e9d118 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java @@ -208,11 +208,6 @@ public class FastPathTracker extends PreAcceptTracker<FastPathTracker.FastPathSh return any(FastPathShardTracker::hasFailed); } - public boolean hasInFlight() - { - return any(FastPathShardTracker::hasInFlight); - } - public boolean hasReachedQuorum() { return all(FastPathShardTracker::hasReachedQuorum); diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index c7eaacc8..028f068e 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -31,10 +31,9 @@ import accord.primitives.TxnId; import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY; import static accord.local.RedundantStatus.GC_BEFORE; -import static accord.local.RedundantStatus.LIVE; import static accord.local.RedundantStatus.NOT_OWNED; import static accord.local.RedundantStatus.SHARD_REDUNDANT; -import static accord.local.RedundantStatus.WAS_OWNED_CLOSED; +import static accord.local.RedundantStatus.WAS_OWNED_RETIRED; import static accord.primitives.SaveStatus.Erased; import static accord.primitives.SaveStatus.ErasedOrVestigial; import static accord.primitives.SaveStatus.Invalidated; @@ -42,7 +41,6 @@ import static accord.primitives.SaveStatus.TruncatedApply; import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome; import static accord.primitives.SaveStatus.Uninitialised; import static accord.primitives.Status.Applied; -import static accord.primitives.Status.Durability.Majority; import static accord.primitives.Status.Durability.UniversalOrInvalidated; import static accord.primitives.Status.PreCommitted; import static accord.primitives.Txn.Kind.EphemeralRead; @@ -68,7 +66,6 @@ public enum Cleanup EXPUNGE(Erased); private static final Cleanup[] VALUES = values(); - private static boolean CLEANUP_SHARD_REDUNDANT = false; public final SaveStatus appliesIfNot; @@ -197,10 +194,11 @@ public enum Cleanup if (redundant == NOT_OWNED) illegalState("Command " + txnId + " that is being loaded is not owned by this shard on route " + route); - Cleanup max = ERASE; switch (redundant) { default: throw new AssertionError(); + case WAS_OWNED: + case WAS_OWNED_CLOSED: case LIVE: case PARTIALLY_PRE_BOOTSTRAP_OR_STALE: case PRE_BOOTSTRAP_OR_STALE: @@ -209,10 +207,7 @@ public enum Cleanup return NO; case SHARD_REDUNDANT: - // TODO (required): can we safely cleanup even before GC_BEFORE? - if (!CLEANUP_SHARD_REDUNDANT) - return NO; - max = TRUNCATE_WITH_OUTCOME; + return isPartial || saveStatus.hasBeen(PreCommitted) ? NO : INVALIDATE; case GC_BEFORE: if (!isPartial) @@ -223,12 +218,12 @@ public enum Cleanup if (!saveStatus.hasBeen(Applied) && redundantBefore.preBootstrapOrStale(txnId, participants.owns) != FULLY) { agent.onViolation(String.format("Loading SHARD_REDUNDANT command %s with status %s (that should have been Applied). Expected to be witnessed and executed by %s.", txnId, saveStatus, redundantBefore.max(participants.route, e -> e.shardAppliedOrInvalidatedBefore))); - return min(max, TRUNCATE); + return TRUNCATE; } } break; - case WAS_OWNED_CLOSED: + case WAS_OWNED_RETIRED: } durability = Durability.max(durability, durableBefore.min(txnId, participants.route)); @@ -238,18 +233,18 @@ public enum Cleanup case Local: case NotDurable: case ShardUniversal: - if (redundant == WAS_OWNED_CLOSED) + if (redundant == WAS_OWNED_RETIRED) return NO; // TODO (expected): document why we treat this differently return Cleanup.TRUNCATE_WITH_OUTCOME; case MajorityOrInvalidated: case Majority: - return min(max, TRUNCATE); + return TRUNCATE; case UniversalOrInvalidated: case Universal: // TODO (expected): can we EXPUNGE here? - return min(max, ERASE); + return ERASE; } } diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index e837e45f..357fa013 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -846,13 +846,13 @@ public class Commands case TRUNCATE_WITH_OUTCOME: Invariants.checkArgument(!command.hasBeen(Truncated), "%s", command); - Invariants.checkState(command.hasBeen(PreApplied)); + Invariants.checkState(command.hasBeen(PreApplied) || command.participants().touches().isEmpty()); result = truncatedApplyWithOutcome(command.asExecuted()); break; case TRUNCATE: Invariants.checkState(command.saveStatus().compareTo(TruncatedApply) < 0); - Invariants.checkState(command.hasBeen(PreApplied)); + Invariants.checkState(command.hasBeen(PreApplied) || command.participants().touches().isEmpty()); result = truncatedApply(command, participants); break; @@ -879,7 +879,9 @@ public class Commands { default: throw new AssertionError("Unhandled RedundantStatus: " + status); case LIVE: + case WAS_OWNED: case WAS_OWNED_CLOSED: + case WAS_OWNED_RETIRED: case REDUNDANT_PRE_BOOTSTRAP_OR_STALE: case PARTIALLY_PRE_BOOTSTRAP_OR_STALE: return false; @@ -958,7 +960,9 @@ public class Commands switch (redundantStatus) { default: throw new AssertionError("Unexpected redundant status: " + redundantStatus); - case NOT_OWNED: throw new AssertionError("Invalid state: waiting for execution of command that is not owned at the execution time"); + case NOT_OWNED: + throw new AssertionError("Invalid state: waiting for execution of command that is not owned at the execution time"); + case GC_BEFORE: case SHARD_REDUNDANT: case LOCALLY_REDUNDANT: @@ -966,6 +970,13 @@ public class Commands case PRE_BOOTSTRAP_OR_STALE: removeRedundantDependencies(safeStore, waitingSafe, loadDepId); break; + + case WAS_OWNED: + case WAS_OWNED_CLOSED: + case WAS_OWNED_RETIRED: + removeNoLongerOwnedDependency(safeStore, waitingSafe, loadDepId); + break; + case LIVE: case PARTIALLY_PRE_BOOTSTRAP_OR_STALE: } @@ -1035,7 +1046,12 @@ public class Commands switch (redundantStatus) { default: throw new AssertionError("Unknown redundant status: " + redundantStatus); - case NOT_OWNED: throw new AssertionError("Invalid state: waiting for execution of command that is not owned at the execution time"); + case NOT_OWNED: + case WAS_OWNED: + case WAS_OWNED_CLOSED: + case WAS_OWNED_RETIRED: + throw new AssertionError("Invalid state: waiting for execution of command that is not owned at the execution time"); + case LIVE: case PARTIALLY_PRE_BOOTSTRAP_OR_STALE: if (logger.isTraceEnabled()) logger.trace("{} blocked on {} until ReadyToExclude", waitingId, dep.txnId()); @@ -1134,6 +1150,17 @@ public class Commands return safeCommand.updateWaitingOn(update); } + static Command removeNoLongerOwnedDependency(SafeCommandStore safeStore, SafeCommand safeCommand, @Nonnull TxnId wasOwned) + { + Command.Committed current = safeCommand.current().asCommitted(); + if (!current.waitingOn.isWaitingOn(wasOwned)) + return current; + + WaitingOn.Update update = new WaitingOn.Update(current.waitingOn); + update.removeWaitingOn(wasOwned); + return safeCommand.updateWaitingOn(update); + } + /** * A key nominated to represent the "home" shard - only members of the home shard may be nominated to recover * a transaction, to reduce the cluster-wide overhead of ensuring progress. A transaction that has only been diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index 4af00581..c13a229e 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -56,7 +56,9 @@ import static accord.local.RedundantStatus.LOCALLY_REDUNDANT; import static accord.local.RedundantStatus.NOT_OWNED; import static accord.local.RedundantStatus.PRE_BOOTSTRAP_OR_STALE; import static accord.local.RedundantStatus.SHARD_REDUNDANT; +import static accord.local.RedundantStatus.WAS_OWNED; import static accord.local.RedundantStatus.WAS_OWNED_CLOSED; +import static accord.local.RedundantStatus.WAS_OWNED_RETIRED; import static accord.utils.Invariants.illegalState; public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> @@ -319,7 +321,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return prev; // TODO (required): consider all call-sites and confirm the answers when wasOwned and willBeOwned are reasonable - if (entry.wasOwned(txnId) && entry.isComplete()) + if (entry.wasOwned(txnId) && entry.isRetired()) return prev; boolean isPreBootstrapOrStale = entry.staleUntilAtLeast != null || entry.bootstrappedAt.compareTo(txnId) > 0; @@ -400,8 +402,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> RedundantStatus get(TxnId txnId) { - if (wasOwned(txnId) && isComplete()) - return WAS_OWNED_CLOSED; + if (wasOwned(txnId)) + return isRetired() ? WAS_OWNED_RETIRED : isClosed() ? WAS_OWNED_CLOSED : WAS_OWNED; return getIgnoringOwnership(txnId); } @@ -434,11 +436,16 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return aIsNull ? 0 : a.compareTo(b); } - public final TxnId shardRedundantBefore() + public final TxnId gcBefore() { return gcBefore; } + public final TxnId shardRedundantBefore() + { + return shardAppliedOrInvalidatedBefore; + } + public final TxnId locallyWitnessedBefore() { return locallyWitnessedOrInvalidatedBefore; @@ -465,10 +472,16 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> } // TODO (required): do we still need this, or can we stick to just the explicit endOwnershipEpoch - private boolean isComplete() + private boolean isRetired() + { + // TODO (required): carefully consider whether we should ALSO expect some local property to be met here + return endOwnershipEpoch <= shardAppliedOrInvalidatedBefore.epoch(); + } + + private boolean isClosed() { // TODO (required): carefully consider whether we should ALSO expect some local property to be met here - return endOwnershipEpoch <= gcBefore.epoch(); + return endOwnershipEpoch <= locallyWitnessedOrInvalidatedBefore.epoch(); } private boolean outOfBounds(Timestamp lb) diff --git a/accord-core/src/main/java/accord/local/RedundantStatus.java b/accord-core/src/main/java/accord/local/RedundantStatus.java index a3c0cf85..279621bb 100644 --- a/accord-core/src/main/java/accord/local/RedundantStatus.java +++ b/accord-core/src/main/java/accord/local/RedundantStatus.java @@ -29,12 +29,23 @@ public enum RedundantStatus */ NOT_OWNED, + /** + * None of the relevant ranges are owned by the command store anymore + */ + WAS_OWNED, + /** * None of the relevant ranges are owned by the command store anymore, and all of those ranges are closed - * (i.e. we should not participate in any decisions) + * (i.e. we should know all participating commands already) */ WAS_OWNED_CLOSED, + /** + * None of the relevant ranges are owned by the command store anymore, and all of those ranges are retired + * (i.e. we should not participate in any decisions) + */ + WAS_OWNED_RETIRED, + /** * Some of the relevant ranges are owned by the command store and valid for execution */ @@ -94,7 +105,9 @@ public enum RedundantStatus { NOT_OWNED.merge = new EnumMap<>(RedundantStatus.class); NOT_OWNED.merge.put(NOT_OWNED, NOT_OWNED); + NOT_OWNED.merge.put(WAS_OWNED, WAS_OWNED); NOT_OWNED.merge.put(WAS_OWNED_CLOSED, WAS_OWNED_CLOSED); + NOT_OWNED.merge.put(WAS_OWNED_RETIRED, WAS_OWNED_RETIRED); NOT_OWNED.merge.put(LIVE, LIVE); NOT_OWNED.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); NOT_OWNED.merge.put(PRE_BOOTSTRAP_OR_STALE, PRE_BOOTSTRAP_OR_STALE); @@ -102,19 +115,47 @@ public enum RedundantStatus NOT_OWNED.merge.put(LOCALLY_REDUNDANT, LOCALLY_REDUNDANT); NOT_OWNED.merge.put(SHARD_REDUNDANT, SHARD_REDUNDANT); NOT_OWNED.merge.put(GC_BEFORE, GC_BEFORE); + WAS_OWNED.merge = new EnumMap<>(RedundantStatus.class); + WAS_OWNED.merge.put(NOT_OWNED, NOT_OWNED); + WAS_OWNED.merge.put(WAS_OWNED, WAS_OWNED); + WAS_OWNED.merge.put(WAS_OWNED_CLOSED, WAS_OWNED); + WAS_OWNED.merge.put(WAS_OWNED_RETIRED, WAS_OWNED); + WAS_OWNED.merge.put(LIVE, LIVE); + WAS_OWNED.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); + WAS_OWNED.merge.put(PRE_BOOTSTRAP_OR_STALE, PRE_BOOTSTRAP_OR_STALE); + WAS_OWNED.merge.put(REDUNDANT_PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); + WAS_OWNED.merge.put(LOCALLY_REDUNDANT, LOCALLY_REDUNDANT); + WAS_OWNED.merge.put(SHARD_REDUNDANT, SHARD_REDUNDANT); + WAS_OWNED.merge.put(GC_BEFORE, GC_BEFORE); WAS_OWNED_CLOSED.merge = new EnumMap<>(RedundantStatus.class); WAS_OWNED_CLOSED.merge.put(NOT_OWNED, NOT_OWNED); + WAS_OWNED_CLOSED.merge.put(WAS_OWNED, WAS_OWNED); WAS_OWNED_CLOSED.merge.put(WAS_OWNED_CLOSED, WAS_OWNED_CLOSED); + WAS_OWNED_CLOSED.merge.put(WAS_OWNED_RETIRED, WAS_OWNED_CLOSED); WAS_OWNED_CLOSED.merge.put(LIVE, LIVE); WAS_OWNED_CLOSED.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); WAS_OWNED_CLOSED.merge.put(PRE_BOOTSTRAP_OR_STALE, PRE_BOOTSTRAP_OR_STALE); WAS_OWNED_CLOSED.merge.put(REDUNDANT_PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); WAS_OWNED_CLOSED.merge.put(LOCALLY_REDUNDANT, LOCALLY_REDUNDANT); WAS_OWNED_CLOSED.merge.put(SHARD_REDUNDANT, SHARD_REDUNDANT); - WAS_OWNED_CLOSED.merge.put(GC_BEFORE, GC_BEFORE); + WAS_OWNED_CLOSED.merge.put(GC_BEFORE, GC_BEFORE); + WAS_OWNED_RETIRED.merge = new EnumMap<>(RedundantStatus.class); + WAS_OWNED_RETIRED.merge.put(NOT_OWNED, NOT_OWNED); + WAS_OWNED_RETIRED.merge.put(WAS_OWNED, WAS_OWNED); + WAS_OWNED_RETIRED.merge.put(WAS_OWNED_CLOSED, WAS_OWNED_CLOSED); + WAS_OWNED_RETIRED.merge.put(WAS_OWNED_RETIRED, WAS_OWNED_RETIRED); + WAS_OWNED_RETIRED.merge.put(LIVE, LIVE); + WAS_OWNED_RETIRED.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); + WAS_OWNED_RETIRED.merge.put(PRE_BOOTSTRAP_OR_STALE, PRE_BOOTSTRAP_OR_STALE); + WAS_OWNED_RETIRED.merge.put(REDUNDANT_PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); + WAS_OWNED_RETIRED.merge.put(LOCALLY_REDUNDANT, LOCALLY_REDUNDANT); + WAS_OWNED_RETIRED.merge.put(SHARD_REDUNDANT, SHARD_REDUNDANT); + WAS_OWNED_RETIRED.merge.put(GC_BEFORE, GC_BEFORE); LIVE.merge = new EnumMap<>(RedundantStatus.class); LIVE.merge.put(NOT_OWNED, LIVE); + LIVE.merge.put(WAS_OWNED, LIVE); LIVE.merge.put(WAS_OWNED_CLOSED, LIVE); + LIVE.merge.put(WAS_OWNED_RETIRED, LIVE); LIVE.merge.put(LIVE, LIVE); LIVE.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); LIVE.merge.put(PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); @@ -124,7 +165,9 @@ public enum RedundantStatus LIVE.merge.put(GC_BEFORE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge = new EnumMap<>(RedundantStatus.class); PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge.put(NOT_OWNED, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); + PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED_CLOSED, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); + PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED_RETIRED, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge.put(LIVE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge.put(PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); @@ -134,7 +177,9 @@ public enum RedundantStatus PARTIALLY_PRE_BOOTSTRAP_OR_STALE.merge.put(GC_BEFORE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); PRE_BOOTSTRAP_OR_STALE.merge = new EnumMap<>(RedundantStatus.class); PRE_BOOTSTRAP_OR_STALE.merge.put(NOT_OWNED, PRE_BOOTSTRAP_OR_STALE); + PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED, PRE_BOOTSTRAP_OR_STALE); PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED_CLOSED, PRE_BOOTSTRAP_OR_STALE); + PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED_RETIRED, PRE_BOOTSTRAP_OR_STALE); PRE_BOOTSTRAP_OR_STALE.merge.put(LIVE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); PRE_BOOTSTRAP_OR_STALE.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); PRE_BOOTSTRAP_OR_STALE.merge.put(PRE_BOOTSTRAP_OR_STALE, PRE_BOOTSTRAP_OR_STALE); @@ -144,7 +189,9 @@ public enum RedundantStatus PRE_BOOTSTRAP_OR_STALE.merge.put(GC_BEFORE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge = new EnumMap<>(RedundantStatus.class); REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge.put(NOT_OWNED, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); + REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED_CLOSED, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); + REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge.put(WAS_OWNED_RETIRED, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge.put(LIVE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); // TODO (expected): should this be an invalid combination? REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge.put(PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); @@ -154,7 +201,9 @@ public enum RedundantStatus REDUNDANT_PRE_BOOTSTRAP_OR_STALE.merge.put(GC_BEFORE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); LOCALLY_REDUNDANT.merge = new EnumMap<>(RedundantStatus.class); LOCALLY_REDUNDANT.merge.put(NOT_OWNED, LOCALLY_REDUNDANT); + LOCALLY_REDUNDANT.merge.put(WAS_OWNED, LOCALLY_REDUNDANT); LOCALLY_REDUNDANT.merge.put(WAS_OWNED_CLOSED, LOCALLY_REDUNDANT); + LOCALLY_REDUNDANT.merge.put(WAS_OWNED_RETIRED, LOCALLY_REDUNDANT); LOCALLY_REDUNDANT.merge.put(LIVE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); // TODO (expected): should this be an invalid combination? LOCALLY_REDUNDANT.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); LOCALLY_REDUNDANT.merge.put(PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); @@ -164,7 +213,9 @@ public enum RedundantStatus LOCALLY_REDUNDANT.merge.put(GC_BEFORE, LOCALLY_REDUNDANT); SHARD_REDUNDANT.merge = new EnumMap<>(RedundantStatus.class); SHARD_REDUNDANT.merge.put(NOT_OWNED, SHARD_REDUNDANT); + SHARD_REDUNDANT.merge.put(WAS_OWNED, SHARD_REDUNDANT); SHARD_REDUNDANT.merge.put(WAS_OWNED_CLOSED, SHARD_REDUNDANT); + SHARD_REDUNDANT.merge.put(WAS_OWNED_RETIRED, SHARD_REDUNDANT); SHARD_REDUNDANT.merge.put(LIVE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); // TODO (expected): should this be an invalid combination? SHARD_REDUNDANT.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); SHARD_REDUNDANT.merge.put(PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); @@ -174,7 +225,9 @@ public enum RedundantStatus SHARD_REDUNDANT.merge.put(GC_BEFORE, SHARD_REDUNDANT); GC_BEFORE.merge = new EnumMap<>(RedundantStatus.class); GC_BEFORE.merge.put(NOT_OWNED, SHARD_REDUNDANT); + GC_BEFORE.merge.put(WAS_OWNED, GC_BEFORE); GC_BEFORE.merge.put(WAS_OWNED_CLOSED, GC_BEFORE); + GC_BEFORE.merge.put(WAS_OWNED_RETIRED, GC_BEFORE); GC_BEFORE.merge.put(LIVE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); // TODO (expected): should this be an invalid combination? GC_BEFORE.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); GC_BEFORE.merge.put(PRE_BOOTSTRAP_OR_STALE, REDUNDANT_PRE_BOOTSTRAP_OR_STALE); diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java b/accord-core/src/main/java/accord/local/cfk/PostProcess.java index 4e3f222f..d49b39c6 100644 --- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java +++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java @@ -252,7 +252,8 @@ abstract class PostProcess @Nullable TxnInfo curInfo, @Nullable TxnInfo newInfo) { - TxnId redundantBefore = boundsInfo.shardRedundantBefore(); + // TODO (expected): can we relax this to shardRedundantBefore? + TxnId redundantBefore = boundsInfo.gcBefore(); TxnId bootstrappedAt = boundsInfo.bootstrappedAt; if (bootstrappedAt.compareTo(redundantBefore) <= 0) bootstrappedAt = null; @@ -305,7 +306,10 @@ abstract class PostProcess Predicate<Timestamp> rescheduleOrNotifyIf = null; if ((newInfo != null && newInfo.isAtLeast(INVALIDATED) && curInfo != null && curInfo.isCommittedToExecute())) + { rescheduleOrNotifyIf = curInfo.executeAt::equals; + } + if (isNewBoundsInfo && bootstrappedAt != null) { Timestamp maxPreBootstrap; @@ -320,8 +324,10 @@ abstract class PostProcess } maxPreBootstrap = tmp; } - if (rescheduleOrNotifyIf == null || Timestamp.nonNullOrMax(maxPreBootstrap, curInfo.executeAt) == maxPreBootstrap) + if (rescheduleOrNotifyIf == null) rescheduleOrNotifyIf = test -> test.compareTo(maxPreBootstrap) <= 0; + else + rescheduleOrNotifyIf = test -> curInfo.executeAt.equals(test) || test.compareTo(maxPreBootstrap) <= 0; } if (rescheduleOrNotifyIf != null) diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java index d363e957..928a97dd 100644 --- a/accord-core/src/main/java/accord/messages/Propagate.java +++ b/accord-core/src/main/java/accord/messages/Propagate.java @@ -55,8 +55,6 @@ import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid; import static accord.local.Cleanup.ERASE; import static accord.local.Cleanup.VESTIGIAL; import static accord.local.Commands.purge; -import static accord.local.RedundantStatus.LOCALLY_REDUNDANT; -import static accord.local.RedundantStatus.REDUNDANT_PRE_BOOTSTRAP_OR_STALE; import static accord.primitives.SaveStatus.Stable; import static accord.primitives.Status.NotDefined; import static accord.primitives.Status.PreApplied; @@ -411,7 +409,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt { default: throw new AssertionError("Unhandled RedundantStatus: " + status); case NOT_OWNED: - case WAS_OWNED_CLOSED: + case WAS_OWNED_RETIRED: case GC_BEFORE: case SHARD_REDUNDANT: case LOCALLY_REDUNDANT: @@ -424,6 +422,8 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt return true; case PARTIALLY_PRE_BOOTSTRAP_OR_STALE: case LIVE: + case WAS_OWNED: + case WAS_OWNED_CLOSED: return false; } } diff --git a/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java b/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java index b528742c..179e0a72 100644 --- a/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java +++ b/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java @@ -43,13 +43,11 @@ public class PreAcceptTrackerTest private static void assertResponseState(FastPathTracker responses, boolean quorumReached, boolean fastPathAccepted, - boolean failed, - boolean hasOutstandingResponses) + boolean failed) { Assertions.assertEquals(quorumReached, responses.hasReachedQuorum()); Assertions.assertEquals(fastPathAccepted, responses.hasFastPathAccepted()); Assertions.assertEquals(failed, responses.hasFailed()); - Assertions.assertEquals(hasOutstandingResponses, responses.hasInFlight()); } @Test @@ -59,13 +57,13 @@ public class PreAcceptTrackerTest FastPathTracker responses = new FastPathTracker(topologies(subTopology)); responses.recordSuccess(ids[0], false); - assertResponseState(responses, false, false, false, true); + assertResponseState(responses, false, false, false); responses.recordSuccess(ids[1], false); - assertResponseState(responses, true, false, false, true); + assertResponseState(responses, true, false, false); responses.recordSuccess(ids[2], false); - assertResponseState(responses, true, false, false, false); + assertResponseState(responses, true, false, false); } @Test @@ -75,13 +73,13 @@ public class PreAcceptTrackerTest FastPathTracker responses = new FastPathTracker(topologies(subTopology)); responses.recordSuccess(ids[0], true); - assertResponseState(responses, false, false, false, true); + assertResponseState(responses, false, false, false); responses.recordSuccess(ids[1], true); - assertResponseState(responses, true, false, false, true); + assertResponseState(responses, true, false, false); responses.recordSuccess(ids[2], true); - assertResponseState(responses, true, true, false, false); + assertResponseState(responses, true, true, false); } /** @@ -94,14 +92,14 @@ public class PreAcceptTrackerTest FastPathTracker responses = new FastPathTracker(topologies(subTopology)); responses.recordSuccess(ids[0], false); - assertResponseState(responses, false, false, false, true); + assertResponseState(responses, false, false, false); responses.recordSuccess(ids[1], false); - assertResponseState(responses, true, false, false, true); + assertResponseState(responses, true, false, false); Assertions.assertFalse(subTopology.get(0).nodes.contains(ids[4])); responses.recordSuccess(ids[4], false); - assertResponseState(responses, true, false, false, true); + assertResponseState(responses, true, false, false); } @Test @@ -111,13 +109,13 @@ public class PreAcceptTrackerTest FastPathTracker responses = new FastPathTracker(topologies(subTopology)); responses.recordSuccess(ids[0], true); - assertResponseState(responses, false, false, false, true); + assertResponseState(responses, false, false, false); responses.recordFailure(ids[1]); - assertResponseState(responses, false, false, false, true); + assertResponseState(responses, false, false, false); responses.recordFailure(ids[2]); - assertResponseState(responses, false, false, true, false); + assertResponseState(responses, false, false, true); } @Test @@ -135,19 +133,19 @@ public class PreAcceptTrackerTest Assertions.assertSame(subTopology.get(2), responses.get(2).shard); responses.recordSuccess(ids[1], true); - assertResponseState(responses, false, false, false, true); + assertResponseState(responses, false, false, false); responses.recordSuccess(ids[2], true); - assertResponseState(responses, false, false, false, true); + assertResponseState(responses, false, false, false); responses.recordSuccess(ids[3], true); // the middle shard will have reached fast path Assertions.assertTrue(responses.get(1).hasMetFastPathCriteria()); // but since the others haven't, it won't report it as accepted - assertResponseState(responses, true, false, false, true); + assertResponseState(responses, true, false, false); responses.recordSuccess(ids[0], true); responses.recordSuccess(ids[4], true); - assertResponseState(responses, true, true, false, false); + assertResponseState(responses, true, true, false); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
