This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new fc14a154 Various fixes and improvements Improve: - InMemoryJournal compaction should simulate files to ensure we compact the same cohorts of records (so shadowing applied correctly) - Don't update CFK deps if already known - avoid unnecessary heapification of LogGroupTimers - begin removal of Guava (mostly unused) - consider medium path on fast path delayed - add Seekables.indexOf to support faster serialization - unboxed Invariant.requires variant - AsyncChains.addCallba [...] fc14a154 is described below commit fc14a154fd514d4ab40b37508fb9497f786835e0 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Sat Mar 15 09:06:01 2025 +0000 Various fixes and improvements Improve: - InMemoryJournal compaction should simulate files to ensure we compact the same cohorts of records (so shadowing applied correctly) - Don't update CFK deps if already known - avoid unnecessary heapification of LogGroupTimers - begin removal of Guava (mostly unused) - consider medium path on fast path delayed - add Seekables.indexOf to support faster serialization - unboxed Invariant.requires variant - AsyncChains.addCallback -> invoke Fix: - Recovery must wait for earlier transactions on shards that have not yet been accepted, even if we recover a fast Stable record on another shard - only skip Dep calculation on PreAccept if vote is required by coordinator - ReadTracker must slice Minimal the first unavailable collection - If PreLoadContext cannot acquire shared caches, still consider existing contents - CFK: make sure to insert UNSTABLE into missing array - Fix failed task stops DelayedCommandStore task queue processing further tasks - short circuit AbstractRanges.equals() - Handle edge case where we can take fast/medium path but one of the Deps replies contains a future TxnId - update executeAtEpoch when retryInFutureEpoch - Fix incorrect validation in validateMissing (should only validate newInfo is not in missing when in witnessedBy; all other additions should be included) patch by Benedict; reviewed by David Capwell for CASSANDRA-20522 --- accord-core/src/main/java/accord/api/Journal.java | 2 +- .../accord/coordinate/CoordinateTransaction.java | 71 ++++-- .../java/accord/coordinate/ExecuteSyncPoint.java | 4 +- .../main/java/accord/coordinate/MaybeRecover.java | 2 +- .../src/main/java/accord/coordinate/Recover.java | 131 ++++++---- .../coordinate/tracking/FastPathTracker.java | 11 +- .../accord/coordinate/tracking/ReadTracker.java | 2 +- .../accord/impl/AbstractConfigurationService.java | 12 +- .../java/accord/impl/AbstractSafeCommandStore.java | 17 +- .../src/main/java/accord/local/CommandStores.java | 2 +- accord-core/src/main/java/accord/local/Node.java | 10 +- .../main/java/accord/local/RedundantBefore.java | 2 +- .../main/java/accord/local/cfk/CommandsForKey.java | 64 +++-- .../src/main/java/accord/local/cfk/Pruning.java | 4 +- .../src/main/java/accord/local/cfk/Updating.java | 79 +++--- .../src/main/java/accord/local/cfk/Utils.java | 37 ++- .../local/durability/ConcurrencyControl.java | 4 +- .../accord/local/durability/DurabilityQueue.java | 2 +- .../accord/local/durability/ShardDurability.java | 2 +- .../src/main/java/accord/messages/Apply.java | 13 +- .../main/java/accord/messages/BeginRecovery.java | 2 +- .../src/main/java/accord/messages/PreAccept.java | 2 +- .../java/accord/messages/SetGloballyDurable.java | 2 +- .../main/java/accord/messages/SetShardDurable.java | 2 +- .../java/accord/messages/WaitUntilApplied.java | 2 + .../java/accord/primitives/AbstractRanges.java | 8 + .../main/java/accord/primitives/LatestDeps.java | 14 +- .../src/main/java/accord/primitives/Routables.java | 1 + .../src/main/java/accord/primitives/Timestamp.java | 3 + .../main/java/accord/topology/TopologyManager.java | 4 +- .../src/main/java/accord/utils/Functions.java | 17 -- .../src/main/java/accord/utils/Invariants.java | 6 + .../src/main/java/accord/utils/LogGroupTimers.java | 2 +- .../main/java/accord/utils/PersistentField.java | 2 +- .../java/accord/utils/async/AsyncCallbacks.java | 4 +- .../main/java/accord/utils/async/AsyncChain.java | 40 ++- .../main/java/accord/utils/async/AsyncChains.java | 43 +++- .../main/java/accord/utils/async/AsyncResult.java | 10 +- .../main/java/accord/utils/async/AsyncResults.java | 8 +- .../src/main/java/accord/utils/btree/BTreeSet.java | 6 +- .../src/test/java/accord/impl/basic/Cluster.java | 2 +- .../accord/impl/basic/DelayedCommandStores.java | 4 +- .../java/accord/impl/basic/InMemoryJournal.java | 272 +++++++++++++++++---- .../java/accord/impl/basic/LoggingJournal.java | 4 +- .../accord/impl/list/ListFetchCoordinator.java | 2 +- .../test/java/accord/impl/list/ListRequest.java | 2 +- .../src/test/java/accord/local/CommandsTest.java | 2 +- .../java/accord/utils/async/AsyncChainsTest.java | 10 +- .../src/main/java/accord/maelstrom/Cluster.java | 2 +- .../java/accord/maelstrom/MaelstromRequest.java | 2 +- 50 files changed, 619 insertions(+), 332 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index a1246f76..1be59925 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -43,7 +43,7 @@ import org.agrona.collections.Int2ObjectHashMap; */ public interface Journal { - Journal start(Node node); + void start(Node node); Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore); Command.Minimal loadMinimal(int store, TxnId txnId, Load load, RedundantBefore redundantBefore, DurableBefore durableBefore); diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java index aceb8ee1..b27eeac9 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java @@ -104,38 +104,57 @@ public class CoordinateTransaction extends CoordinatePreAccept<Result> { if (tracker.hasFastPathAccepted()) { - Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), List::get, ok -> ok.deps); - ExecuteFlags executeFlags = Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? v : v.and(ok.flags), ExecuteFlags.all()); - // note: we merge all Deps regardless of witnessedAt. While we only need fast path votes, - // we must include Deps from fast path votes from earlier epochs that may have witnessed later transactions - // TODO (desired): we might mask some bugs by merging more responses than we strictly need, so optimise this to optionally merge minimal deps - executeAdapter().execute(node, topologies, route, FAST, executeFlags, txnId, txn, txnId, deps, deps, settingCallback()); - node.agent().eventListener().onFastPathTaken(txnId, deps); + Deps deps = mergeFastOrMediumDeps(oks); + if (deps != null) + { + ExecuteFlags executeFlags = Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? v : v.and(ok.flags), ExecuteFlags.all()); + // note: we merge all Deps regardless of witnessedAt. While we only need fast path votes, + // we must include Deps from fast path votes from earlier epochs that may have witnessed later transactions + // TODO (desired): we might mask some bugs by merging more responses than we strictly need, so optimise this to optionally merge minimal deps + executeAdapter().execute(node, topologies, route, FAST, executeFlags, txnId, txn, txnId, deps, deps, settingCallback()); + node.agent().eventListener().onFastPathTaken(txnId, deps); + return; + } } else if (tracker.hasMediumPathAccepted() && txnId.hasMediumPath()) { - Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), List::get, ok -> ok.deps); - proposeAdapter().propose(node, topologies, route, Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, this); - node.agent().eventListener().onMediumPathTaken(txnId, deps); - } - else - { - // TODO (low priority, efficiency): perhaps don't submit Accept immediately if we almost have enough for fast-path, - // but by sending accept we rule out hybrid fast-path - // TODO (low priority, efficiency): if we receive an expired response, perhaps defer to permit at least one other - // node to respond before invalidating - if (executeAt.is(REJECTED)) + Deps deps = mergeFastOrMediumDeps(oks); + if (deps != null) { - proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt,this); - node.agent().eventListener().onRejected(txnId); - } - else - { - Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), List::get, ok -> ok.deps); - proposeAdapter().propose(node, topologies, route, Accept.Kind.SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, this); - node.agent().eventListener().onSlowPathTaken(txnId, deps); + proposeAdapter().propose(node, topologies, route, Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, this); + node.agent().eventListener().onMediumPathTaken(txnId, deps); + return; } } + else if (executeAt.is(REJECTED)) + { + proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt,this); + node.agent().eventListener().onRejected(txnId); + return; + } + + Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), List::get, ok -> ok.deps); + proposeAdapter().propose(node, topologies, route, Accept.Kind.SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, this); + node.agent().eventListener().onSlowPathTaken(txnId, deps); + } + + private Deps mergeFastOrMediumDeps(SortedListMap<?, PreAcceptOk> oks) + { + // we must merge all Deps replies from prior topologies, but from the latest topology we can safely merge only those replies that voted for the fast path + // TODO (desired): actually merge these topologies separately, rather than just switching behaviour when multiple topologies + if (tracker.topologies().size() == 1) + return Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), List::get, ok -> ok.witnessedAt.equals(ok.txnId) ? ok.deps : null); + + Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), List::get, ok -> ok.deps); + // it is possible that one of the earlier epochs that did not need to vote for the fast path + // was also unable to compute valid dependencies, and returned a future TxnId as a proxy. + // In this case while it is still in principle safe to propose the fast path, it is simpler not to, + // as it permits us to maintain safety validation logic that detects unsafe behaviour and execution will + // need to wait for the future transaction to be agreed anyway (so we can use its dependency calculation). + if (deps.maxTxnId(txnId).compareTo(txnId) > 0) + return null; + + return deps; } protected CoordinationAdapter<Result> proposeAdapter() diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java index 0f37728c..79482b51 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java @@ -173,7 +173,7 @@ public class ExecuteSyncPoint extends SettableResult<DurabilityResult> implement { node.withEpoch(retryInFutureEpoch, (ignore, failure) -> tryFailure(WrappableException.wrap(failure)), () -> { ExecuteSyncPoint continuation = new ExecuteSyncPoint(node, syncPoint, node.topology().preciseEpochs(syncPoint.route(), tracker.topologies().currentEpoch(), retryInFutureEpoch, SHARE), excludeSuccess, executor, attempt, current()); - continuation.addCallback((success, failure) -> { + continuation.invoke((success, failure) -> { if (failure == null) trySuccess(success); else tryFailure(failure); }); @@ -249,6 +249,6 @@ public class ExecuteSyncPoint extends SettableResult<DurabilityResult> implement { SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); if (contact == null) tryFailure(new Exhausted(syncPoint.syncId, syncPoint.route.homeKey(), null)); - else node.send(contact, to -> new WaitUntilApplied(to, tracker.topologies(), syncPoint.syncId, syncPoint.route, syncPoint.syncId.epoch()), executor, this); + else node.send(contact, to -> new WaitUntilApplied(to, tracker.topologies(), syncPoint.syncId, syncPoint.route, tracker.topologies().currentEpoch()), executor, this); } } diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java index 81232d89..1a50486c 100644 --- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java +++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java @@ -122,7 +122,7 @@ public class MaybeRecover extends CheckShards<Route<?>> else { Invariants.require(Route.isFullRoute(someRoute), "Require a full route but given %s", full.route); - node.recover(txnId, full.invalidIf, Route.castToFullRoute(someRoute), reportLowEpoch, reportHighEpoch).addCallback(callback); + node.recover(txnId, full.invalidIf, Route.castToFullRoute(someRoute), reportLowEpoch, reportHighEpoch).invoke(callback); } break; diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java index d3d095d4..74e74f3a 100644 --- a/accord-core/src/main/java/accord/coordinate/Recover.java +++ b/accord-core/src/main/java/accord/coordinate/Recover.java @@ -67,6 +67,7 @@ import accord.utils.async.AsyncResults; import static accord.api.ProgressLog.BlockedUntil.CommittedOrNotFastPathCommit; import static accord.api.ProgressLog.BlockedUntil.HasCommittedDeps; +import static accord.api.ProgressLog.BlockedUntil.HasDecidedExecuteAt; import static accord.api.ProtocolModifiers.QuorumEpochIntersections; import static accord.coordinate.CoordinationAdapter.Factory.Kind.Recovery; import static accord.coordinate.ExecutePath.RECOVER; @@ -253,18 +254,21 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw if (acceptOrCommitNotTruncated != null) { - Status status = acceptOrCommitNotTruncated.status; Timestamp executeAt = acceptOrCommitNotTruncated.executeAt; - if (committedExecuteAt != null) - { - Invariants.require(acceptOrCommitNotTruncated.status.compareTo(Status.PreCommitted) < 0 || executeAt.equals(committedExecuteAt)); - // if we know from a prior Accept attempt that this is committed we can go straight to the commit phase - if (status == AcceptedMedium || status == AcceptedSlow) - status = Status.Committed; + Status status; { + Status tmp = acceptOrCommitNotTruncated.status; + if (committedExecuteAt != null) + { + Invariants.require(acceptOrCommitNotTruncated.status.compareTo(Status.PreCommitted) < 0 || executeAt.equals(committedExecuteAt)); + // if we know from a prior Accept attempt that this is committed we can go straight to the commit phase + if (tmp == AcceptedMedium || tmp == AcceptedSlow) + tmp = Status.Committed; + } + status = tmp; } + switch (status) { - default: throw new UnhandledEnum(status); case Truncated: throw illegalState("Truncate should be filtered"); case Invalidated: { @@ -272,45 +276,6 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw return; } - case Applied: - case PreApplied: - { - withStableDeps(recoverOkList, executeAt, (i, t) -> node.agent().acceptAndWrap(i, t), stableDeps -> { - adapter.persist(node, tracker.topologies(), route, txnId, txn, executeAt, stableDeps, acceptOrCommitNotTruncated.writes, acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t)); - }); - accept(acceptOrCommitNotTruncated.result, null); - return; - } - - case Stable: - { - withStableDeps(recoverOkList, executeAt, this, stableDeps -> { - adapter.execute(node, tracker.topologies(), route, RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps, this); - }); - return; - } - - case PreCommitted: - case Committed: - { - withCommittedDeps(recoverOkList, executeAt, this, committedDeps -> { - adapter.stabilise(node, tracker.topologies(), route, ballot, txnId, txn, executeAt, committedDeps, this); - }); - return; - } - - case AcceptedSlow: - case AcceptedMedium: - { - // TODO (desired): if we have a quorum of Accept with matching ballot or proposal we can go straight to Commit - // TODO (desired): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour - // however, note that we may have taken the fast path and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO - // (otherwise recovery was attempted and did not invalidate, so it must have determined it needed to complete) - Deps proposeDeps = LatestDeps.mergeProposal(recoverOkList, ok -> ok == null ? null : ok.deps); - propose(SLOW, acceptOrCommitNotTruncated.executeAt, proposeDeps); - return; - } - case AcceptedInvalidate: { invalidate(recoverOks); @@ -321,6 +286,53 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw case PreAccepted: throw illegalState("Should only be possible to have Accepted or later commands"); } + + LatestDeps.Merge merge = mergeDeps(recoverOkList); + Participants<?> await = merge.notAccepted(route); + awaitPartialEarlier(recoverOkList, await, () -> { + switch (status) + { + default: throw new UnhandledEnum(status); + case Applied: + case PreApplied: + { + withStableDeps(merge, executeAt, (i, t) -> node.agent().acceptAndWrap(i, t), stableDeps -> { + adapter.persist(node, tracker.topologies(), route, txnId, txn, executeAt, stableDeps, acceptOrCommitNotTruncated.writes, acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t)); + }); + accept(acceptOrCommitNotTruncated.result, null); + return; + } + + case Stable: + { + withStableDeps(merge, executeAt, this, stableDeps -> { + adapter.execute(node, tracker.topologies(), route, RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps, this); + }); + return; + } + + case PreCommitted: + case Committed: + { + withCommittedDeps(merge, executeAt, this, committedDeps -> { + adapter.stabilise(node, tracker.topologies(), route, ballot, txnId, txn, executeAt, committedDeps, this); + }); + return; + } + + case AcceptedSlow: + case AcceptedMedium: + { + // TODO (desired): if we have a quorum of Accept with matching ballot or proposal we can go straight to Commit + // TODO (desired): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour + // however, note that we may have taken the fast path and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO + // (otherwise recovery was attempted and did not invalidate, so it must have determined it needed to complete) + Deps proposeDeps = merge.mergeProposal(); + propose(SLOW, acceptOrCommitNotTruncated.executeAt, proposeDeps); + } + } + }); + return; } if (acceptOrCommit != null && acceptOrCommit != acceptOrCommitNotTruncated) @@ -417,6 +429,23 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw } } + private static LatestDeps.Merge mergeDeps(List<RecoverOk> nullableRecoverOkList) + { + return LatestDeps.merge(nullableRecoverOkList, ok -> ok == null ? null : ok.deps); + } + + private void awaitPartialEarlier(List<RecoverOk> nullableRecoverOkList, Participants<?> participants, Runnable whenReady) + { + Deps earlierWait = Deps.merge(nullableRecoverOkList, nullableRecoverOkList.size(), List::get, ok -> ok.earlierWait); + Deps earlierNoWait = Deps.merge(nullableRecoverOkList, nullableRecoverOkList.size(), List::get, ok -> ok.earlierNoWait); + earlierWait = earlierWait.without(earlierNoWait); + earlierWait = earlierWait.intersecting(participants); + awaitEarlier(node, earlierWait, HasDecidedExecuteAt).begin((success, fail) -> { + if (fail != null) accept(null, fail); + else whenReady.run(); + }); + } + private static boolean supersedingRejects(List<RecoverOk> oks) { for (RecoverOk ok : oks) @@ -464,15 +493,13 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw return ok.laterCoordRejects.with(id -> from.equals(id.node)); } - private void withCommittedDeps(List<RecoverOk> nullableRecoverOkList, Timestamp executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps) + private void withCommittedDeps(LatestDeps.Merge merge, Timestamp executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps) { - LatestDeps.Merge merge = LatestDeps.merge(nullableRecoverOkList, ok -> ok == null ? null : ok.deps); LatestDeps.withCommitted(adapter, node, merge, route, ballot, txnId, executeAt, txn, failureCallback, withDeps); } - private void withStableDeps(List<RecoverOk> nullableRecoverOkList, Timestamp executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps) + private void withStableDeps(LatestDeps.Merge merge, Timestamp executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps) { - LatestDeps.Merge merge = LatestDeps.merge(nullableRecoverOkList, ok -> ok == null ? null : ok.deps); LatestDeps.withStable(adapter, node, merge, Deps.NONE, route, null, null, route, ballot, txnId, executeAt, txn, failureCallback, withDeps); } @@ -506,7 +533,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw private void propose(Accept.Kind kind, Timestamp executeAt, List<RecoverOk> recoverOkList) { - Deps proposeDeps = LatestDeps.mergeProposal(recoverOkList, ok -> ok == null ? null : ok.deps); + Deps proposeDeps = mergeDeps(recoverOkList).mergeProposal(); propose(kind, executeAt, proposeDeps); } diff --git a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java index d2168d0d..f1fa1eae 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java @@ -100,7 +100,7 @@ public class FastPathTracker extends PreAcceptTracker<FastPathTracker.FastPathSh ++fastPathFailures; if (hasRejectedFastPath() && hasReachedQuorum()) - return complete(Success); + return mediumOrSlowSuccess(); } return NoChange; @@ -116,7 +116,7 @@ public class FastPathTracker extends PreAcceptTracker<FastPathTracker.FastPathSh ++fastPathDelayed; if (isFastPathDelayed() && hasReachedQuorum()) - return complete(Success); + return mediumOrSlowSuccess(); } return NoChange; @@ -125,10 +125,15 @@ public class FastPathTracker extends PreAcceptTracker<FastPathTracker.FastPathSh final ShardOutcome<? super FastPathTracker> quorumIfHasRejectedFastPath() { return hasReachedQuorum() && hasRejectedFastPath() - ? hasMetMediumPathCriteria() ? complete(NewMediumPathSuccess) : complete(Success) + ? mediumOrSlowSuccess() : NoChange; } + final ShardOutcome<? super FastPathTracker> mediumOrSlowSuccess() + { + return hasMetMediumPathCriteria() ? complete(NewMediumPathSuccess) : complete(Success); + } + final boolean isFastPathDelayed() { return shard.rejectsFastPath(fastQuorumSize, fastPathDelayed); diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java index ddec94a2..dd9c2c90 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java @@ -117,7 +117,7 @@ public class ReadTracker extends AbstractTracker<ReadTracker.ReadShardTracker> return NoChange; // TODO (low priority, efficiency): support slice method accepting a single Range - if (unavailable == null) unavailable = partialSuccess.unavailable.slice(Ranges.of(shard.range)); + if (unavailable == null) unavailable = partialSuccess.unavailable.slice(Ranges.of(shard.range), Minimal); else unavailable = unavailable.slice(partialSuccess.unavailable, Minimal); if (!unavailable.isEmpty()) return ensureProgressOrFail(); diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java index 3cf7df2a..4fc927f0 100644 --- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -334,9 +334,9 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo if (epochs.wasTruncated(ready.epoch)) return; - ready.metadata.addCallback(() -> epochs.acknowledge(ready)); - ready.coordinate.addCallback(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync)); - ready.reads.addCallback(() -> localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology)); + ready.metadata.invokeIfSuccess(() -> epochs.acknowledge(ready)); + ready.coordinate.invokeIfSuccess(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync)); + ready.reads.invokeIfSuccess(() -> localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology)); } protected void topologyUpdatePostListenerNotify(Topology topology) {} @@ -350,7 +350,7 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo if (lastReceived > 0 && topology.epoch() > lastReceived + 1) { logger.debug("Epoch {} received; waiting to receive {} before reporting", topology.epoch(), lastReceived + 1); - epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology, isLoad, startSync)); + epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync)); fetchTopologyForEpoch(lastReceived + 1); return; } @@ -359,14 +359,14 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo if (lastAcked == 0 && lastReceived > 0) { logger.debug("Epoch {} received; waiting for {} to ack before reporting", topology.epoch(), epochs.minEpoch()); - epochs.acknowledgeFuture(epochs.minEpoch()).addCallback(() -> reportTopology(topology, isLoad, startSync)); + epochs.acknowledgeFuture(epochs.minEpoch()).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync)); return; } if (lastAcked > 0 && topology.epoch() > lastAcked + 1) { logger.debug("Epoch {} received; waiting for {} to ack before reporting", topology.epoch(), lastAcked + 1); - epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology, isLoad, startSync)); + epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync)); return; } diff --git a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java index d67b4105..0e9a1ebb 100644 --- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java +++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java @@ -86,14 +86,14 @@ extends SafeCommandStore try (Caches caches = tryGetCaches()) { - if (caches == null) - return with.isSubsetOf(this.context) ? with : null; - for (TxnId txnId : with.txnIds()) { if (null != getInternal(txnId)) continue; + if (caches == null) + return null; + C safeCommand = caches.acquireIfLoaded(txnId); if (safeCommand == null) return null; @@ -116,11 +116,14 @@ extends SafeCommandStore if (null != getInternal(key)) continue; // already in working set - CFK safeCfk = caches.acquireIfLoaded(key); - if (safeCfk != null) + if (caches != null) { - add(safeCfk, caches); - continue; + CFK safeCfk = caches.acquireIfLoaded(key); + if (safeCfk != null) + { + add(safeCfk, caches); + continue; + } } if (unavailable == null) unavailable = new ArrayList<>(); diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index abe95748..4ce5e59a 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -781,7 +781,7 @@ public abstract class CommandStores for (ShardHolder shard : shards) results.add(shard.store.build(context, mapper)); - return AsyncChains.all(results); + return AsyncChains.allOf(results); } protected <O> AsyncChain<O> mapReduce(PreLoadContext context, IntStream commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce) diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 446633c5..a5c421c2 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -238,7 +238,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ public AsyncResult<Void> unsafeStart() { EpochReady ready = onTopologyUpdateInternal(configService.currentTopology(), false); - ready.coordinate.addCallback(() -> this.topology.onEpochSyncComplete(id, topology.epoch())); + ready.coordinate.invokeIfSuccess(() -> this.topology.onEpochSyncComplete(id, topology.epoch())); configService.acknowledgeEpoch(ready, false); return ready.metadata; } @@ -357,7 +357,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ if (topology.epoch() <= this.topology.epoch()) return AsyncResults.success(null); EpochReady ready = onTopologyUpdateInternal(topology, startSync); - ready.coordinate.addCallback(() -> this.topology.onEpochSyncComplete(id, topology.epoch())); + ready.coordinate.invokeIfSuccess(() -> this.topology.onEpochSyncComplete(id, topology.epoch())); configService.acknowledgeEpoch(ready, startSync); return ready.coordinate; } @@ -790,7 +790,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ { AsyncResult<Result> result = withEpoch(Math.max(txnId.epoch(), minEpoch), () -> initiateCoordination(txnId, txn)).beginAsResult(); coordinating.putIfAbsent(txnId, result); - result.addCallback((success, fail) -> coordinating.remove(txnId, result)); + result.invoke((success, fail) -> coordinating.remove(txnId, result)); return result; } @@ -855,7 +855,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ return future; }).beginAsResult(); coordinating.putIfAbsent(txnId, result); - result.addCallback((success, fail) -> coordinating.remove(txnId, result)); + result.invoke((success, fail) -> coordinating.remove(txnId, result)); return result; } @@ -865,7 +865,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ if (waitForEpoch > topology.epoch()) { configService.fetchTopologyForEpoch(waitForEpoch); - topology().awaitEpoch(waitForEpoch).addCallback((ignored, failure) -> { + topology().awaitEpoch(waitForEpoch).invoke((ignored, failure) -> { if (failure != null) agent().onUncaughtException(WrappableException.wrap(failure)); else diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index 7efe5509..3a86e5cc 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -519,7 +519,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> if (bounds == null) return execute; - Invariants.require(executeAt == null ? !bounds.outOfBounds(txnId) : !bounds.outOfBounds(txnId, executeAt)); + Invariants.require(txnId.isSyncPoint() || (executeAt == null ? !bounds.outOfBounds(txnId) : !bounds.outOfBounds(txnId, executeAt))); if (bounds.is(txnId, PRE_BOOTSTRAP_OR_STALE)) return without.apply(execute, Ranges.of(bounds.range)); diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index c30a140f..628b5718 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -660,7 +660,7 @@ public class CommandsForKey extends CommandsForKeyUpdate return encoded; } - private static int encode(TxnId txnId, InternalStatus internalStatus, boolean mayExecute) + static int encode(TxnId txnId, InternalStatus internalStatus, boolean mayExecute) { int encoded = internalStatus.txnInfoEncoded | (mayExecute ? MAY_EXECUTE : 0); if (txnId.is(Key)) encoded |= MANAGED; @@ -1262,10 +1262,10 @@ public class CommandsForKey extends CommandsForKeyUpdate int insertPos = Arrays.binarySearch(byId, testStartedAtTimestamp); if (insertPos < 0) { - loadingFor = NO_TXNIDS; + loadingFor = NOT_LOADING_PRUNED; insertPos = -1 - insertPos; if (computeIsDep != IGNORE && testTxnId.compareTo(prunedBefore) < 0) - loadingFor = loadingPrunedFor(loadingPruned, testTxnId, NO_TXNIDS); + loadingFor = loadingPrunedFor(loadingPruned, testTxnId, NOT_LOADING_PRUNED); } switch (testStartedAt) @@ -1300,7 +1300,7 @@ public class CommandsForKey extends CommandsForKeyUpdate TxnId[] missing = txn.missing(); hasAsDep = missing == NO_TXNIDS || Arrays.binarySearch(txn.missing(), testTxnId) < 0; } - else if (loadingFor == NO_TXNIDS) + else if (loadingFor == NOT_LOADING_PRUNED) { hasAsDep = false; } @@ -1603,35 +1603,33 @@ public class CommandsForKey extends CommandsForKeyUpdate { // update TxnInfo cur = byId[pos]; - if (cur != null) + int c = cur.compareTo(newStatus); + if (c > 0) { - int c = cur.compareTo(newStatus); - if (c > 0) - { - // newStatus moves us backwards; we only permit this for (Pre)?(Not)?Accepted states - if (cur.compareTo(COMMITTED) >= 0 || newStatus.compareTo(PREACCEPTED) <= 0) - return this; + // newStatus moves us backwards; we only permit this for (Pre)?(Not)?Accepted states + if (cur.compareTo(COMMITTED) >= 0 || newStatus.compareTo(PREACCEPTED) <= 0) + return this; - // and only when the new ballot is strictly greater - if (updated.acceptedOrCommitted().compareTo(cur.ballot()) <= 0) - return this; - } - else if (c == 0) - { - // we're updating to the same state; we only do this with a strictly greater ballot; - // even so, if we have no executeAt or deps there's nothing to record - if (updated.acceptedOrCommitted().compareTo(cur.ballot()) <= 0 || !newStatus.hasExecuteAtOrDeps()) - return this; - } - else - { - // we're advancing to a higher status, but this is only permitted either if the new state is stable or the ballot is higher - if (cur.compareTo(STABLE) < 0 && updated.acceptedOrCommitted().compareTo(cur.ballot()) < 0) - return this; - } + // and only when the new ballot is strictly greater + if (updated.acceptedOrCommitted().compareTo(cur.ballot()) <= 0) + return this; + } + else if (c == 0) + { + // we're updating to the same state; we only do this with a strictly greater ballot; + // even so, if we have no executeAt or deps there's nothing to record + if (updated.acceptedOrCommitted().compareTo(cur.ballot()) <= 0 || !newStatus.hasExecuteAtOrDeps()) + return this; + } + else + { + // we're advancing to a higher status, but this is only permitted either if the new state is stable or the ballot is higher + if (cur.compareTo(STABLE) < 0 && updated.acceptedOrCommitted().compareTo(cur.ballot()) < 0) + return this; } if (isOutOfRange) result = insertOrUpdateOutOfRange(pos, txnId, cur, newStatus, mayExecute, updated, witnessedBy); + else if (cur.compareTo(STABLE) >= 0) result = update(pos, txnId, cur, cur.withEncodedStatus(TxnInfo.encode(txnId, newStatus, cur.mayExecute(), cur.statusOverrides())), updated, witnessedBy); else if (newStatus.hasDeps()) result = update(pos, txnId, cur, newStatus, mayExecute, updated, witnessedBy); else result = update(pos, txnId, cur, TxnInfo.create(txnId, newStatus, mayExecute, updated), updated, witnessedBy); } @@ -2000,10 +1998,10 @@ public class CommandsForKey extends CommandsForKeyUpdate return new CommandsForKeyUpdateWithPostProcess(newCfk, newPostProcess); } - TxnInfo[] newById = removeRedundantById(byId, bounds, newBounds); + Object[] newLoadingPruned = Pruning.removeRedundantLoadingPruned(loadingPruned, redundantBefore(newBounds)); + TxnInfo[] newById = removeRedundantById(byId, newLoadingPruned != loadingPruned, bounds, newBounds); int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(), redundantBefore(newBounds)); Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 || byId[prunedBeforeById].compareTo(newBounds.gcBefore) < 0 : newById[newPrunedBeforeById].equals(byId[prunedBeforeById])); - Object[] newLoadingPruned = Pruning.removeRedundantLoadingPruned(loadingPruned, redundantBefore(newBounds)); long maxUniqueHlc = this.maxUniqueHlc; if (maxUniqueHlc <= newBounds.gcBefore.hlc() && newBounds.gcBefore.is(HLC_BOUND)) @@ -2022,10 +2020,10 @@ public class CommandsForKey extends CommandsForKeyUpdate { QuickBounds newBoundsInfo = bounds.withGcBeforeBeforeAtLeast(newRedundantBefore); - TxnInfo[] newById = removeRedundantById(byId, bounds, newBoundsInfo); + Object[] newLoadingPruned = Pruning.removeRedundantLoadingPruned(loadingPruned, newRedundantBefore); + TxnInfo[] newById = removeRedundantById(byId, newLoadingPruned != loadingPruned, bounds, newBoundsInfo); int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(), newRedundantBefore); Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 : newById[newPrunedBeforeById].equals(byId[prunedBeforeById])); - Object[] newLoadingPruned = Pruning.removeRedundantLoadingPruned(loadingPruned, newRedundantBefore); return reconstruct(key, newBoundsInfo, true, newById, maxUniqueHlc, newLoadingPruned, newPrunedBeforeById, unmanageds); } @@ -2216,7 +2214,7 @@ public class CommandsForKey extends CommandsForKeyUpdate { Invariants.require(txn.witnesses(missingId)); TxnInfo missingInfo = get(missingId, byId); - Invariants.require(missingInfo.status().compareTo(InternalStatus.COMMITTED) < 0); + Invariants.require(missingInfo == null ? missingId.is(UNSTABLE) && find(loadingPruned, missingId) != null : missingInfo.status().compareTo(InternalStatus.COMMITTED) < 0); Invariants.require(txn.depsKnownBefore().compareTo(missingId) >= 0); } if (txn.isCommittedAndExecutes()) diff --git a/accord-core/src/main/java/accord/local/cfk/Pruning.java b/accord-core/src/main/java/accord/local/cfk/Pruning.java index 08788c05..29aec370 100644 --- a/accord-core/src/main/java/accord/local/cfk/Pruning.java +++ b/accord-core/src/main/java/accord/local/cfk/Pruning.java @@ -527,7 +527,7 @@ public class Pruning return epochPrunedBefores; } - static TxnInfo[] removeRedundantById(TxnInfo[] byId, QuickBounds prevBounds, QuickBounds newBounds) + static TxnInfo[] removeRedundantById(TxnInfo[] byId, boolean hasRedundantLoadingPruned, QuickBounds prevBounds, QuickBounds newBounds) { TxnId newRedundantBefore = redundantBefore(newBounds); TxnId newBootstrappedAt = bootstrappedAt(newBounds); @@ -538,7 +538,7 @@ public class Pruning TxnInfo[] newById = byId; int pos = insertPos(byId, newRedundantBefore); - if (pos != 0) + if (pos != 0 || hasRedundantLoadingPruned) { if (Invariants.isParanoid() && testParanoia(LINEAR, NONE, LOW)) { diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java b/accord-core/src/main/java/accord/local/cfk/Updating.java index a33d3b5b..d93f2735 100644 --- a/accord-core/src/main/java/accord/local/cfk/Updating.java +++ b/accord-core/src/main/java/accord/local/cfk/Updating.java @@ -47,6 +47,7 @@ import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; +import accord.utils.ArrayBuffers.ObjectBuffers; import accord.utils.Invariants; import accord.utils.RelationMultiMap; import accord.utils.SortedArrays; @@ -65,6 +66,7 @@ import static accord.local.cfk.CommandsForKey.NO_INFOS; import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY; import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.COMMIT; import static accord.local.cfk.CommandsForKey.executesIgnoringBootstrap; +import static accord.local.cfk.CommandsForKey.manages; import static accord.local.cfk.CommandsForKey.reportLinearizabilityViolation; import static accord.local.cfk.CommandsForKey.mayExecute; import static accord.local.cfk.Pruning.removeLoadingPruned; @@ -120,7 +122,6 @@ class Updating static CommandsForKeyUpdate insertOrUpdate(CommandsForKey cfk, int insertPos, int updatePos, TxnId plainTxnId, TxnInfo curInfo, InternalStatus newStatus, boolean mayExecute, Command command, @Nonnull TxnId[] witnessedBy) { - // TODO (expected): do not calculate any deps or additions if we're transitioning from Stable to Applied; wasted effort and might trigger LoadPruned Object newInfoObj = computeInfoAndAdditions(cfk, insertPos, updatePos, plainTxnId, newStatus, mayExecute, command); if (newInfoObj.getClass() != InfoWithAdditions.class) return insertOrUpdate(cfk, insertPos, plainTxnId, curInfo, (TxnInfo)newInfoObj, command, witnessedBy); @@ -157,7 +158,7 @@ class Updating TxnInfo[] newById = new TxnInfo[byId.length + additionCount + (updatePos < 0 ? 1 : 0)]; insertOrUpdateWithAdditions(byId, insertPos, updatePos, plainTxnId, newInfo, additions, additionCount, newById, newCommittedByExecuteAt, witnessedBy, cfk.bounds); if (testParanoia(SUPERLINEAR, NONE, LOW)) - validateMissing(newById, additions, additionCount, curInfo, newInfo, NO_TXNIDS); + validateMissing(newById, additions, additionCount, curInfo, newInfo, witnessedBy); int newMinUndecidedById = updateMinUndecidedById(newInfo, insertPos, updatePos, cfk, byId, newById, additions, additionCount); // we don't insert anything before prunedBeforeById (we LoadPruned instead), so we can simply bump it by 0 or 1 @@ -299,14 +300,14 @@ class Updating MergeCursor<TxnId, DepList> deps = command.partialDeps().txnIds(cfk.key()); deps.find(cfk.redundantBefore()); - return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, txnId, newStatus, mayExecute, ballot, executeAt, depsKnownBefore, deps); + return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, txnId, newStatus, mayExecute, ballot, executeAt, cfk.prunedBefore(), depsKnownBefore, deps); } /** * We return an Object here to avoid wasting allocations; most of the time we expect a new TxnInfo to be returned, * but if we have transitive dependencies to insert we return an InfoWithAdditions */ - static Object computeInfoAndAdditions(TxnInfo[] byId, int insertPos, int updatePos, TxnId plainTxnId, InternalStatus newStatus, boolean mayExecute, Ballot ballot, Timestamp executeAt, Timestamp depsKnownBefore, MergeCursor<TxnId, DepList> deps) + static Object computeInfoAndAdditions(TxnInfo[] byId, int insertPos, int updatePos, TxnId plainTxnId, InternalStatus newStatus, boolean mayExecute, Ballot ballot, Timestamp executeAt, TxnInfo prunedBefore, Timestamp depsKnownBefore, MergeCursor<TxnId, DepList> deps) { TxnId[] additions = NO_TXNIDS, missing = NO_TXNIDS; int additionCount = 0, missingCount = 0; @@ -335,12 +336,8 @@ class Updating // we should ensure any existing TRANSITIVE entries are upgraded. // OR we should remove TRANSITIVE for simplicity, // OR document/enforce that TRANSITIVE_VISIBLE can only be applied to dependencies of unmanaged transactions - if (d.is(UNSTABLE) && t.compareTo(COMMITTED) < 0 && t.witnesses(d)) - { - if (missingCount == missing.length) - missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount * 2)); - missing[missingCount++] = d; - } + if (d.is(UNSTABLE) && txnIdsIndex < depsKnownBeforePos && t.compareTo(COMMITTED) < 0 && plainTxnId.witnesses(d)) + missing = append(missing, missingCount++, d, cachedTxnIds()); ++txnIdsIndex; deps.advance(); @@ -350,21 +347,20 @@ class Updating // we expect to be missing ourselves // we also permit any transaction we have recorded as COMMITTED or later to be missing, as recovery will not need to consult our information if (txnIdsIndex != updatePos && txnIdsIndex < depsKnownBeforePos && t.compareTo(COMMITTED) < 0 && plainTxnId.witnesses(t)) - { - if (missingCount == missing.length) - missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount * 2)); - missing[missingCount++] = t.plainTxnId(); - } + missing = append(missing, missingCount++, t.plainTxnId(), cachedTxnIds()); txnIdsIndex++; } else { if (plainTxnId.witnesses(d)) { - if (additionCount >= additions.length) - additions = cachedTxnIds().resize(additions, additionCount, Math.max(8, additionCount * 2)); - - additions[additionCount++] = d; + if (d.is(UNSTABLE)) + { + if (d.compareTo(depsKnownBefore) < 0 && (manages(d) || d.compareTo(prunedBefore) > 0)) + missing = append(missing, missingCount++, d, cachedTxnIds()); + d = d.withoutNonIdentityFlags(); + } + additions = append(additions, additionCount++, d, cachedTxnIds()); } else { @@ -384,9 +380,13 @@ class Updating TxnId d = deps.cur(); if (plainTxnId.witnesses(d)) { - if (additionCount >= additions.length) - additions = cachedTxnIds().resize(additions, additionCount, Math.max(8, additionCount * 2)); - additions[additionCount++] = deps.cur().withoutNonIdentityFlags(); + if (d.is(UNSTABLE)) + { + if (d.compareTo(depsKnownBefore) < 0 && (manages(d) || d.compareTo(prunedBefore) > 0)) + missing = append(missing, missingCount++, d, cachedTxnIds()); + d = d.withoutNonIdentityFlags(); + } + additions = append(additions, additionCount++, d, cachedTxnIds()); } deps.advance(); } @@ -398,13 +398,9 @@ class Updating { if (txnIdsIndex != updatePos && byId[txnIdsIndex].compareTo(COMMITTED) < 0) { - TxnId txnId = byId[txnIdsIndex].plainTxnId(); - if ((plainTxnId.witnesses(txnId))) - { - if (missingCount == missing.length) - missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount * 2)); - missing[missingCount++] = txnId; - } + TxnInfo txn = byId[txnIdsIndex]; + if (plainTxnId.witnesses(txn)) + missing = append(missing, missingCount++, txn.plainTxnId(), cachedTxnIds()); } txnIdsIndex++; } @@ -417,6 +413,14 @@ class Updating return new InfoWithAdditions(info, additions, additionCount); } + private static <T> T[] append(T[] array, int index, T add, ObjectBuffers<T> cached) + { + if (index == array.length) + array = cached.resize(array, index, Math.max(8, index * 2)); + array[index] = add; + return array; + } + static CommandsForKeyUpdate insertOrUpdate(CommandsForKey cfk, int pos, TxnId plainTxnId, TxnInfo curInfo, TxnInfo newInfo, Command command, @Nullable TxnId[] witnessedBy) { if (curInfo == newInfo) @@ -475,6 +479,10 @@ class Updating // TODO (desired): for consistency, move this to insertOrUpdate (without additions), while maintaining the efficiency Utils.addToMissingArrays(newById, newCommittedByExecuteAt, newInfo, plainTxnId, witnessedBy); } + else if (witnessedBy != null && newInfo.compareTo(COMMITTED) >= 0) + { + Utils.removeFromWitnessMissingArrays(newById, newCommittedByExecuteAt, plainTxnId, witnessedBy); + } if (testParanoia(SUPERLINEAR, NONE, LOW) && curInfo == null && newInfo.compareTo(COMMITTED) < 0) validateMissing(newById, NO_TXNIDS, 0, curInfo, newInfo, witnessedBy); @@ -499,7 +507,8 @@ class Updating // we may need to insert or remove ourselves, depending on the new and prior status boolean insertSelfMissing = sourceUpdatePos < 0 && newInfo.compareTo(COMMITTED) < 0; - boolean removeSelfMissing = sourceUpdatePos >= 0 && newInfo.compareTo(COMMITTED) >= 0 && byId[sourceUpdatePos].compareTo(COMMITTED) < 0; + boolean removeSelfMissing = newInfo.compareTo(COMMITTED) >= 0 && sourceUpdatePos >= 0 && byId[sourceUpdatePos].compareTo(COMMITTED) < 0; + boolean removeWitnessedMissing = newInfo.compareTo(COMMITTED) >= 0 && witnessedBy.length > 0; // missingSource starts as additions, but if we insertSelfMissing at the relevant moment it becomes the merge of additions and plainTxnId TxnId[] missingSource = additions; @@ -536,20 +545,24 @@ class Updating } int to = missingTo(txn, depsKnownBefore, missingSource, missingCount, missingLimit); - if (to > 0 || removeSelfMissing) + if (to > 0 || removeSelfMissing || removeWitnessedMissing) { + int witnessedByIndex = -1; + if (witnessedBy != NOT_LOADING_PRUNED && (to > 0 || !removeSelfMissing)) + witnessedByIndex = Arrays.binarySearch(witnessedBy, txn); + TxnId[] curMissing = txn.missing(); TxnId[] newMissing = curMissing; if (to > 0) { TxnId skipInsertMissing = null; - if (Arrays.binarySearch(witnessedBy, plainTxnId) >= 0) + if (insertSelfMissing && witnessedByIndex >= 0) skipInsertMissing = plainTxnId; newMissing = mergeAndFilterMissing(txn, curMissing, missingSource, to, skipInsertMissing); } - if (removeSelfMissing) + if (removeSelfMissing || (removeWitnessedMissing && witnessedByIndex >= 0)) newMissing = removeOneMissing(newMissing, plainTxnId); if (newMissing != curMissing) diff --git a/accord-core/src/main/java/accord/local/cfk/Utils.java b/accord-core/src/main/java/accord/local/cfk/Utils.java index a8a0ae39..695e8f88 100644 --- a/accord-core/src/main/java/accord/local/cfk/Utils.java +++ b/accord-core/src/main/java/accord/local/cfk/Utils.java @@ -42,6 +42,7 @@ class Utils { static void validateMissing(TxnInfo[] byId, TxnId[] additions, int additionCount, TxnInfo curInfo, TxnInfo newInfo, @Nonnull TxnId[] shouldNotHaveMissing) { + int newInfoAdditionIndex = Arrays.binarySearch(additions, 0, additionCount, newInfo); for (TxnInfo txn : byId) { if (txn == newInfo) continue; @@ -54,7 +55,7 @@ class Utils { if (!txn.witnesses(additions[i])) continue; j = SortedArrays.exponentialSearch(missing, j, missing.length, additions[i]); - if (shouldNotHaveMissing != NO_TXNIDS && Arrays.binarySearch(shouldNotHaveMissing, txn) >= 0) Invariants.require(j < 0); + if (shouldNotHaveMissing != NO_TXNIDS && i == newInfoAdditionIndex && Arrays.binarySearch(shouldNotHaveMissing, txn) >= 0) Invariants.require(j < 0); else Invariants.require(j >= 0); } if (curInfo == null && newInfo.compareTo(COMMITTED) < 0 && txn.witnesses(newInfo) && txn.depsKnownBefore().compareTo(newInfo) > 0 && (shouldNotHaveMissing == NO_TXNIDS || Arrays.binarySearch(shouldNotHaveMissing, txn) < 0)) @@ -95,6 +96,40 @@ class Utils removeFromMissingArraysById(byId, minSearchIndex, byId.length, removeTxnId); } + /** + * {@code removeTxnId} no longer needs to be tracked in missing arrays; + * remove it from byId and committedByExecuteAt, ensuring both arrays still reference the same TxnInfo where updated + */ + static void removeFromWitnessMissingArrays(TxnInfo[] byId, TxnInfo[] committedByExecuteAt, TxnId removeTxnId, TxnId[] witnessedBy) + { + if (witnessedBy.length == 0) + return; + + int byIdIndex = Arrays.binarySearch(byId, witnessedBy[0]); + if (byIdIndex < 0) + byIdIndex = -1 - byIdIndex; + + for (TxnId txnId : witnessedBy) + { + byIdIndex = SortedArrays.exponentialSearch(byId, byIdIndex, byId.length, txnId); + if (byIdIndex < 0) + { + byIdIndex = -1 - byIdIndex; + continue; + } + TxnInfo curTxn = byId[byIdIndex]; + TxnId[] curMissing = curTxn.missing(); + if (curMissing == NO_TXNIDS) continue; + TxnId[] newMissing = removeOneMissing(curMissing, removeTxnId); + if (newMissing == curMissing) continue; + TxnInfo newTxn = curTxn.withMissing(newMissing); + byId[byIdIndex] = newTxn; + if (!curTxn.isCommittedAndExecutes()) continue; + int byExecuteAtIndex = Arrays.binarySearch(committedByExecuteAt, curTxn, TxnInfo::compareExecuteAt); + committedByExecuteAt[byExecuteAtIndex] = newTxn; + } + } + /** * {@code removeTxnId} no longer needs to be tracked in missing arrays; * remove it from a range of byId ACCEPTED status entries only, that could not be tracked via committedByExecuteAt diff --git a/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java b/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java index ea840f09..202e7eff 100644 --- a/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java +++ b/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java @@ -49,7 +49,7 @@ class ConcurrencyControl implements BiConsumer<Object, Throwable> void start(ConcurrencyControl concurrencyControl) { - supplier.get().addCallback(result).begin(concurrencyControl); + supplier.get().invoke(result).begin(concurrencyControl); } } @@ -80,7 +80,7 @@ class ConcurrencyControl implements BiConsumer<Object, Throwable> } ++inflight; } - return supplier.get().addCallback(this); + return supplier.get().invoke(this); } synchronized void setMaxConcurrency(int newMaxConcurrency) diff --git a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java index 431a396c..9f209e52 100644 --- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java +++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java @@ -248,7 +248,7 @@ public class DurabilityQueue if (request != null) request.reportAttempt(exclusiveSyncPoint.syncId, node.elapsed(MICROSECONDS), coordinate); - coordinate.addCallback((success, fail) -> { + coordinate.invoke((success, fail) -> { synchronized (this) { unregisterInProgress(exclusiveSyncPoint, coordinate); diff --git a/accord-core/src/main/java/accord/local/durability/ShardDurability.java b/accord-core/src/main/java/accord/local/durability/ShardDurability.java index 4c17b8c4..f9abe277 100644 --- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java +++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java @@ -321,7 +321,7 @@ public class ShardDurability syncId -> node.withEpoch(syncId.epoch(), () -> syncPointControl.submit( () -> CoordinateSyncPoint.exclusive(node, syncId, (FullRoute<Range>) node.computeRoute(syncId, ranges)) - .addCallback(logSyncPoint(syncId, ranges)) + .invoke(logSyncPoint(syncId, ranges)) ))) .begin((success, fail) -> { scheduled = null; diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java index 379f0a7b..8b5f3855 100644 --- a/accord-core/src/main/java/accord/messages/Apply.java +++ b/accord-core/src/main/java/accord/messages/Apply.java @@ -52,9 +52,9 @@ public class Apply extends TxnRequest<ApplyReply> public static final Factory FACTORY = Apply::new; public static class SerializationSupport { - public static Apply create(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) + public static Apply create(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) { - return new Apply(kind, txnId, scope, minEpoch, waitForEpoch, executeAt, deps, txn, fullRoute, writes, result); + return new Apply(kind, txnId, scope, minEpoch, waitForEpoch, maxEpoch, executeAt, deps, txn, fullRoute, writes, result); } } @@ -71,6 +71,7 @@ public class Apply extends TxnRequest<ApplyReply> public final @Nullable Writes writes; public final Result result; public final long minEpoch; + public final long maxEpoch; public enum Kind { Minimal, Maximal } @@ -86,6 +87,7 @@ public class Apply extends TxnRequest<ApplyReply> this.writes = writes; this.result = result; this.minEpoch = participates.oldestEpoch(); + this.maxEpoch = participates.currentEpoch(); } public static Topologies participates(Node node, Unseekables<?> route, TxnId txnId, Timestamp executeAt, Topologies executes) @@ -98,7 +100,7 @@ public class Apply extends TxnRequest<ApplyReply> return node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch(), SHARE); } - protected Apply(Kind kind, TxnId txnId, Route<?> route, long minEpoch, long waitForEpoch, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) + protected Apply(Kind kind, TxnId txnId, Route<?> route, long minEpoch, long waitForEpoch, long maxEpoch, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) { super(txnId, route, waitForEpoch); this.kind = kind; @@ -109,19 +111,20 @@ public class Apply extends TxnRequest<ApplyReply> this.writes = writes; this.result = result; this.minEpoch = minEpoch; + this.maxEpoch = maxEpoch; } @Override public Cancellable submit() { - return node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(), this); + return node.mapReduceConsumeLocal(this, minEpoch, maxEpoch, this); } @Override public ApplyReply apply(SafeCommandStore safeStore) { Route<?> route = fullRoute != null ? fullRoute : scope; - StoreParticipants participants = StoreParticipants.execute(safeStore, route, minEpoch, txnId, executeAt.epoch()); + StoreParticipants participants = StoreParticipants.execute(safeStore, route, minEpoch, txnId, maxEpoch); return apply(safeStore, participants); } diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java index d3c0da31..e858a659 100644 --- a/accord-core/src/main/java/accord/messages/BeginRecovery.java +++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java @@ -479,7 +479,7 @@ public class BeginRecovery extends TxnRequest.WithUnsynced<BeginRecovery.Recover ", deps:" + deps + ", earlierWait:" + earlierWait + ", earlierNoWait:" + earlierNoWait + - ", laterNoVote:" + laterCoordRejects + + ", laterCoordRejects:" + laterCoordRejects + ", selfAcceptsFastPath:" + selfAcceptsFastPath + (txnId.hasPrivilegedCoordinator() ? ", coordinatorFastPath:" + selfAcceptsFastPath : "") + ", supersedingRejects:" + supersedingRejects + diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java index a55201e6..dabff631 100644 --- a/accord-core/src/main/java/accord/messages/PreAccept.java +++ b/accord-core/src/main/java/accord/messages/PreAccept.java @@ -123,7 +123,7 @@ public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply> if (command.status().compareTo(Status.PreAccepted) > 0) return PreAcceptNack.INSTANCE; - if (command.executeAt().is(REJECTED)) + if (command.executeAt().is(REJECTED) && !participants.owns().isEmpty()) // if our vote is required we don't need to compute deps return new PreAcceptOk(txnId, command.executeAt(), Deps.NONE, ExecuteFlags.none()); case Retired: diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java index ff3a6c99..473ea98d 100644 --- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java +++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java @@ -40,7 +40,7 @@ public class SetGloballyDurable implements Request, PreLoadContext @Override public void process(Node node, Node.Id from, ReplyContext replyContext) { - node.markDurable(durableBefore).addCallback((success, fail) -> { + node.markDurable(durableBefore).invoke((success, fail) -> { node.reply(from, replyContext, fail == null ? Ok : null, fail); }); } diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java b/accord-core/src/main/java/accord/messages/SetShardDurable.java index 1ead882f..5f27c3e1 100644 --- a/accord-core/src/main/java/accord/messages/SetShardDurable.java +++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java @@ -57,7 +57,7 @@ public class SetShardDurable extends AbstractRequest<SimpleReply> Invariants.require(durability.compareTo(Durability.MajorityOrInvalidated) >= 0); TxnId syncIdWithFlags = syncIdWithFlags(); node.markDurable(exclusiveSyncPoint.route.toRanges(), syncIdWithFlags, durability.compareTo(Durability.UniversalOrInvalidated) >= 0 ? syncIdWithFlags : TxnId.NONE) - .addCallback((success, fail) -> { + .invoke((success, fail) -> { if (fail != null) node.reply(replyTo, replyContext, null, fail); else node.mapReduceConsumeLocal(this, exclusiveSyncPoint.route, waitForEpoch(), waitForEpoch(), this); }); diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java index bc3d8dac..a2fa3c88 100644 --- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java +++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java @@ -26,6 +26,7 @@ import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.TxnId; import accord.topology.Topologies; +import accord.utils.Invariants; import static accord.messages.MessageType.StandardMessage.WAIT_UNTIL_APPLIED_REQ; import static accord.primitives.SaveStatus.Applied; @@ -52,6 +53,7 @@ public class WaitUntilApplied extends ReadData { super(to, topologies, txnId, scope, executeAtEpoch); this.minEpoch = topologies.oldestEpoch(); + Invariants.require(minEpoch <= executeAtEpoch); } protected WaitUntilApplied(TxnId txnId, Participants<?> scope, long minEpoch, long executeAtEpoch) diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java index dc19ff20..d7bcad48 100644 --- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java +++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java @@ -222,6 +222,12 @@ public abstract class AbstractRanges implements Iterable<Range>, Routables<Range return SortedArrays.binarySearch(ranges, 0, size(), find, Range::compareIntersecting, search); } + @Override + public final int indexOf(Range find) + { + return Arrays.binarySearch(ranges, 0, size(), find, Range::compare); + } + @Override public final int findNext(int thisIndex, Range find, SortedArrays.Search search) { @@ -648,6 +654,8 @@ public abstract class AbstractRanges implements Iterable<Range>, Routables<Range @Override public boolean equals(Object that) { + if (this == that) + return true; if (that == null || this.getClass() != that.getClass()) return false; return Arrays.equals(this.ranges, ((AbstractRanges) that).ranges); diff --git a/accord-core/src/main/java/accord/primitives/LatestDeps.java b/accord-core/src/main/java/accord/primitives/LatestDeps.java index cb6a656b..294df183 100644 --- a/accord-core/src/main/java/accord/primitives/LatestDeps.java +++ b/accord-core/src/main/java/accord/primitives/LatestDeps.java @@ -46,9 +46,11 @@ import accord.utils.UnhandledEnum; import static accord.messages.Accept.Kind.SLOW; import static accord.primitives.Known.KnownDeps.DepsCommitted; import static accord.primitives.Known.KnownDeps.DepsErased; +import static accord.primitives.Known.KnownDeps.DepsFromCoordinator; import static accord.primitives.Known.KnownDeps.DepsKnown; import static accord.primitives.Known.KnownDeps.DepsProposed; import static accord.primitives.Known.KnownDeps.DepsProposedFixed; +import static accord.primitives.Known.KnownDeps.DepsUnknown; import static accord.topology.Topologies.SelectNodeOwnership.SHARE; public class LatestDeps extends ReducingRangeMap<LatestDeps.LatestEntry> @@ -419,7 +421,17 @@ public class LatestDeps extends ReducingRangeMap<LatestDeps.LatestEntry> return result; } - Deps mergeProposal() + public Participants<?> notAccepted(Participants<?> participants) + { + for (int i = 0 ; i < values.length ; ++i) + { + if (values[i] != null && values[i].known != DepsUnknown && values[i].known != DepsFromCoordinator) + participants = participants.without(Ranges.of(starts[i].rangeFactory().newRange(starts[i], starts[i + 1]))); + } + return participants; + } + + public Deps mergeProposal() { return mergeProposal(null); } diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java index 16db6efb..cf08d5fb 100644 --- a/accord-core/src/main/java/accord/primitives/Routables.java +++ b/accord-core/src/main/java/accord/primitives/Routables.java @@ -51,6 +51,7 @@ public interface Routables<K extends Routable> extends Iterable<K> } K get(int i); + int indexOf(K key); int size(); boolean isEmpty(); diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java index 41a33693..c1e0ceb4 100644 --- a/accord-core/src/main/java/accord/primitives/Timestamp.java +++ b/accord-core/src/main/java/accord/primitives/Timestamp.java @@ -360,6 +360,9 @@ public class Timestamp implements Comparable<Timestamp>, EpochSupplier public boolean equals(Timestamp that) { + if (that == this) + return true; + return that != null && this.msb == that.msb && ((this.lsb ^ that.lsb) & IDENTITY_LSB) == 0 && this.node.equals(that.node); diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 39943296..72f12f53 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -1220,7 +1220,9 @@ public class TopologyManager Epochs snapshot = epochs; EpochState maxState = snapshot.get(maxEpoch); - Invariants.require(maxState != null, "Unable to find epoch %d; known epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch); + if (maxState == null) + throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch()); + if (minEpoch == maxEpoch) return new Single(sorter, selectFunction.apply(snapshot.get(minEpoch).global, select, selectNodeOwnership)); diff --git a/accord-core/src/main/java/accord/utils/Functions.java b/accord-core/src/main/java/accord/utils/Functions.java index 58551c41..8e25d80e 100644 --- a/accord-core/src/main/java/accord/utils/Functions.java +++ b/accord-core/src/main/java/accord/utils/Functions.java @@ -18,29 +18,12 @@ package accord.utils; -import javax.annotation.Nonnull; import java.util.List; import java.util.function.BiFunction; import java.util.function.Function; public class Functions { - - public static <T> T reduceNonNull(BiFunction<T, T, T> merge, T a, T b) - { - return a == null ? b : b == null ? a : merge.apply(a, b); - } - - public static <T1, T2> T1 reduceNonNull(BiFunction<T1, T2, T1> merge, @Nonnull T1 a, T2 ... bs) - { - for (T2 b : bs) - { - if (b != null) - a = merge.apply(a, b); - } - return a; - } - public static <I, O> O mapReduceNonNull(Function<I, O> map, BiFunction<O, O, O> reduce, List<I> input) { O result = null; diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java index fe6cae97..1e2b7bab 100644 --- a/accord-core/src/main/java/accord/utils/Invariants.java +++ b/accord-core/src/main/java/accord/utils/Invariants.java @@ -285,6 +285,12 @@ public class Invariants throw illegalState(format(fmt, p1, p2, p3)); } + public static void require(boolean condition, String fmt, int p1, @Nullable Object p2, @Nullable Object p3) + { + if (!condition) + throw illegalState(format(fmt, p1, p2, p3)); + } + public static <P> void require(boolean condition, String fmt, @Nullable Object p1, @Nullable Object p2, @Nullable P p3, Function<? super P, Object> transformP3) { if (!condition) diff --git a/accord-core/src/main/java/accord/utils/LogGroupTimers.java b/accord-core/src/main/java/accord/utils/LogGroupTimers.java index 4d45479d..338ef27b 100644 --- a/accord-core/src/main/java/accord/utils/LogGroupTimers.java +++ b/accord-core/src/main/java/accord/utils/LogGroupTimers.java @@ -339,7 +339,7 @@ public class LogGroupTimers<T extends LogGroupTimers.Timer> { wakeAt = deadline; } - else if (prevDeadline == wakeAt && bucketsStart != bucketsEnd) + else if (prevDeadline == wakeAt && prevDeadline != Long.MAX_VALUE && bucketsStart != bucketsEnd) { Bucket<T> head = buckets[bucketsStart]; if (!head.isHeapified()) diff --git a/accord-core/src/main/java/accord/utils/PersistentField.java b/accord-core/src/main/java/accord/utils/PersistentField.java index 6face04d..117f8157 100644 --- a/accord-core/src/main/java/accord/utils/PersistentField.java +++ b/accord-core/src/main/java/accord/utils/PersistentField.java @@ -105,7 +105,7 @@ public class PersistentField<Input, Saved> pending.add(new Pending<>(id, newValue)); AsyncResult<?> pendingWrite = persister.persist(inputValue, newValue); - pendingWrite.addCallback((success, fail) -> { + pendingWrite.invoke((success, fail) -> { synchronized (this) { complete.add(id); diff --git a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java index d8950f94..d05fc1ec 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java @@ -49,8 +49,6 @@ public class AsyncCallbacks } public static <T> BiConsumer<T, Throwable> toCallback(Runnable runnable) { - return (unused, failure) -> { - if (failure == null) runnable.run(); - }; + return (s, f) -> runnable.run(); } } diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java b/accord-core/src/main/java/accord/utils/async/AsyncChain.java index 0899b268..b2f6070f 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java @@ -27,13 +27,8 @@ import java.util.function.Function; import javax.annotation.Nullable; -import com.google.common.util.concurrent.ListenableFuture; - public interface AsyncChain<V> { - /** - * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively - */ <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper); default <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper, Executor executor) @@ -41,9 +36,6 @@ public interface AsyncChain<V> return AsyncChains.map(this, mapper, executor); } - /** - * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively - */ <T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>> mapper); default <T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>> mapper, Executor executor) @@ -78,19 +70,19 @@ public interface AsyncChain<V> */ AsyncChain<V> recover(Function<? super Throwable, ? extends AsyncChain<V>> mapper); - default AsyncChain<Void> accept(Consumer<? super V> action) + default AsyncChain<V> invokeIfSuccess(Consumer<? super V> action) { return map(r -> { action.accept(r); - return null; + return r; }); } - default AsyncChain<Void> accept(Consumer<? super V> action, Executor executor) + default AsyncChain<V> invokeIfSuccess(Consumer<? super V> action, Executor executor) { return map(r -> { action.accept(r); - return null; + return r; }, executor); } @@ -101,27 +93,29 @@ public interface AsyncChain<V> return map(a -> a, e); } - /** - * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively - */ - AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback); + AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback); /** - * Adds a callback that only listens to the successful case, a failed chain will not trigger the callback + * Adds a callback that fires on success only */ - default AsyncChain<V> addCallback(Runnable runnable) + default AsyncChain<V> invokeIfSuccess(Runnable runnable) { - return addCallback(AsyncCallbacks.toCallback(runnable)); + return invoke((success, fail) -> { + if (fail == null) runnable.run(); + }); } - default AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback, ExecutorService es) + /** + * Adds a callback that fires on either success or failure + */ + default AsyncChain<V> invoke(Runnable runnable) { - return addCallback(AsyncCallbacks.inExecutorService(callback, es)); + return invoke((success, fail) -> runnable.run()); } - default AsyncChain<V> addCallback(Runnable runnable, ExecutorService es) + default AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback, ExecutorService es) { - return addCallback(AsyncCallbacks.inExecutorService(runnable, es)); + return invoke(AsyncCallbacks.inExecutorService(callback, es)); } /** diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java index bc5256b6..060e4f7c 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java @@ -19,6 +19,7 @@ package accord.utils.async; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; @@ -39,7 +40,6 @@ import java.util.function.Function; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,7 +129,7 @@ public abstract class AsyncChains<V> implements AsyncChain<V> } @Override - public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback) + public AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback) { if (value == null || value.getClass() != FailureHolder.class) callback.accept((V) value, null); @@ -141,7 +141,7 @@ public abstract class AsyncChains<V> implements AsyncChain<V> @Override public Cancellable begin(BiConsumer<? super V, Throwable> callback) { - addCallback(callback); + invoke(callback); return null; } } @@ -454,7 +454,7 @@ public abstract class AsyncChains<V> implements AsyncChain<V> } @Override - public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback) + public AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback) { return add(EncapsulatedCallback::new, callback); } @@ -655,28 +655,45 @@ public abstract class AsyncChains<V> implements AsyncChain<V> }); } - public static <V> AsyncChain<V[]> allOf(List<? extends AsyncChain<? extends V>> chains) + public static <V> AsyncChain<List<V>> allOf(List<? extends AsyncChain<? extends V>> chains) { - return new AsyncChainCombiner<>(chains); + return allOfInternal(chains).map(Arrays::asList); } - public static <V> AsyncChain<List<V>> all(List<? extends AsyncChain<? extends V>> chains) + // cannot expose this as we're actually providing an Object[] to the next in the chain + // which is not safe if receiver statically expecting the strongly typed array + private static <V> AsyncChain<V[]> allOfInternal(List<? extends AsyncChain<? extends V>> chains) { - return new AsyncChainCombiner<>(chains).map(Lists::newArrayList); + return new AsyncChainCombiner<>(chains); } public static <V> AsyncChain<V> reduce(List<? extends AsyncChain<? extends V>> chains, BiFunction<? super V, ? super V, ? extends V> reducer) { if (chains.size() == 1) return (AsyncChain<V>) chains.get(0); - return allOf(chains).map(r -> reduceArray(r, reducer)); + return allOfInternal(chains).map(r -> reduceArray(r, reducer)); + } + + public static <I, O> AsyncChain<O> reduce(List<? extends AsyncChain<? extends I>> chains, BiFunction<? super I, ? super O, ? extends O> reducer, O identity) + { + if (chains.size() == 1) + return chains.get(0).map(i -> reducer.apply(i, identity)); + return allOfInternal(chains).map(r -> reduceArray(r, reducer, identity)); } - private static <V> V reduceArray(V[] results, BiFunction<? super V, ? super V, ? extends V> reducer) + private static <V> V reduceArray(Object[] results, BiFunction<? super V, ? super V, ? extends V> reducer) { - V result = results[0]; + V result = (V) results[0]; for (int i=1; i< results.length; i++) - result = reducer.apply(result, results[i]); + result = reducer.apply(result, (V)results[i]); + return result; + } + + private static <I, O> O reduceArray(Object[] results, BiFunction<? super I, ? super O, ? extends O> reducer, O identity) + { + O result = identity; + for (int i=0; i< results.length; i++) + result = reducer.apply((I)results[i], identity); return result; } @@ -710,7 +727,7 @@ public abstract class AsyncChains<V> implements AsyncChain<V> case 0: return AsyncChains.success(identity); case 1: return chains.get(0).map(a -> reducer.apply(identity, a)); } - return allOf(chains).map(results -> { + return allOfInternal(chains).map(results -> { B result = identity; for (A r : results) result = reducer.apply(result, r); diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java b/accord-core/src/main/java/accord/utils/async/AsyncResult.java index 30fd524f..fae3bc9c 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java @@ -31,13 +31,7 @@ import static accord.utils.Invariants.illegalState; public interface AsyncResult<V> extends AsyncChain<V> { @Override - AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback); - - @Override - default AsyncResult<V> addCallback(Runnable runnable) - { - return addCallback(AsyncCallbacks.toCallback(runnable)); - } + AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback); boolean isDone(); boolean isSuccess(); @@ -46,7 +40,7 @@ public interface AsyncResult<V> extends AsyncChain<V> default @Nullable Cancellable begin(BiConsumer<? super V, Throwable> callback) { //TODO chain shouldn't allow double calling, but should result allow? - addCallback(callback); + invoke(callback); return null; } diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java index 01632db4..6c954376 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java @@ -136,7 +136,7 @@ public class AsyncResults @Override protected Cancellable start(BiConsumer<? super V, Throwable> callback) { - AbstractResult.this.addCallback(callback); + AbstractResult.this.invoke(callback); return null; } }; @@ -172,7 +172,7 @@ public class AsyncResults } @Override - public AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback) + public AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback) { Listener<V> listener = null; while (true) @@ -290,7 +290,7 @@ public class AsyncResults @Override protected Cancellable start(BiConsumer<? super V, Throwable> callback) { - AsyncResults.Immediate.this.addCallback(callback); + AsyncResults.Immediate.this.invoke(callback); return null; } }; @@ -315,7 +315,7 @@ public class AsyncResults } @Override - public AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback) + public AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback) { callback.accept(value, failure); return this; diff --git a/accord-core/src/main/java/accord/utils/btree/BTreeSet.java b/accord-core/src/main/java/accord/utils/btree/BTreeSet.java index 1122da86..1d7dd15b 100644 --- a/accord-core/src/main/java/accord/utils/btree/BTreeSet.java +++ b/accord-core/src/main/java/accord/utils/btree/BTreeSet.java @@ -31,8 +31,6 @@ import java.util.SortedSet; import java.util.Spliterator; import java.util.Spliterators; -import com.google.common.collect.Ordering; - import static accord.utils.btree.BTree.Dir; import static accord.utils.btree.BTree.findIndex; @@ -76,7 +74,7 @@ public class BTreeSet<V> extends AbstractSet<V> implements NavigableSet<V>, List */ public V get(int index) { - return BTree.<V>findByIndex(tree, index); + return BTree.findByIndex(tree, index); } public int lastIndexOf(Object o) @@ -658,7 +656,7 @@ public class BTreeSet<V> extends AbstractSet<V> implements NavigableSet<V>, List public static <V extends Comparable<V>> BTreeSet<V> of(V value) { - return new BTreeSet<>(BTree.singleton(value), Ordering.<V>natural()); + return new BTreeSet<>(BTree.singleton(value), Comparator.naturalOrder()); } public static <V> BTreeSet<V> empty(Comparator<? super V> comparator) diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 6a6a8f9e..54a8314a 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -915,7 +915,7 @@ public class Cluster Command afterCommand = e.getValue().value(); if (beforeCommand == null) { - Invariants.requireArgument(afterCommand.is(Status.NotDefined) || afterCommand.saveStatus() == SaveStatus.Vestigial); + Invariants.requireArgument(afterCommand.is(Status.NotDefined) || afterCommand.saveStatus().compareTo(SaveStatus.Vestigial) >= 0); continue; } if (afterCommand.hasBeen(Status.Truncated)) diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 975cc727..b02f0744 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -360,8 +360,8 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread if (next == null) return; - next.addCallback(agent()); // used to track unexpected exceptions and notify simulations - next.addCallback(this::afterExecution); + next.invoke(agent()); // used to track unexpected exceptions and notify simulations + next.invoke(this::afterExecution); executor.execute(next); } diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index 32303e91..2b4edb8c 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -20,6 +20,8 @@ package accord.impl.basic; import java.util.AbstractList; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.EnumMap; import java.util.Iterator; import java.util.List; @@ -33,7 +35,6 @@ import com.google.common.collect.ImmutableSortedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.api.Agent; import accord.api.Journal; import accord.api.Result; import accord.impl.CommandChange; @@ -110,35 +111,34 @@ import static accord.utils.Invariants.illegalState; public class InMemoryJournal implements Journal { private static final Logger log = LoggerFactory.getLogger(InMemoryJournal.class); - private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> diffsPerCommandStore = new Int2ObjectHashMap<>(); + private final Int2ObjectHashMap<NavigableMap<TxnId, Diffs>> diffsPerCommandStore = new Int2ObjectHashMap<>(); private final List<TopologyUpdate> topologyUpdates = new ArrayList<>(); private final Int2ObjectHashMap<FieldUpdates> fieldStates = new Int2ObjectHashMap<>(); private Node node; - private Agent agent; private final RandomSource random; + private final float partialCompactionChance; public InMemoryJournal(Node.Id id, RandomSource random) { this.random = random; + this.partialCompactionChance = 1f - (random.nextFloat()/2); } - public Journal start(Node node) + public void start(Node node) { this.node = node; - this.agent = node.agent(); - return this; } @Override public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { - NavigableMap<TxnId, List<Diff>> commandStore = this.diffsPerCommandStore.get(commandStoreId); + NavigableMap<TxnId, Diffs> commandStore = this.diffsPerCommandStore.get(commandStoreId); if (commandStore == null) return null; - List<Diff> saved = this.diffsPerCommandStore.get(commandStoreId).get(txnId); + Diffs saved = this.diffsPerCommandStore.get(commandStoreId).get(txnId); if (saved == null) return null; @@ -174,7 +174,7 @@ public class InMemoryJournal implements Journal private Builder reconstruct(int commandStoreId, TxnId txnId, Load load) { - NavigableMap<TxnId, List<Diff>> commandStore = this.diffsPerCommandStore.get(commandStoreId); + NavigableMap<TxnId, Diffs> commandStore = this.diffsPerCommandStore.get(commandStoreId); if (commandStore == null) return null; @@ -182,12 +182,13 @@ public class InMemoryJournal implements Journal return reconstruct(this.diffsPerCommandStore.get(commandStoreId).get(txnId), load); } - private Builder reconstruct(List<Diff> saved, Load load) + private Builder reconstruct(Diffs files, Load load) { - if (saved == null) + if (files == null) return null; Builder builder = null; + List<Diff> saved = files.sorted(false); for (int i = saved.size() - 1; i >= 0; i--) { Diff diff = saved.get(i); @@ -212,8 +213,8 @@ public class InMemoryJournal implements Journal } diffsPerCommandStore.computeIfAbsent(commandStoreId, (k) -> new TreeMap<>()) - .computeIfAbsent(update.txnId, (k_) -> new ArrayList<>()) - .add(diff); + .computeIfAbsent(update.txnId, (k_) -> new Diffs()) + .addFlushed(diff); if (onFlush!= null) onFlush.run(); @@ -323,50 +324,189 @@ public class InMemoryJournal implements Journal onFlush.run(); } + static class DiffFile extends ArrayList<Diff> + { + DiffFile(){} + DiffFile(List<Diff> diffs) + { + for (Diff diff : diffs) + { + if (diff != null) + add(diff); + } + } + } + + static class Diffs + { + final boolean subset; + final List<DiffFile> files; + final List<Diff> flushed; + int nextId; + + int size; + List<Diff> sorted; + + Diffs() + { + this.subset = false; + this.files = new ArrayList<>(); + this.flushed = new ArrayList<>(); + } + + Diffs(PurgedList purged) + { + this.subset = false; + this.files = Collections.emptyList(); + this.flushed = purged; + } + + Diffs(TruncatedList truncated) + { + this.subset = false; + this.files = new ArrayList<>(); + this.flushed = truncated; + this.size = 1; + } + + Diffs(ErasedList erased) + { + this.subset = false; + this.files = Collections.emptyList(); + this.flushed = erased; + this.size = 1; + } + + Diffs(boolean subset, List<DiffFile> files, List<Diff> flushed) + { + this.subset = subset; + this.files = files; + this.flushed = flushed; + this.size = flushed.size(); + for (DiffFile file : files) + size += file.size(); + } + + void addFlushed(Diff diff) + { + diff.rowId = ++nextId; + flushed.add(diff); + if (sorted != null && sorted != flushed) + sorted.add(diff); + ++size; + } + + List<Diff> sorted(boolean copy) + { + if (sorted != null) + { + Invariants.require(sorted.size() == size); + return copy ? new ArrayList<>(sorted) : sorted; + } + + if (!subset) + { + if (files.isEmpty()) + return copy ? new ArrayList<>(flushed) : flushed; + + if (flushed.isEmpty() && files.size() == 1) + return copy ? new ArrayList<>(files.get(0)) : files.get(0); + } + + List<Diff> sorted = new ArrayList<>(size); + for (Diff diff : flushed) + { + if (diff != null) + sorted.add(diff); + } + for (DiffFile file : this.files) + { + if (file != null) + sorted.addAll(file); + } + Invariants.require(sorted.size() == size); + sorted.sort(Comparator.comparingInt(d -> d.rowId)); + if (!copy) + this.sorted = sorted; + return sorted; + } + + void removeAll(Diffs diffs) + { + files.removeAll(diffs.files); + flushed.removeAll(diffs.flushed); + size -= diffs.size; + sorted = null; + } + + boolean isEmpty() + { + return size == 0; + } + } + static int counter = 0; @Override public void purge(CommandStores commandStores, EpochSupplier minEpoch) { truncateTopologiesForTesting(minEpoch.epoch()); - boolean isPartialCompaction = random.nextBoolean(); - for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e : diffsPerCommandStore.entrySet()) + boolean isPartialCompaction = random.decide(0.9f); + for (Map.Entry<Integer, NavigableMap<TxnId, Diffs>> e : diffsPerCommandStore.entrySet()) { int commandStoreId = e.getKey(); - Map<TxnId, List<Diff>> localJournal = e.getValue(); + Map<TxnId, Diffs> localJournal = e.getValue(); CommandStore store = commandStores.forId(commandStoreId); if (store == null) continue; - for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet()) + for (Map.Entry<TxnId, Diffs> e2 : localJournal.entrySet()) { - List<Diff> diffs = e2.getValue(); - + Diffs diffs = e2.getValue(); if (diffs.isEmpty()) continue; - List<Diff> subset = diffs; - if (diffs.size() > 1 && isPartialCompaction) + + Diffs subset = diffs; { - int removeCount = 1 + random.nextInt(diffs.size() - 1); - int count = diffs.size(); - subset = new ArrayList<>(diffs); - while (removeCount-- > 0) + int filesAndFlushed = subset.flushed.size() + subset.files.size(); + if (filesAndFlushed > 1 && isPartialCompaction) { - int removeIndex = random.nextInt(diffs.size()); - if (subset.get(removeIndex) == null) + int removeCount = 1 + random.nextInt(filesAndFlushed - 1); + int count = filesAndFlushed; + subset = new Diffs(true, new ArrayList<>(diffs.files), new ArrayList<>(diffs.flushed)); + List<DiffFile> files = subset.files; + List<Diff> flushed = subset.flushed; + while (removeCount-- > 0) + { + int removeIndex = random.nextInt(filesAndFlushed); + if (removeIndex < flushed.size()) + { + if (flushed.get(removeIndex) == null) + continue; + --subset.size; + flushed.set(removeIndex, null); + } + else + { + removeIndex -= flushed.size(); + if (files.get(removeIndex) == null) + continue; + subset.size -= files.get(removeIndex).size(); + files.set(removeIndex, null); + } + --count; + } + + if (count == 0) continue; - subset.set(removeIndex, null); - --count; } - - if (count == 0) - continue; } - Builder[] builders = new Builder[diffs.size()]; - for (int i = 0 ; i < subset.size() ; ++i) + Builder[] builders = new Builder[subset.size]; + List<Diff> sorted = subset.sorted(true); + for (int i = 0 ; i < sorted.size() ; ++i) { - if (subset.get(i) == null) continue; + if (sorted.get(i) == null) continue; Builder builder = new Builder(e2.getKey(), ALL); - builder.apply(subset.get(i)); + builder.apply(sorted.get(i)); builders[i] = builder; } @@ -388,7 +528,8 @@ public class InMemoryJournal implements Journal { if (cleanup == EXPUNGE) { - if (input == FULL || subset == diffs) e2.setValue(new PurgedList()); + if (input == FULL) e2.setValue(new Diffs(new PurgedList())); + else if (subset == diffs) e2.setValue(new Diffs()); else diffs.removeAll(subset); continue; } @@ -411,20 +552,39 @@ public class InMemoryJournal implements Journal else { Diff diff = builder.toDiff(); - e2.setValue(cleanup == ERASE ? new ErasedList(diff) : new TruncatedList(diff)); + e2.setValue(cleanup == ERASE ? new Diffs(new ErasedList(diff)) : new Diffs(new TruncatedList(diff))); continue; } } } + if (diffs.flushed instanceof FinalList) + continue; + + int removeCount = 0; for (int i = 0 ; i < builders.length ; ++i) { if (builders[i] != null) { Diff diff = builders[i].toDiff(); - diffs.set(i, diff.flags == 0 ? null : diff); + if (diff.flags == 0) + { + ++removeCount; + sorted.set(i, null); + } + else + { + diff.rowId = sorted.get(i).rowId; + sorted.set(i, diff); + } } } + + diffs.size -= removeCount; + diffs.flushed.removeAll(subset.flushed); + diffs.files.removeAll(subset.files); + diffs.files.add(new DiffFile(sorted)); + diffs.sorted = null; } } } @@ -432,18 +592,18 @@ public class InMemoryJournal implements Journal @Override public void replay(CommandStores commandStores) { - for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry : diffsPerCommandStore.entrySet()) + for (Map.Entry<Integer, NavigableMap<TxnId, Diffs>> diffEntry : diffsPerCommandStore.entrySet()) { int commandStoreId = diffEntry.getKey(); // copy to avoid concurrent modification when appending to journal - Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue()); + Map<TxnId, List<Diff>> diffs = new TreeMap<>(); InMemoryCommandStore commandStore = (InMemoryCommandStore) commandStores.forId(commandStoreId); Loader loader = commandStore.loader(); - for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet()) - e.setValue(new ArrayList<>(e.getValue())); + for (Map.Entry<TxnId, Diffs> e : diffEntry.getValue().entrySet()) + diffs.put(e.getKey(), e.getValue().sorted(true)); for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet()) { @@ -455,7 +615,20 @@ public class InMemoryJournal implements Journal } } - private static class ErasedList extends AbstractList<Diff> + static class TruncatedList extends ArrayList<Diff> + { + TruncatedList(Diff truncated) + { + add(truncated); + } + } + + private static abstract class FinalList extends AbstractList<Diff> + { + + } + + private static class ErasedList extends FinalList { private Diff erased; @@ -500,15 +673,7 @@ public class InMemoryJournal implements Journal } } - static class TruncatedList extends ArrayList<Diff> - { - TruncatedList(Diff truncated) - { - add(truncated); - } - } - - private static class PurgedList extends AbstractList<Diff> + private static class PurgedList extends FinalList { @Override public Diff get(int index) @@ -558,6 +723,7 @@ public class InMemoryJournal implements Journal public final TxnId txnId; public final Map<Field, Object> changes; public final int flags; + private int rowId; private Diff(TxnId txnId, int flags, Map<Field, Object> changes) { diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java index 3df406ec..32a94e5c 100644 --- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java @@ -76,9 +76,9 @@ public class LoggingJournal implements Journal @Override - public Journal start(Node node) + public void start(Node node) { - return this; + delegate.start(node); } @Override diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java index 53b78354..0f2452bd 100644 --- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java +++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java @@ -66,7 +66,7 @@ public class ListFetchCoordinator extends AbstractFetchCoordinator ListData listData = (ListData) data; persisting.add(commandStore.build(PreLoadContext.empty(), safeStore -> { listData.forEach((key, value) -> listStore.writeUnsafe(key, value)); - }).flatMap(ignore -> listStore.snapshot(true, received, syncPoint.syncId)).addCallback((success, fail) -> { + }).flatMap(ignore -> listStore.snapshot(true, received, syncPoint.syncId)).invoke((success, fail) -> { if (fail == null) success(from, received); else fail(from, received, fail); }).beginAsResult()); diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java index ddc06379..dc95693d 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRequest.java +++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java @@ -300,7 +300,7 @@ public class ListRequest implements Request txn = gen.apply(node); id = txnIdGen.apply(node, txn); listener.onClientAction(MessageListener.ClientAction.SUBMIT, node.id(), id, txn); - node.coordinate(id, txn).addCallback(new ResultCallback(node, client, replyContext, listener, id, txn)); + node.coordinate(id, txn).invoke(new ResultCallback(node, client, replyContext, listener, id, txn)); } @Override diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java b/accord-core/src/test/java/accord/local/CommandsTest.java index f10b9e7f..eab240a7 100644 --- a/accord-core/src/test/java/accord/local/CommandsTest.java +++ b/accord-core/src/test/java/accord/local/CommandsTest.java @@ -101,7 +101,7 @@ class CommandsTest for (Node n : nodeMap.values()) ((TestableConfigurationService) n.configService()).reportTopology(updatedTopology); - node.coordinate(txnId, txn).addCallback((success, failure) -> { + node.coordinate(txnId, txn).invoke((success, failure) -> { if (failure == null) { node.agent().onUncaughtException(new AssertionError("Expected TopologyMismatch exception, but txn was success")); diff --git a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java index 8f429a3e..f7893e5a 100644 --- a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java +++ b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java @@ -90,7 +90,7 @@ public class AsyncChainsTest ResultCallback<Integer> intermediateCallback = new ResultCallback<>(); AsyncChain<Integer> chain = AsyncChains.ofCallable(MoreExecutors.directExecutor(), () -> 5); - chain = chain.addCallback(intermediateCallback); + chain = chain.invoke(intermediateCallback); chain = chain.map(i -> i + 2); chain.begin(finalCallback); @@ -129,10 +129,10 @@ public class AsyncChainsTest AsyncChain<Integer> chain1 = AsyncChains.success(1); AsyncChain<Integer> chain2 = AsyncChains.success(2); AsyncChain<Integer> chain3 = AsyncChains.success(3); - AsyncChain<List<Integer>> reduced = AsyncChains.all(Lists.newArrayList(chain1, chain2, chain3)); + AsyncChain<List<Integer>> reduced = AsyncChains.allOf(Lists.newArrayList(chain1, chain2, chain3)); ResultCallback<List<Integer>> callback = new ResultCallback<>(); reduced.begin(callback); - Assertions.assertEquals(Lists.newArrayList(1, 2, 3), callback.value()); + Assertions.assertEquals(Arrays.asList(1, 2, 3), callback.value()); } @Test @@ -164,7 +164,7 @@ public class AsyncChainsTest }, () -> {}) .map(ignore -> 1) .beginAsResult() - .addCallback((success, failure) -> { + .invoke((success, failure) -> { if (failure == null) throw illegalState("Expected to fail"); }); @@ -243,7 +243,7 @@ public class AsyncChainsTest AtomicBoolean sawCallback = new AtomicBoolean(false); AsyncChains.failure(new NullPointerException("just kidding")) .beginAsResult() - .addCallback(() -> sawCallback.set(true)) + .invokeIfSuccess(() -> sawCallback.set(true)) .begin((success, failure) -> { if (failure != null) sawFailure.set(true); else sawFailure.set(false); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index ffc0d3ef..8ab5ed99 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -388,7 +388,7 @@ public class Cluster implements Scheduler public static class NoOpJournal implements Journal { - @Override public Journal start(Node node) { return null; } + @Override public void start(Node node) { } @Override public Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } @Override public Command.Minimal loadMinimal(int store, TxnId txnId, Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } @Override public void saveCommand(int store, CommandUpdate value, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java index f7987140..01621f72 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java @@ -51,7 +51,7 @@ public class MaelstromRequest extends Body implements Request @Override public void process(Node node, Id client, ReplyContext replyContext) { - node.coordinate(txn).addCallback((success, fail) -> { + node.coordinate(txn).invoke((success, fail) -> { Reply reply = success != null ? new MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), (MaelstromResult) success) : null; node.reply(client, replyContext, reply, fail); }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org