This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 5e115ae37d390622c1633e059e69b404899ea955 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Tue Jun 10 17:25:31 2025 +0100 Fix: - Do not query local topology when deciding what keys to fetch to avoid TopologyRetiredException - Ensure we propagate information back to the requesting CommandStore by using the store id to ensure it is included - BurnTest topology fetching was broken by earlier patch - Topology callbacks were not being invoked as we were not calling .begin() - Topology mismatch failure during notAccept phase was not being reported due to CoordinatePreAccept already having isDone==true patch by Benedict; reviewed by David Capwell for CASSANDRA-20711 --- .../main/java/accord/coordinate/CheckShards.java | 16 +- .../accord/coordinate/CoordinatePreAccept.java | 2 +- .../src/main/java/accord/coordinate/FetchData.java | 224 ++------------- .../java/accord/coordinate/FetchSomeRoute.java | 151 ++++++++++ .../src/main/java/accord/coordinate/FindRoute.java | 81 ------ .../main/java/accord/coordinate/FindSomeRoute.java | 85 ------ .../src/main/java/accord/coordinate/Infer.java | 29 +- .../main/java/accord/coordinate/Invalidate.java | 23 +- .../main/java/accord/coordinate/MaybeRecover.java | 23 +- .../src/main/java/accord/coordinate/Recover.java | 39 ++- .../java/accord/coordinate/RecoverWithRoute.java | 69 +++-- .../accord/impl/AbstractConfigurationService.java | 22 +- .../java/accord/impl/progresslog/HomeState.java | 10 +- .../java/accord/impl/progresslog/WaitingState.java | 46 +-- .../src/main/java/accord/local/CommandStores.java | 314 +++++++++++++++------ accord-core/src/main/java/accord/local/Node.java | 15 +- .../main/java/accord/local/StoreParticipants.java | 4 +- .../accord/local/durability/DurabilityQueue.java | 2 +- .../src/main/java/accord/messages/CheckStatus.java | 8 +- .../src/main/java/accord/messages/Propagate.java | 42 ++- .../main/java/accord/utils/async/AsyncResult.java | 2 + .../accord/burn/BurnTestConfigurationService.java | 10 +- .../impl/AbstractConfigurationServiceTest.java | 11 +- .../accord/impl/basic/DelayedCommandStores.java | 10 +- 24 files changed, 607 insertions(+), 631 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java b/accord-core/src/main/java/accord/coordinate/CheckShards.java index 888e3305..6dea99dd 100644 --- a/accord-core/src/main/java/accord/coordinate/CheckShards.java +++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java @@ -39,7 +39,7 @@ import static accord.utils.Invariants.illegalState; */ public abstract class CheckShards<U extends Participants<?>> extends ReadCoordinator<CheckStatusReply> { - final U route; + final U query; /** * The epoch we want to fetch data from remotely @@ -54,17 +54,17 @@ public abstract class CheckShards<U extends Participants<?>> extends ReadCoordin protected boolean truncated; // srcEpoch is either txnId.epoch() or executeAt.epoch() - protected CheckShards(Node node, TxnId txnId, U route, IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf) + protected CheckShards(Node node, TxnId txnId, U query, IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf) { - this(node, txnId, route, txnId.epoch(), includeInfo, bumpBallot, previouslyKnownToBeInvalidIf); + this(node, txnId, query, txnId.epoch(), includeInfo, bumpBallot, previouslyKnownToBeInvalidIf); Invariants.require(txnId.isVisible()); } - protected CheckShards(Node node, TxnId txnId, U route, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf) + protected CheckShards(Node node, TxnId txnId, U query, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf) { - super(node, topologyFor(node, txnId, route, srcEpoch), txnId); + super(node, topologyFor(node, txnId, query, srcEpoch), txnId); this.sourceEpoch = srcEpoch; - this.route = route; + this.query = query; this.includeInfo = includeInfo; this.bumpBallot = bumpBallot; this.previouslyKnownToBeInvalidIf = previouslyKnownToBeInvalidIf; @@ -79,7 +79,7 @@ public abstract class CheckShards<U extends Participants<?>> extends ReadCoordin @Override protected void contact(Id id) { - Participants<?> unseekables = route.slice(topologies().computeRangesForNode(id)); + Participants<?> unseekables = query.slice(topologies().computeRangesForNode(id)); node.send(id, new CheckStatus(txnId, unseekables, sourceEpoch, includeInfo, bumpBallot), this); } @@ -119,7 +119,7 @@ public abstract class CheckShards<U extends Participants<?>> extends ReadCoordin @Override protected void finishOnExhaustion() { - if (merged != null && merged.map.hasFullyTruncated(route)) finishOnFailure(new Truncated(txnId, null), false); + if (merged != null && merged.map.hasFullyTruncated(query)) finishOnFailure(new Truncated(txnId, null), false); else super.finishOnExhaustion(); } } diff --git a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java index 5649a402..40c17630 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java @@ -142,7 +142,7 @@ abstract class CoordinatePreAccept<T> extends AbstractCoordinatePreAccept<T, Pre proposeInvalidate(node, node.uniqueTimestamp(Ballot::fromValues), txnId, route.homeKey(), (outcome, failure) -> { if (failure != null) mismatch.addSuppressed(failure); - setFailure(mismatch); + callback.accept(null, mismatch); }); } diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java b/accord-core/src/main/java/accord/coordinate/FetchData.java index b7a7c0ef..38b5c878 100644 --- a/accord-core/src/main/java/accord/coordinate/FetchData.java +++ b/accord-core/src/main/java/accord/coordinate/FetchData.java @@ -21,30 +21,24 @@ import java.util.function.BiConsumer; import javax.annotation.Nullable; import accord.coordinate.Infer.InvalidIf; +import accord.local.CommandStores.LatentStoreSelector; +import accord.local.CommandStores.StoreSelector; import accord.local.Node; -import accord.primitives.Status; import accord.primitives.Known; import accord.messages.CheckStatus; import accord.messages.CheckStatus.CheckStatusOkFull; import accord.messages.Propagate; -import accord.primitives.FullRoute; import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.Route; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Unseekables; -import accord.topology.TopologyManager; -import accord.topology.TopologyManager.TopologyRetiredException; import accord.utils.Invariants; import javax.annotation.Nonnull; import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid; -import static accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback; -import static accord.primitives.Route.castToRoute; -import static accord.primitives.Route.isFullRoute; -import static accord.primitives.Route.isRoute; /** * Find data and persist locally @@ -75,16 +69,13 @@ public class FetchData extends CheckShards<Route<?>> final InvalidIf invalidIf; final @Nullable Timestamp executeAt; final long srcEpoch; - final Participants<?> fetchKeys; - final long lowEpoch, highEpoch; + // known participants, a subset of which we may fetch from + final Participants<?> contactable; + final Participants<?> requestedFor; + final StoreSelector reportTo; final BiConsumer<? super FetchResult, Throwable> callback; - public FetchRequest(Known fetch, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Participants<?> fetchKeys, BiConsumer<? super FetchResult, Throwable> callback) - { - this(fetch, txnId, invalidIf, executeAt, fetchKeys, Long.MIN_VALUE, Long.MIN_VALUE, callback); - } - - public FetchRequest(Known fetch, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Participants<?> fetchKeys, long lowEpoch, long highEpoch, BiConsumer<? super FetchResult, Throwable> callback) + public FetchRequest(Known fetch, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Participants<?> contactable, Participants<?> requestedFor, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { this.fetch = fetch; this.invalidIf = invalidIf; @@ -92,88 +83,32 @@ public class FetchData extends CheckShards<Route<?>> this.executeAt = executeAt; this.callback = callback; this.srcEpoch = fetch.fetchEpoch(txnId, executeAt); - this.fetchKeys = fetchKeys; - this.lowEpoch = lowEpoch == Long.MIN_VALUE ? txnId.epoch() : lowEpoch; - this.highEpoch = highEpoch == Long.MIN_VALUE ? srcEpoch : highEpoch; - } - - Ranges localRanges(Node node) - { - try - { - return node.topology().localRangesForEpochs(lowEpoch, highEpoch); - } - catch (TopologyRetiredException t) - { - throw new TopologyRetiredException("Failed to read local ranges for " + txnId + " between " + lowEpoch + " and " + highEpoch, t); - } - } - } - - public static void fetch(Known fetch, Node node, TxnId txnId, @Nullable Timestamp executeAt, Participants<?> someKeys, long localLowEpoch, long localHighEpoch, BiConsumer<? super FetchResult, Throwable> callback) - { - fetch(fetch, node, txnId, NotKnownToBeInvalid, executeAt, someKeys, localLowEpoch, localHighEpoch, callback); - } - - public static void fetch(Known fetch, Node node, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Participants<?> fetchKeys, long localLowEpoch, long localHighEpoch, BiConsumer<? super FetchResult, Throwable> callback) - { - fetch(node, new FetchRequest(fetch, txnId, invalidIf, executeAt, fetchKeys, localLowEpoch, localHighEpoch, callback)); - } - - public static void fetch(Known fetch, Node node, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Participants<?> fetchKeys, BiConsumer<? super FetchResult, Throwable> callback) - { - fetch(node, new FetchRequest(fetch, txnId, invalidIf, executeAt, fetchKeys, callback)); - } - - public static void fetch(Node node, FetchRequest request) - { - Participants<?> fetchKeys = request.fetchKeys; - if (fetchKeys.kind().isRoute()) fetch(node, castToRoute(fetchKeys), request); - else fetchViaSomeRoute(node, fetchKeys, request); - } - - public static void fetch(Node node, Route<?> route, FetchRequest request) - { - long srcEpoch = request.srcEpoch; - if (!node.topology().hasEpoch(srcEpoch)) - { - node.withEpochAtLeast(srcEpoch, request.callback, () -> fetch(node, route, request)); - return; - } - - Invariants.requireArgument(node.topology().hasEpoch(srcEpoch), "Unknown epoch %d, latest known is %d", srcEpoch, node.epoch()); - Ranges ranges = request.localRanges(node); - if (!Route.isFullRoute(route)) - { - fetchWithIncompleteRoute(node, route, request); - } - else - { - Route<?> slicedRoute = route.slice(ranges); - fetchInternal(node, ranges, slicedRoute, request); + this.contactable = contactable; + this.requestedFor = requestedFor; + this.reportTo = reportTo; } } /** * Do not make an attempt to discern what keys need to be contacted; fetch from only the specific remote keys that were requested. */ - public static void fetchSpecific(Known fetch, Node node, TxnId txnId, @Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, Participants<?> localKeys, long lowEpoch, long highEpoch, BiConsumer<? super FetchResult, Throwable> callback) + public static void fetchSpecific(Known fetch, Node node, TxnId txnId, @Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, Participants<?> requestedFor, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { - fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, executeAt, query, maxRoute, localKeys, lowEpoch, highEpoch, callback); + fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, executeAt, query, maxRoute, requestedFor, reportTo, callback); } /** * Do not make an attempt to discern what keys need to be contacted; fetch from only the specific remote keys that were requested. */ - public static void fetchSpecific(Known fetch, Node node, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, Participants<?> localKeys, long lowEpoch, long highEpoch, BiConsumer<? super FetchResult, Throwable> callback) + public static void fetchSpecific(Known fetch, Node node, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, Participants<?> requestedFor, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { - fetchSpecific(node, query, maxRoute, new FetchRequest(fetch, txnId, invalidIf, executeAt, localKeys, lowEpoch, highEpoch, callback)); + fetchSpecific(node, query, maxRoute, new FetchRequest(fetch, txnId, invalidIf, executeAt, maxRoute, requestedFor, reportTo, callback)); } public static void fetchSpecific(Node node, Route<?> query, Route<?> maxRoute, FetchRequest request) { long srcEpoch = request.srcEpoch; - if (!node.topology().hasEpoch(srcEpoch)) + if (!node.topology().hasAtLeastEpoch(srcEpoch)) { node.withEpochAtLeast(srcEpoch, request.callback, () -> fetchSpecific(node, query, maxRoute, request)); return; @@ -182,166 +117,67 @@ public class FetchData extends CheckShards<Route<?>> fetchData(node, query, maxRoute, request); } - @SuppressWarnings({"unchecked", "rawtypes"}) - private static void fetchViaSomeRoute(Node node, Participants<?> someUnseekables, FetchRequest request) - { - FindSomeRoute.findSomeRoute(node, request.txnId, request.invalidIf, someUnseekables, (foundRoute, fail) -> { - if (fail != null) request.callback.accept(null, fail); - else if (foundRoute.route == null) - { - reportRouteNotFound(node, foundRoute.known, request); - } - else if (isFullRoute(foundRoute.route)) - { - fetch(node, Route.castToFullRoute(foundRoute.route), request); - } - else if (isRoute(someUnseekables) && someUnseekables.containsAll(foundRoute.route)) - { - // this is essentially a reentrancy check; we can only reach this point if we have already tried once to fetchSomeRoute - // (as a user-provided Route is used to fetchRoute, not fetchSomeRoute) - reportRouteNotFound(node, foundRoute.known, request); - } - else - { - Route<?> route = foundRoute.route; - if (isRoute(someUnseekables)) - route = Route.merge(route, (Route)someUnseekables); - fetch(node, route, request); - } - }); - } - - private static void reportRouteNotFound(Node node, Known found, FetchRequest req) - { - Invariants.require(req.executeAt == null); - TxnId txnId = req.txnId; - switch (found.outcome()) - { - default: throw new AssertionError("Unknown outcome: " + found.outcome()); - case Abort: - locallyInvalidateAndCallback(node, txnId, req.lowEpoch, req.highEpoch, req.fetchKeys, new FetchResult(found, req.fetchKeys, null), req.callback); - break; - - case Unknown: - if (found.canProposeInvalidation()) - { - Invalidate.invalidate(node, txnId, req.fetchKeys, false, req.lowEpoch, req.highEpoch, (outcome, throwable) -> { - FetchResult result = null; - if (throwable == null) - { - Known achieved = Known.Invalidated; - Unseekables<?> achievedTarget = req.fetchKeys, didNotAchieveTarget = null; - if (outcome.asProgressToken().status != Status.Invalidated) - { - achievedTarget = req.fetchKeys.slice(0, 0); - didNotAchieveTarget = req.fetchKeys; - achieved = Known.Nothing; - } - result = new FetchResult(achieved, achievedTarget, didNotAchieveTarget); - } - req.callback.accept(result, throwable); - }); - break; - } - case Erased: - case WasApply: - case Apply: - // TODO (required): we may be stale - req.callback.accept(new FetchResult(found, req.fetchKeys.slice(0, 0), req.fetchKeys), null); - } - } - - private static void fetchWithIncompleteRoute(Node node, Route<?> someRoute, FetchRequest request) - { - long srcEpoch = request.srcEpoch; - Invariants.requireArgument(node.topology().hasEpoch(srcEpoch), "Unknown epoch %d, latest known is %d", srcEpoch, node.epoch()); - FindRoute.findRoute(node, request.txnId, request.invalidIf, someRoute.withHomeKey(), (foundRoute, fail) -> { - if (fail != null) request.callback.accept(null, fail); - else if (foundRoute == null) fetchViaSomeRoute(node, someRoute, request); - else fetch(node, foundRoute.route, request); - }); - } - - public static void fetch(Node node, FullRoute<?> route, FetchRequest request) - { - node.withEpochAtLeast(request.srcEpoch, request.callback, () -> { - Ranges ranges = request.localRanges(node); - fetchInternal(node, ranges, route, request); - }); - } - - private static Object fetchInternal(Node node, Ranges ranges, Route<?> route, FetchRequest request) - { - long srcEpoch = request.srcEpoch; - Invariants.requireArgument(node.topology().hasEpoch(srcEpoch), "Unknown epoch %d, latest known is %d", srcEpoch, node.epoch()); - Route<?> slicedRoute = route.slice(ranges); - return fetchData(node, slicedRoute, route, request); - } - final BiConsumer<? super FetchResult, Throwable> callback; /** * The epoch until which we want to persist any response for locally */ final Known target; final Route<?> maxRoute; + final Participants<?> requestedFor; // to support cases where a later epoch that ultimately does not participate in execution has a vestigial entry // (i.e. if preaccept/accept contact a later epoch than execution is decided for) - final long lowEpoch, highEpoch; - - final Unseekables<?> propagateTo; + final LatentStoreSelector reportTo; - private FetchData(Node node, Known target, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> maxRoute, Unseekables<?> propagateTo, long sourceEpoch, long lowEpoch, long highEpoch, BiConsumer<? super FetchResult, Throwable> callback) + private FetchData(Node node, Known target, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> maxRoute, Participants<?> requestedFor, long sourceEpoch, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { - this(node, target, txnId, invalidIf, route, route.withHomeKey(), maxRoute, propagateTo, sourceEpoch, lowEpoch, highEpoch, callback); + this(node, target, txnId, invalidIf, route, route.withHomeKey(), maxRoute, requestedFor, sourceEpoch, reportTo, callback); } - private FetchData(Node node, Known target, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, Unseekables<?> propagateTo, long sourceEpoch, long lowEpoch, long highEpoch, BiConsumer<? super FetchResult, Throwable> callback) + private FetchData(Node node, Known target, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, Participants<?> requestedFor, long sourceEpoch, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { // TODO (desired, efficiency): restore behaviour of only collecting info if e.g. Committed or Executed super(node, txnId, routeWithHomeKey, sourceEpoch, CheckStatus.IncludeInfo.All, null, invalidIf); - this.propagateTo = propagateTo; - this.lowEpoch = lowEpoch; + this.reportTo = reportTo; this.maxRoute = maxRoute; + this.requestedFor = requestedFor; Invariants.requireArgument(routeWithHomeKey.contains(route.homeKey()), "route %s does not contain %s", routeWithHomeKey, route.homeKey()); this.target = target; - this.highEpoch = highEpoch; this.callback = callback; } private static FetchData fetchData(Node node, Route<?> route, Route<?> maxRoute, FetchRequest req) { - Invariants.require(!req.fetchKeys.isEmpty()); - FetchData fetch = new FetchData(node, req.fetch, req.txnId, req.invalidIf, route, maxRoute, req.fetchKeys, req.srcEpoch, req.lowEpoch, req.highEpoch, req.callback); + Invariants.require(!req.contactable.isEmpty()); + FetchData fetch = new FetchData(node, req.fetch, req.txnId, req.invalidIf, route, maxRoute, req.requestedFor, req.srcEpoch, req.reportTo, req.callback); fetch.start(); return fetch; } - private static FetchData fetchData(Node node, Known fetch, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> maxRoute, Unseekables<?> localKeys, long sourceEpoch, long lowEpoch, long highEpoch, BiConsumer<? super FetchResult, Throwable> callback) + private static FetchData fetchData(Node node, Known fetch, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> maxRoute, Participants<?> requestedFor, long sourceEpoch, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { - Invariants.require(!localKeys.isEmpty()); - FetchData fetchData = new FetchData(node, fetch, txnId, invalidIf, route, maxRoute, localKeys, sourceEpoch, lowEpoch, highEpoch, callback); + FetchData fetchData = new FetchData(node, fetch, txnId, invalidIf, route, maxRoute, requestedFor, sourceEpoch, reportTo, callback); fetchData.start(); return fetchData; } protected Route<?> query() { - return route; + return query; } @Override protected boolean isSufficient(Node.Id from, CheckStatus.CheckStatusOk ok) { Ranges rangesForNode = topologies().computeRangesForNode(from); - Route<?> scope = this.route.slice(rangesForNode); + Route<?> scope = this.query.slice(rangesForNode); return isSufficient(scope, ok); } @Override protected boolean isSufficient(CheckStatus.CheckStatusOk ok) { - return isSufficient(route, ok); + return isSufficient(query, ok); } protected boolean isSufficient(Route<?> scope, CheckStatus.CheckStatusOk ok) @@ -363,7 +199,7 @@ public class FetchData extends CheckShards<Route<?>> Invariants.require(isSufficient(merged), "Status %s is not sufficient", merged); // TODO (expected): should we automatically trigger a new fetch if we find executeAt but did not request enough information? would be more robust - Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, lowEpoch, highEpoch, success.withQuorum, query(), propagateTo, target, (CheckStatusOkFull) merged, callback); + Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query(), requestedFor, reportTo, target, (CheckStatusOkFull) merged, callback); } } } diff --git a/accord-core/src/main/java/accord/coordinate/FetchSomeRoute.java b/accord-core/src/main/java/accord/coordinate/FetchSomeRoute.java new file mode 100644 index 00000000..a1691f92 --- /dev/null +++ b/accord-core/src/main/java/accord/coordinate/FetchSomeRoute.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.coordinate; + +import java.util.function.BiConsumer; + +import accord.local.Commands; +import accord.local.Node; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; +import accord.primitives.Known; +import accord.messages.CheckStatus.CheckStatusOk; +import accord.messages.CheckStatus.IncludeInfo; +import accord.primitives.WithQuorum; +import accord.primitives.Participants; +import accord.primitives.Route; +import accord.primitives.TxnId; +import accord.utils.MapReduceConsume; + +import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid; +import static accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback; +import static accord.local.CommandStores.*; +import static accord.primitives.Known.Nothing; +import static accord.primitives.WithQuorum.HasQuorum; + +/** + * Find some Route for a txnId using some known participants + */ +public class FetchSomeRoute extends CheckShards<Participants<?>> +{ + final LatentStoreSelector reportTo; + final BiConsumer<Route<?>, Throwable> callback; + FetchSomeRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback) + { + super(node, txnId, contactable, IncludeInfo.Route, null, invalidIf); + this.reportTo = reportTo; + this.callback = callback; + } + + public static void fetchSomeRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback) + { + if (!node.topology().hasEpoch(txnId.epoch())) + { + node.withEpochAtLeast(txnId.epoch(), callback, () -> fetchSomeRoute(node, txnId, invalidIf, unseekables, reportTo, callback)); + return; + } + + FetchSomeRoute fetchSomeRoute = new FetchSomeRoute(node, txnId, invalidIf, unseekables, reportTo, callback); + fetchSomeRoute.start(); + } + + + public static void fetchSomeRoute(Node node, TxnId txnId, Participants<?> contactable, BiConsumer<Route<?>, Throwable> callback) + { + fetchSomeRoute(node, txnId, contactable, LatentStoreSelector.standard(), callback); + } + + public static void fetchSomeRoute(Node node, TxnId txnId, Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback) + { + fetchSomeRoute(node, txnId, NotKnownToBeInvalid, contactable, reportTo, callback); + } + + @Override + protected boolean isSufficient(CheckStatusOk ok) + { + return ok.route != null; + } + + @Override + protected void onDone(Success success, Throwable failure) + { + if (failure != null) callback.accept(null, failure); + else + { + final Route<?> route = merged == null ? null : merged.route; + if (route == null) + { + Known known = Nothing; + if (merged != null) + known = merged.finish(query, query, query, success.withQuorum, previouslyKnownToBeInvalidIf).knownFor(txnId, query, query); + reportRouteNotFound(node, success.withQuorum, known, txnId, query, reportTo, callback); + } + else + { + StoreSelector selector = reportTo.refine(txnId, null, query); + node.mapReduceConsumeLocal(txnId, selector, new MapReduceConsume<>() + { + @Override + public void accept(Object result, Throwable failure) + { + callback.accept(route, null); + } + + @Override + public Object apply(SafeCommandStore safeStore) + { + SafeCommand safeCommand = safeStore.ifInitialised(txnId); + if (safeCommand != null) + Commands.updateRoute(safeStore, safeCommand, route); + return null; + } + + @Override + public Object reduce(Object o1, Object o2) + { + return null; + } + }); + } + } + } + + private static void reportRouteNotFound(Node node, WithQuorum withQuorum, Known found, TxnId txnId, Participants<?> participants, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback) + { + switch (found.outcome()) + { + default: throw new AssertionError("Unknown outcome: " + found.outcome()); + case Abort: + locallyInvalidateAndCallback(node, txnId, reportTo.refine(txnId, null, participants), participants, null, callback); + break; + + case Unknown: + if (withQuorum == HasQuorum && found.canProposeInvalidation()) + { + Invalidate.invalidate(node, txnId, participants, false, reportTo, (outcome, throwable) -> callback.accept(null, throwable)); + break; + } + case Erased: + case WasApply: + case Apply: + callback.accept(null, null); + } + } + +} diff --git a/accord-core/src/main/java/accord/coordinate/FindRoute.java b/accord-core/src/main/java/accord/coordinate/FindRoute.java deleted file mode 100644 index a5a1046e..00000000 --- a/accord-core/src/main/java/accord/coordinate/FindRoute.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.coordinate; - -import java.util.function.BiConsumer; - -import accord.coordinate.Infer.InvalidIf; -import accord.local.Node; -import accord.messages.CheckStatus.CheckStatusOk; -import accord.messages.CheckStatus.IncludeInfo; -import accord.primitives.*; - -import static accord.primitives.Route.isFullRoute; - -/** - * Find the Route of a known (txnId, homeKey) pair - */ -public class FindRoute extends CheckShards<Route<?>> -{ - public static class Result - { - public final FullRoute<?> route; - public final Timestamp executeAt; - - public Result(FullRoute<?> route, Timestamp executeAt) - { - this.route = route; - this.executeAt = executeAt; - } - - public Result(CheckStatusOk ok) - { - this.route = Route.castToFullRoute(ok.route); - this.executeAt = ok.maxKnown().isExecuteAtKnown() ? ok.executeAt : null; - } - } - - final BiConsumer<Result, Throwable> callback; - FindRoute(Node node, TxnId txnId, InvalidIf invalidIf, Route<?> someRoute, BiConsumer<Result, Throwable> callback) - { - super(node, txnId, someRoute, IncludeInfo.Route, null, invalidIf); - this.callback = callback; - } - - public static FindRoute findRoute(Node node, TxnId txnId, InvalidIf invalidIf, Route<?> someRoute, BiConsumer<Result, Throwable> callback) - { - FindRoute findRoute = new FindRoute(node, txnId, invalidIf, someRoute, callback); - findRoute.start(); - return findRoute; - } - - @Override - protected boolean isSufficient(CheckStatusOk ok) - { - return isFullRoute(ok.route); - } - - @Override - protected void onDone(Success success, Throwable failure) - { - if (failure != null) callback.accept(null, failure); - else if (success == Success.Success) callback.accept(new Result(merged), null); - else callback.accept(null, null); - } -} diff --git a/accord-core/src/main/java/accord/coordinate/FindSomeRoute.java b/accord-core/src/main/java/accord/coordinate/FindSomeRoute.java deleted file mode 100644 index f087f4be..00000000 --- a/accord-core/src/main/java/accord/coordinate/FindSomeRoute.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.coordinate; - -import java.util.function.BiConsumer; - -import accord.local.Node; -import accord.primitives.Known; -import accord.messages.CheckStatus.CheckStatusOk; -import accord.messages.CheckStatus.IncludeInfo; -import accord.primitives.WithQuorum; -import accord.primitives.Participants; -import accord.primitives.Route; -import accord.primitives.TxnId; - -import static accord.primitives.Known.Nothing; - -/** - * Find the homeKey of a txnId with some known keys - */ -public class FindSomeRoute extends CheckShards<Participants<?>> -{ - static class Result - { - public final Route<?> route; - public final Known known; - public final WithQuorum withQuorum; - - Result(Route<?> route, Known known, WithQuorum withQuorum) - { - this.route = route; - this.known = known; - this.withQuorum = withQuorum; - } - } - - final BiConsumer<Result, Throwable> callback; - FindSomeRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Participants<?> unseekables, BiConsumer<Result, Throwable> callback) - { - super(node, txnId, unseekables, IncludeInfo.Route, null, invalidIf); - this.callback = callback; - } - - public static void findSomeRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Participants<?> unseekables, BiConsumer<Result, Throwable> callback) - { - if (!node.topology().hasEpoch(txnId.epoch())) - { - node.withEpochAtLeast(txnId.epoch(), callback, () -> findSomeRoute(node, txnId, invalidIf, unseekables, callback)); - return; - } - - FindSomeRoute findSomeRoute = new FindSomeRoute(node, txnId, invalidIf, unseekables, callback); - findSomeRoute.start(); - } - - @Override - protected boolean isSufficient(CheckStatusOk ok) - { - return ok.homeKey != null; - } - - @Override - protected void onDone(Success success, Throwable failure) - { - if (failure != null) callback.accept(null, failure); - else if (merged == null) callback.accept(new Result(null, Nothing, success.withQuorum), null); - else callback.accept(new Result(merged.route, merged.finish(this.route, this.route, this.route, success.withQuorum, previouslyKnownToBeInvalidIf).knownFor(txnId, this.route, this.route), success.withQuorum), null); - } -} diff --git a/accord-core/src/main/java/accord/coordinate/Infer.java b/accord-core/src/main/java/accord/coordinate/Infer.java index cc50ffbb..e5e4c6b3 100644 --- a/accord-core/src/main/java/accord/coordinate/Infer.java +++ b/accord-core/src/main/java/accord/coordinate/Infer.java @@ -21,6 +21,8 @@ package accord.coordinate; import java.util.function.BiConsumer; import accord.local.Command; +import accord.local.CommandStores.StoreFinder; +import accord.local.CommandStores.StoreSelector; import accord.local.Commands; import accord.local.Node; import accord.local.SafeCommand; @@ -30,13 +32,10 @@ import accord.primitives.Known; import accord.local.StoreParticipants; import accord.primitives.Participants; import accord.primitives.TxnId; -import accord.primitives.Unseekables; import accord.utils.Invariants; import accord.utils.MapReduceConsume; import static accord.primitives.Status.PreCommitted; -import static accord.primitives.Route.castToRoute; -import static accord.primitives.Route.isRoute; // TODO (testing): dedicated randomised testing of all inferences public class Infer @@ -108,17 +107,16 @@ public class Infer final TxnId txnId; // TODO (expected): more consistent handling of transactions that only MAY intersect a commandStore // (e.g. dependencies from an earlier epoch that have not yet committed, or commands that are proposed to execute in a later epoch than eventually agreed) - final long lowEpoch, highEpoch; + final StoreSelector reportTo; final Participants<?> participants; final T param; final BiConsumer<T, Throwable> callback; - private CleanupAndCallback(Node node, TxnId txnId, long lowEpoch, long highEpoch, Participants<?> participants, T param, BiConsumer<T, Throwable> callback) + private CleanupAndCallback(Node node, TxnId txnId, StoreSelector reportTo, Participants<?> participants, T param, BiConsumer<T, Throwable> callback) { this.node = node; this.txnId = txnId; - this.lowEpoch = lowEpoch; - this.highEpoch = highEpoch; + this.reportTo = reportTo; this.participants = participants; this.param = param; this.callback = callback; @@ -126,8 +124,7 @@ public class Infer void start() { - Unseekables<?> propagateTo = isRoute(participants) ? castToRoute(participants).withHomeKey() : participants; - node.mapReduceConsumeLocal(txnId, propagateTo, lowEpoch, highEpoch, this); + node.mapReduceConsumeLocal(txnId, reportTo.refine(txnId, null, participants), this); } @Override @@ -153,17 +150,21 @@ public class Infer } } - // TODO (required, consider): low and high bounds are correct? static class InvalidateAndCallback<T> extends CleanupAndCallback<T> { - private InvalidateAndCallback(Node node, TxnId txnId, long lowEpoch, long highEpoch, Participants<?> someUnseekables, T param, BiConsumer<T, Throwable> callback) + private InvalidateAndCallback(Node node, TxnId txnId, StoreSelector selector, Participants<?> participants, T param, BiConsumer<T, Throwable> callback) { - super(node, txnId, lowEpoch, highEpoch, someUnseekables, param, callback); + super(node, txnId, selector, participants, param, callback); } - public static <T> void locallyInvalidateAndCallback(Node node, TxnId txnId, long lowEpoch, long highEpoch, Participants<?> someUnseekables, T param, BiConsumer<T, Throwable> callback) + public static <T> void locallyInvalidateAndCallback(Node node, TxnId txnId, long lowEpoch, long highEpoch, Participants<?> participants, T param, BiConsumer<T, Throwable> callback) { - new InvalidateAndCallback<>(node, txnId, lowEpoch, highEpoch, someUnseekables, param, callback).start(); + new InvalidateAndCallback<>(node, txnId, StoreFinder.selector(participants, lowEpoch, highEpoch), participants, param, callback).start(); + } + + public static <T> void locallyInvalidateAndCallback(Node node, TxnId txnId, StoreSelector selector, Participants<?> participants, T param, BiConsumer<T, Throwable> callback) + { + new InvalidateAndCallback<>(node, txnId, selector, participants, param, callback).start(); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java index 3912058d..813aca1b 100644 --- a/accord-core/src/main/java/accord/coordinate/Invalidate.java +++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java @@ -23,6 +23,8 @@ import java.util.function.BiConsumer; import accord.coordinate.tracking.InvalidationTracker; import accord.coordinate.tracking.InvalidationTracker.InvalidationShardTracker; import accord.coordinate.tracking.RequestStatus; +import accord.local.CommandStores.LatentStoreSelector; +import accord.local.CommandStores.StoreSelector; import accord.local.Node.Id; import accord.messages.Commit; import accord.local.*; @@ -63,9 +65,9 @@ public class Invalidate implements Callback<InvalidateReply> private final SortedListMap<Id, InvalidateReply> replies; private final InvalidationTracker tracker; private Throwable failure; - private final long reportLowEpoch, reportHighEpoch; + private final LatentStoreSelector reportTo; - private Invalidate(Node node, Ballot ballot, TxnId txnId, Participants<?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + private Invalidate(Node node, Ballot ballot, TxnId txnId, Participants<?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { this.callback = callback; this.node = node; @@ -73,8 +75,7 @@ public class Invalidate implements Callback<InvalidateReply> this.txnId = txnId; this.invalidateWith = invalidateWith; this.transitivelyInvokedByPriorInvalidation = transitivelyInvokedByPriorInvalidation; - this.reportLowEpoch = reportLowEpoch; - this.reportHighEpoch = reportHighEpoch; + this.reportTo = reportTo; Topologies topologies = node.topology().forEpoch(invalidateWith, txnId.epoch(), SHARE); Invariants.require(topologies.size() == 1); this.tracker = new InvalidationTracker(topologies, txnId); @@ -88,13 +89,13 @@ public class Invalidate implements Callback<InvalidateReply> public static Invalidate invalidate(Node node, TxnId txnId, Participants<?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, BiConsumer<Outcome, Throwable> callback) { - return invalidate(node, txnId, invalidateWith, transitivelyInvokedByPriorInvalidation, txnId.epoch(), txnId.epoch(), callback); + return invalidate(node, txnId, invalidateWith, transitivelyInvokedByPriorInvalidation, LatentStoreSelector.standard(), callback); } - public static Invalidate invalidate(Node node, TxnId txnId, Participants<?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + public static Invalidate invalidate(Node node, TxnId txnId, Participants<?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { Ballot ballot = node.uniqueTimestamp(Ballot::fromValues); - Invalidate invalidate = new Invalidate(node, ballot, txnId, invalidateWith, transitivelyInvokedByPriorInvalidation, reportLowEpoch, reportHighEpoch, callback); + Invalidate invalidate = new Invalidate(node, ballot, txnId, invalidateWith, transitivelyInvokedByPriorInvalidation, reportTo, callback); invalidate.start(); return invalidate; } @@ -216,7 +217,7 @@ public class Invalidate implements Callback<InvalidateReply> if (!invalidateWith.containsAll(fullRoute)) witnessedByInvalidation = null; } - RecoverWithRoute.recover(node, txnId, NotKnownToBeInvalid, fullRoute, witnessedByInvalidation, reportLowEpoch, reportHighEpoch, callback); + RecoverWithRoute.recover(node, txnId, NotKnownToBeInvalid, fullRoute, witnessedByInvalidation, reportTo, callback); return; case Invalidated: @@ -289,14 +290,14 @@ public class Invalidate implements Callback<InvalidateReply> // so we do not need to explicitly do so here before notifying the waiter Participants<?> commitTo = Participants.merge(route, (Participants) invalidateWith); Commit.Invalidate.commitInvalidate(node, txnId, commitTo, txnId); - commitInvalidateLocal(commitTo, reportLowEpoch, reportHighEpoch); + commitInvalidateLocal(commitTo, reportTo.refine(txnId, null, commitTo)); } - private void commitInvalidateLocal(Participants<?> commitTo, long lowEpoch, long highEpoch) + private void commitInvalidateLocal(Participants<?> commitTo, StoreSelector reportTo) { // TODO (desired): merge with FetchData.InvalidateOnDone // TODO (desired): when sending to network, register a callback for when local application of commitInvalidate message ahs been performed, so no need to special-case - node.forEachLocal(txnId, commitTo, lowEpoch, highEpoch, safeStore -> { + node.forEachLocal(txnId, reportTo.refine(txnId, null, commitTo), safeStore -> { // TODO (expected): consid StoreParticipants participants = StoreParticipants.notAccept(safeStore, commitTo, txnId); Commands.commitInvalidate(safeStore, safeStore.get(txnId, participants), commitTo); diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java index dd26cb47..37831946 100644 --- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java +++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java @@ -20,6 +20,7 @@ package accord.coordinate; import java.util.function.BiConsumer; +import accord.local.CommandStores.StoreSelector; import accord.messages.InformDurable; import accord.primitives.*; import accord.utils.Invariants; @@ -31,6 +32,7 @@ import accord.utils.UnhandledEnum; import static accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback; import static accord.messages.Commit.Invalidate.commitInvalidate; +import static accord.primitives.WithQuorum.HasQuorum; /** * A result of null indicates the transaction is globally persistent @@ -40,21 +42,20 @@ public class MaybeRecover extends CheckShards<Route<?>> { final ProgressToken prevProgress; final BiConsumer<Outcome, Throwable> callback; - final long reportLowEpoch, reportHighEpoch; + final StoreSelector reportTo; - MaybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + MaybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, StoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { // we only want to enquire with the home shard, but we prefer maximal route information for running Invalidation against, if necessary super(node, txnId, someRoute.withHomeKey(), IncludeInfo.Route, null, invalidIf); this.prevProgress = prevProgress; this.callback = callback; - this.reportLowEpoch = reportLowEpoch; - this.reportHighEpoch = reportHighEpoch; + this.reportTo = reportTo; } - public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, StoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { - MaybeRecover maybeRecover = new MaybeRecover(node, txnId, invalidIf, someRoute, prevProgress, reportLowEpoch, reportHighEpoch, callback); + MaybeRecover maybeRecover = new MaybeRecover(node, txnId, invalidIf, someRoute, prevProgress, reportTo, callback); maybeRecover.start(); return maybeRecover; } @@ -85,7 +86,7 @@ public class MaybeRecover extends CheckShards<Route<?>> else { Invariants.require(merged != null); - CheckStatusOk full = merged.finish(this.route, this.route, this.route, success.withQuorum, previouslyKnownToBeInvalidIf); + CheckStatusOk full = merged.finish(this.query, this.query, this.query, success.withQuorum, previouslyKnownToBeInvalidIf); Known known = full.maxKnown(); Route<?> someRoute = full.route; @@ -102,7 +103,7 @@ public class MaybeRecover extends CheckShards<Route<?>> // may be disseminated globally. However, if all shards are erased then the outcome must be // decided locally by the application of GC points. // TODO (expected): replicas may be stale in this case, and should detect this and stop attempting to coordinate/invalidate. - if (known.canProposeInvalidation() && !Route.isFullRoute(full.route)) + if (success.withQuorum == HasQuorum && known.canProposeInvalidation() && !Route.isFullRoute(full.route)) { // for correctness reasons, we have not necessarily preempted the initial pre-accept round and // may have raced with it, so we must attempt to recover anything we see pre-accepted. @@ -116,12 +117,12 @@ public class MaybeRecover extends CheckShards<Route<?>> if (hasMadeProgress(full) || !Route.isFullRoute(someRoute)) { if (full.durability.isDurable()) - InformDurable.informDefault(node, topologies, txnId, route, full.executeAtIfKnown(), full.durability); + InformDurable.informDefault(node, topologies, txnId, query, full.executeAtIfKnown(), full.durability); callback.accept(full.toProgressToken(), null); } else { - node.recover(txnId, full.invalidIf, Route.castToFullRoute(someRoute), reportLowEpoch, reportHighEpoch).invoke(callback); + node.recover(txnId, full.invalidIf, Route.castToFullRoute(someRoute), reportTo).invoke(callback); } break; @@ -132,7 +133,7 @@ public class MaybeRecover extends CheckShards<Route<?>> break; case Abort: - commitInvalidate(node, txnId, Route.merge(full.route, (Route)route), txnId.epoch()); + commitInvalidate(node, txnId, Route.merge(full.route, (Route) query), txnId.epoch()); locallyInvalidateAndCallback(node, txnId, txnId.epoch(), txnId.epoch(), someRoute, full.toProgressToken(), callback); break; } diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java index ee7a58c8..62c6c2d2 100644 --- a/accord-core/src/main/java/accord/coordinate/Recover.java +++ b/accord-core/src/main/java/accord/coordinate/Recover.java @@ -32,6 +32,7 @@ import accord.api.Result; import accord.api.RoutingKey; import accord.coordinate.ExecuteFlag.ExecuteFlags; import accord.coordinate.tracking.RecoveryTracker; +import accord.local.CommandStores.LatentStoreSelector; import accord.local.Node; import accord.local.Node.Id; import accord.messages.Accept; @@ -109,7 +110,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw private final Txn txn; private final FullRoute<?> route; private final @Nullable Timestamp committedExecuteAt; - private final long reportLowEpoch, reportHighEpoch; + private final LatentStoreSelector reportTo; private final BiConsumer<Outcome, Throwable> callback; private boolean isDone; @@ -118,12 +119,9 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw private boolean isBallotPromised; private Recover(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, - @Nullable Timestamp committedExecuteAt, long reportLowEpoch, long reportHighEpoch, + @Nullable Timestamp committedExecuteAt, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { - this.committedExecuteAt = committedExecuteAt; - this.reportLowEpoch = reportLowEpoch; - this.reportHighEpoch = reportHighEpoch; Invariants.require(txnId.isVisible()); this.adapter = node.coordinationAdapter(txnId, Recovery); this.node = node; @@ -131,6 +129,8 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw this.txnId = txnId; this.txn = txn; this.route = route; + this.committedExecuteAt = committedExecuteAt; + this.reportTo = reportTo; this.callback = callback; this.tracker = new RecoveryTracker(topologies); this.recoverOks = new SortedListMap<>(topologies.nodes(), RecoverOk[]::new); @@ -166,39 +166,39 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw public static Recover recover(Node node, TxnId txnId, Txn txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback) { - return recover(node, txnId, txn, route, txnId.epoch(), txnId.epoch(), callback); + return recover(node, txnId, txn, route, LatentStoreSelector.standard(), callback); } - public static Recover recover(Node node, TxnId txnId, Txn txn, FullRoute<?> route, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + public static Recover recover(Node node, TxnId txnId, Txn txn, FullRoute<?> route, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { Ballot ballot = node.uniqueTimestamp(Ballot::fromValues); - return recover(node, ballot, txnId, txn, route, reportLowEpoch, reportHighEpoch, callback); + return recover(node, ballot, txnId, txn, route, reportTo, callback); } private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback) { - return recover(node, ballot, txnId, txn, route, null, txnId.epoch(), txnId.epoch(), callback); + return recover(node, ballot, txnId, txn, route, null, null, callback); } - private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { - return recover(node, ballot, txnId, txn, route, null, reportLowEpoch, reportHighEpoch, callback); + return recover(node, ballot, txnId, txn, route, null, reportTo, callback); } public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, @Nullable Timestamp executeAt, BiConsumer<Outcome, Throwable> callback) { - return recover(node, ballot, txnId, txn, route, executeAt, txnId.epoch(), (executeAt == null ? txnId : executeAt).epoch(), callback); + return recover(node, ballot, txnId, txn, route, executeAt, null, callback); } - public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, @Nullable Timestamp executeAt, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, @Nullable Timestamp executeAt, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { Topologies topologies = node.topology().select(route, txnId, executeAt == null ? txnId : executeAt, SHARE, QuorumEpochIntersections.recover); - return recover(node, topologies, ballot, txnId, txn, route, executeAt, reportLowEpoch, reportHighEpoch, callback); + return recover(node, topologies, ballot, txnId, txn, route, executeAt, reportTo, callback); } - private static Recover recover(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + private static Recover recover(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { - Recover recover = new Recover(node, topologies, ballot, txnId, txn, route, executeAt, reportLowEpoch, reportHighEpoch, callback); + Recover recover = new Recover(node, topologies, ballot, txnId, txn, route, executeAt, reportTo, callback); recover.start(topologies.nodes()); return recover; } @@ -523,12 +523,11 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw private void commitInvalidate(Timestamp invalidateUntil) { - long highEpoch = Math.max(invalidateUntil.epoch(), reportHighEpoch); - node.withEpochAtLeast(highEpoch, node.agent(), () -> { + node.withEpochAtLeast(invalidateUntil.epoch(), node.agent(), () -> { Commit.Invalidate.commitInvalidate(node, txnId, route, invalidateUntil); }); isDone = true; - locallyInvalidateAndCallback(node, txnId, reportLowEpoch, highEpoch, route, ProgressToken.INVALIDATED, callback); + locallyInvalidateAndCallback(node, txnId, reportTo.refine(txnId, null, route), route, ProgressToken.INVALIDATED, callback); } private void propose(Accept.Kind kind, Timestamp executeAt, List<RecoverOk> recoverOkList) @@ -549,7 +548,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw Topologies topologies = tracker.topologies(); if (executeAt != null && executeAt.epoch() != (this.committedExecuteAt == null ? txnId : this.committedExecuteAt).epoch()) topologies = node.topology().select(route, txnId, executeAt, SHARE, QuorumEpochIntersections.recover); - Recover.recover(node, topologies, node.uniqueTimestamp(Ballot::fromValues), txnId, txn, route, executeAt, reportLowEpoch, reportHighEpoch, callback); + Recover.recover(node, topologies, node.uniqueTimestamp(Ballot::fromValues), txnId, txn, route, executeAt, reportTo, callback); } AsyncResult<InferredFastPath> awaitEarlier(Node node, Deps waitOn, BlockedUntil blockedUntil) diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java index 9935dacf..e2970b8a 100644 --- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java +++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java @@ -21,6 +21,7 @@ package accord.coordinate; import java.util.function.BiConsumer; import javax.annotation.Nullable; +import accord.local.CommandStores.LatentStoreSelector; import accord.local.Node; import accord.local.Node.Id; import accord.messages.CheckStatus; @@ -62,14 +63,12 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> { final BiConsumer<Outcome, Throwable> callback; final Status witnessedByInvalidation; - // TODO (required): we don't use this on all paths, and should at least make sure we don't trigger any Invariant failures e.g. "Fetch was successful for all keys, but the WaitingState has not been cleared" - final long reportLowEpoch, reportHighEpoch; + final LatentStoreSelector reportTo; - private RecoverWithRoute(Node node, Topologies topologies, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, Status witnessedByInvalidation, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + private RecoverWithRoute(Node node, Topologies topologies, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, Status witnessedByInvalidation, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { super(node, txnId, route, IncludeInfo.All, node.uniqueTimestamp(Ballot::fromValues), invalidIf); - this.reportLowEpoch = reportLowEpoch; - this.reportHighEpoch = reportHighEpoch; + this.reportTo = reportTo; // if witnessedByInvalidation == AcceptedInvalidate then we cannot assume its definition was known, and our comparison with the status is invalid Invariants.require(witnessedByInvalidation != Status.AcceptedInvalidate); // if witnessedByInvalidation == Invalidated we should anyway not be recovering @@ -79,14 +78,14 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> assert topologies.oldestEpoch() == topologies.currentEpoch() && topologies.currentEpoch() == txnId.epoch(); } - public static RecoverWithRoute recover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable Status witnessedByInvalidation, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + public static RecoverWithRoute recover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable Status witnessedByInvalidation, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { - return recover(node, node.topology().forEpoch(route, txnId.epoch(), SHARE), txnId, invalidIf, route, witnessedByInvalidation, reportLowEpoch, reportHighEpoch, callback); + return recover(node, node.topology().forEpoch(route, txnId.epoch(), SHARE), txnId, invalidIf, route, witnessedByInvalidation, reportTo, callback); } - private static RecoverWithRoute recover(Node node, Topologies topologies, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable Status witnessedByInvalidation, long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback) + private static RecoverWithRoute recover(Node node, Topologies topologies, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable Status witnessedByInvalidation, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { - RecoverWithRoute recover = new RecoverWithRoute(node, topologies, txnId, invalidIf, route, witnessedByInvalidation, reportLowEpoch, reportHighEpoch, callback); + RecoverWithRoute recover = new RecoverWithRoute(node, topologies, txnId, invalidIf, route, witnessedByInvalidation, reportTo, callback); recover.start(); return recover; } @@ -94,21 +93,21 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> @Override public void contact(Id to) { - node.send(to, new CheckStatus(to, topologies(), txnId, route, sourceEpoch, IncludeInfo.All, bumpBallot), this); + node.send(to, new CheckStatus(to, topologies(), txnId, query, sourceEpoch, IncludeInfo.All, bumpBallot), this); } @Override protected boolean isSufficient(Id from, CheckStatusOk ok) { Ranges rangesForNode = topologies().getEpoch(txnId.epoch()).rangesForNode(from); - Route<?> route = this.route.slice(rangesForNode); + Route<?> route = this.query.slice(rangesForNode); return isSufficient(route, ok); } @Override protected boolean isSufficient(CheckStatusOk ok) { - return isSufficient(route, merged); + return isSufficient(query, merged); } protected boolean isSufficient(Route<?> route, CheckStatusOk ok) @@ -134,8 +133,8 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> return; } - CheckStatusOkFull full = ((CheckStatusOkFull) this.merged).finish(route, route, route, success.withQuorum, previouslyKnownToBeInvalidIf); - Known known = full.knownFor(txnId, route, route); + CheckStatusOkFull full = ((CheckStatusOkFull) this.merged).finish(query, query, query, success.withQuorum, previouslyKnownToBeInvalidIf); + Known known = full.knownFor(txnId, query, query); // TODO (required): audit this logic, and centralise with e.g. FetchData inferences // TODO (expected): skip straight to ExecuteTxn if we have a Stable reply from each shard @@ -145,14 +144,14 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> case Unknown: if (known.definition().isKnown()) { - Txn txn = full.partialTxn.reconstitute(route); - Recover.recover(node, txnId, txn, route, reportLowEpoch, reportHighEpoch, callback); + Txn txn = full.partialTxn.reconstitute(query); + Recover.recover(node, txnId, txn, query, reportTo, callback); } else if (!known.definition().isOrWasKnown()) { if (witnessedByInvalidation != null && witnessedByInvalidation.compareTo(Status.PreAccepted) > 0) throw illegalState("We previously invalidated, finding a status that should be recoverable"); - Invalidate.invalidate(node, txnId, route, witnessedByInvalidation != null, reportLowEpoch, reportHighEpoch, callback); + Invalidate.invalidate(node, txnId, query, witnessedByInvalidation != null, reportTo, callback); } else { @@ -173,11 +172,11 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> // TODO (expected): if we determine new durability, propagate it CheckStatusOkFull propagate; - if (!full.map.hasFullyTruncated(route)) + if (!full.map.hasFullyTruncated(query)) { // we might have only part of the full transaction, and a shard may have truncated; // in this case we want to skip straight to apply, but only for the shards that haven't truncated - Route<?> trySendTo = route.without(full.map.matchingRanges(minMax -> minMax.min.isTruncated())); + Route<?> trySendTo = query.without(full.map.matchingRanges(minMax -> minMax.min.isTruncated())); if (!trySendTo.isEmpty()) { if (known.isInvalidated()) @@ -193,20 +192,20 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> if (!known.is(DepsKnown)) { Invariants.require(txnId.isSystemTxn() || full.partialTxn.covers(trySendTo)); - Participants<?> haveStable = full.map.knownFor(Known.DepsOnly, route); - Route<?> haveUnstable = route.without(haveStable); + Participants<?> haveStable = full.map.knownFor(Known.DepsOnly, query); + Route<?> haveUnstable = query.without(haveStable); Deps stable = full.stableDeps.reconstitutePartial(haveStable).asFullUnsafe(); - LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, full.executeAt, full.partialTxn, stable, haveUnstable, trySendTo, SLICE, route, callback, deps -> { + LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, full.executeAt, full.partialTxn, stable, haveUnstable, trySendTo, SLICE, query, callback, deps -> { Deps stableDeps = deps.intersecting(trySendTo); - node.coordinationAdapter(txnId, Recovery).persist(node, null, trySendTo, trySendTo, SLICE, route, bumpBallot, txnId, full.partialTxn, full.executeAt, stableDeps, full.writes, full.result, informDurableOnDone, null); + node.coordinationAdapter(txnId, Recovery).persist(node, null, trySendTo, trySendTo, SLICE, query, bumpBallot, txnId, full.partialTxn, full.executeAt, stableDeps, full.writes, full.result, informDurableOnDone, null); }); } else { Invariants.require(full.stableDeps.covers(trySendTo)); Invariants.require(txnId.isSystemTxn() || full.partialTxn.covers(trySendTo)); - node.coordinationAdapter(txnId, Recovery).persist(node, null, trySendTo, trySendTo, SLICE, route, bumpBallot, txnId, full.partialTxn, full.executeAt, full.stableDeps, full.writes, full.result, informDurableOnDone, null); + node.coordinationAdapter(txnId, Recovery).persist(node, null, trySendTo, trySendTo, SLICE, query, bumpBallot, txnId, full.partialTxn, full.executeAt, full.stableDeps, full.writes, full.result, informDurableOnDone, null); } } } @@ -222,24 +221,24 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> propagate = full; } - Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, propagate, (s, f) -> callback.accept(f == null ? propagate.toProgressToken() : null, f)); + Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query, reportTo, null, propagate, (s, f) -> callback.accept(f == null ? propagate.toProgressToken() : null, f)); break; } - Txn txn = full.partialTxn.reconstitute(route); + Txn txn = full.partialTxn.reconstitute(query); if (known.is(ApplyAtKnown) && known.outcome() == Apply) { Deps deps; Route<?> missingDeps; if (known.is(DepsKnown)) { - deps = full.stableDeps.reconstitute(route); - missingDeps = route.slice(0, 0); + deps = full.stableDeps.reconstitute(query); + missingDeps = query.slice(0, 0); } else { - Participants<?> hasDeps = full.map.knownFor(Known.DepsOnly, route); - missingDeps = route.without(hasDeps); + Participants<?> hasDeps = full.map.knownFor(Known.DepsOnly, query); + missingDeps = query.without(hasDeps); if (full.stableDeps == null) { Invariants.require(hasDeps.isEmpty()); @@ -252,9 +251,9 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> deps = new Deps(full.stableDeps.reconstitutePartial(hasDeps)); } } - LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, full.executeAt, full.partialTxn, deps, missingDeps, route, SHARE, route, callback, mergedDeps -> { + LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, full.executeAt, full.partialTxn, deps, missingDeps, query, SHARE, query, callback, mergedDeps -> { node.withEpochExact(full.executeAt.epoch(), node.agent(), t -> WrappableException.wrap(t), () -> { - node.coordinationAdapter(txnId, Recovery).persist(node, topologies, route, bumpBallot, txnId, txn, full.executeAt, mergedDeps, full.writes, full.result, (s, f) -> { + node.coordinationAdapter(txnId, Recovery).persist(node, topologies, query, bumpBallot, txnId, txn, full.executeAt, mergedDeps, full.writes, full.result, (s, f) -> { callback.accept(f == null ? APPLIED : null, f); }); }); @@ -262,7 +261,7 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> } else { - Recover.recover(node, txnId, txn, route, callback); + Recover.recover(node, txnId, txn, query, callback); } break; @@ -270,12 +269,12 @@ public class RecoverWithRoute extends CheckShards<FullRoute<?>> if (witnessedByInvalidation != null && witnessedByInvalidation.hasBeen(Status.PreCommitted)) throw illegalState("We previously invalidated, finding a status that should be recoverable"); - Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, full, (s, f) -> callback.accept(f == null ? INVALIDATED : null, f)); + Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) -> callback.accept(f == null ? INVALIDATED : null, f)); break; case Erased: // we should only be able to hit the Erased case if every participating shard has advanced past this TxnId, so we don't need to recover it - Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, full, (s, f) -> callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f)); + Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) -> callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f)); break; } } diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java index f426a6d5..56e763d7 100644 --- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -29,6 +29,7 @@ import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.Agent; import accord.api.ConfigurationService; import accord.local.Node; import accord.primitives.Ranges; @@ -45,6 +46,7 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class); protected final Node.Id localId; + protected final Agent agent; protected final EpochHistory epochs = createEpochHistory(); @@ -294,9 +296,10 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo } } - public AbstractConfigurationService(Node.Id localId) + public AbstractConfigurationService(Node.Id localId, Agent agent) { this.localId = localId; + this.agent = agent; } protected abstract EpochHistory createEpochHistory(); @@ -363,8 +366,9 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo if (lastReceived > 0 && topology.epoch() > lastReceived + 1) { logger.debug("Epoch {} received; waiting to receive {} before reporting", topology.epoch(), lastReceived + 1); - epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync), executor()); - + epochs.receiveFuture(lastReceived + 1) + .invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync), executor()) + .begin(agent); fetchTopologyForEpoch(lastReceived + 1); return; } @@ -373,14 +377,18 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo if (lastAcked == 0 && lastReceived > 0) { logger.debug("Epoch {} received; waiting for {} to ack before reporting", topology.epoch(), epochs.minEpoch(), executor()); - epochs.acknowledgeFuture(epochs.minEpoch()).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync)); + epochs.acknowledgeFuture(epochs.minEpoch()) + .invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync)) + .begin(agent); return; } if (lastAcked > 0 && topology.epoch() > lastAcked + 1) { logger.debug("Epoch {} received; waiting for {} to ack before reporting", topology.epoch(), lastAcked + 1); - epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync), executor()); + epochs.acknowledgeFuture(lastAcked + 1) + .invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync), executor()) + .begin(agent); return; } @@ -453,9 +461,9 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo } } - public Minimal(Node.Id node) + public Minimal(Node.Id node, Agent agent) { - super(node); + super(node, agent); } @Override diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java index 8c1885d8..129d03e1 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java @@ -24,6 +24,8 @@ import javax.annotation.Nullable; import accord.coordinate.MaybeRecover; import accord.coordinate.Outcome; import accord.local.Command; +import accord.local.CommandStores; +import accord.local.CommandStores.IncludingSpecificStoreSelector; import accord.local.SafeCommand; import accord.local.SafeCommandStore; import accord.primitives.Status; @@ -153,9 +155,8 @@ abstract class HomeState extends WaitingState CallbackInvoker<ProgressToken, Outcome> invoker = invokeHomeCallback(instance, txnId, maxProgressToken, HomeState::recoverCallback); - long lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, txnId.epoch(), command.participants().hasTouched()); - long highEpoch = safeStore.ranges().earliestLaterEpochThatFullyCovers(command.executeAtIfKnownElseTxnId().epoch(), command.participants().hasTouched()); - instance.debugActive(MaybeRecover.maybeRecover(instance.node(), txnId, invalidIf(), command.route(), maxProgressToken, lowEpoch, highEpoch, invoker), invoker); + CommandStores.StoreSelector reportTo = new IncludingSpecificStoreSelector(safeStore.commandStore().id()); + instance.debugActive(MaybeRecover.maybeRecover(instance.node(), txnId, invalidIf(), command.route(), maxProgressToken, reportTo, invoker), invoker); set(safeStore, instance, ReadyToExecute, Querying); } @@ -178,6 +179,9 @@ abstract class HomeState extends WaitingState if (fail != null) { safeStore.agent().onCaughtException(fail, "Failed recovering " + state); + // re-save prior progress token + if (prevProgressToken != null && prevProgressToken.compareTo(command) > 0) + instance.saveProgressToken(command.txnId(), prevProgressToken); state.incrementHomeRetryCounter(); state.set(safeStore, instance, status, Queued); } diff --git a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java index ef973281..e5045636 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java @@ -25,7 +25,9 @@ import javax.annotation.Nullable; import accord.api.ProgressLog.BlockedUntil; import accord.coordinate.AsynchronousAwait; import accord.coordinate.FetchData; +import accord.coordinate.FetchSomeRoute; import accord.local.Command; +import accord.local.CommandStores.IncludingSpecificStoreSelector; import accord.local.CommandStores.RangesForEpoch; import accord.local.Node; import accord.local.SafeCommand; @@ -141,7 +143,7 @@ abstract class WaitingState extends BaseTxnState return waitingProgress(encodedState); } - final @Nonnull long waitingKeyTrackerBits() + final long waitingKeyTrackerBits() { return (encodedState >>> AWAIT_SHIFT) & (-1L >>> (64 - AWAIT_BITS)); } @@ -240,7 +242,7 @@ abstract class WaitingState extends BaseTxnState if (offset >= 3) { offset = 3; - lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, lowEpoch, command.maxContactable()); + lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(lowEpoch, command.maxContactable()); } encodedState = encodedState & ~(0x3L << AWAIT_EPOCH_SHIFT); encodedState |= ((long)offset) << AWAIT_EPOCH_SHIFT; @@ -263,7 +265,7 @@ abstract class WaitingState extends BaseTxnState long epoch = ranges.epochAtIndex(Math.max(0, i)) - 1; if (offset < 3) return epoch; - return safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, epoch, route); + return safeStore.ranges().latestEarlierEpochThatFullyCovers(epoch, route); } boolean hasNewLowEpoch(SafeCommandStore safeStore, TxnId txnId, long prevLowEpoch, long newLowEpoch) @@ -298,10 +300,10 @@ abstract class WaitingState extends BaseTxnState long readHighEpoch(SafeCommandStore safeStore, TxnId txnId, Route<?> route) { + RangesForEpoch ranges = safeStore.ranges(); int offset = (int) ((encodedState >>> (AWAIT_EPOCH_SHIFT + 2)) & 0x3); if (offset == 0) - return txnId.epoch(); - RangesForEpoch ranges = safeStore.ranges(); + return Math.max(txnId.epoch(), ranges.epochAtIndex(0)); long epoch = ranges.epochAtIndex(Math.max(0, ranges.floorIndex(txnId.epoch())) + offset); if (offset < 3) return epoch; @@ -389,17 +391,15 @@ abstract class WaitingState extends BaseTxnState TxnId txnId = safeCommand.txnId(); // first make sure we have enough information to obtain the command locally Timestamp executeAt = command.executeAtIfKnown(); - Participants<?> fetchKeys = Invariants.nonNull(command.maxContactable()); + Participants<?> maxContactable = Invariants.nonNull(command.maxContactable()); - if (!Route.isRoute(fetchKeys)) + if (!Route.isRoute(maxContactable)) { - long lowEpoch = updateLowEpoch(safeStore, txnId, command); - long highEpoch = updateHighEpoch(safeStore, txnId, blockedUntil, command, executeAt); - fetchRoute(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, fetchKeys); + fetchRoute(owner, blockedUntil, txnId, maxContactable); return; } - Route<?> route = Route.castToRoute(fetchKeys); + Route<?> route = Route.castToRoute(maxContactable); if (homeSatisfies().compareTo(blockedUntil) < 0) { // first wait until the homeKey has progressed to a point where it can answer our query; we don't expect our shards to know until then anyway @@ -418,7 +418,7 @@ abstract class WaitingState extends BaseTxnState { // we know it has been decided one way or the other by the home shard at least, so we attempt a fetch // including the home shard to get us to at least PreCommitted where we can safely wait on individual shards - fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, slicedRoute.withHomeKey(), route); + fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, slicedRoute.withHomeKey(), route); return; } @@ -430,7 +430,7 @@ abstract class WaitingState extends BaseTxnState if (awaitRoute.isHomeKeyOnlyRoute()) { // at this point we can switch to polling as we know someone has the relevant state - fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, fetchRoute, route); + fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, fetchRoute, route); return; } @@ -457,7 +457,7 @@ abstract class WaitingState extends BaseTxnState { // all of the shards we are awaiting have been processed and found at least one replica that has the state needed to answer our query // at this point we can switch to polling as we know someone has the relevant state - fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, fetchRoute, route); + fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, fetchRoute, route); return; } @@ -485,6 +485,7 @@ abstract class WaitingState extends BaseTxnState if (route == null) { Invariants.require(kind == FetchRoute); + Invariants.require(ready == null); state.retry(safeStore, safeCommand, owner, blockedUntil); return; } @@ -599,9 +600,12 @@ abstract class WaitingState extends BaseTxnState } } - static void fetchRouteCallback(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner, TxnId txnId, BlockedUntil blockedUntil, FetchData.FetchResult fetchResult, Throwable fail) + static void fetchRouteCallback(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner, TxnId txnId, BlockedUntil blockedUntil, Route<?> found, Throwable fail) { - fetchCallback(FetchRoute, safeStore, safeCommand, owner, txnId, blockedUntil, fetchResult, fail); + if (found == null) + found = safeCommand.current().route(); + Participants<?> ready = found != null ? found.slice(0, 0) : null; + awaitOrFetchCallback(FetchRoute, safeStore, safeCommand, owner, txnId, blockedUntil, ready, null, null, fail); } static void fetchCallback(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner, TxnId txnId, BlockedUntil blockedUntil, FetchData.FetchResult fetchResult, Throwable fail) @@ -703,20 +707,20 @@ abstract class WaitingState extends BaseTxnState } } - static void fetchRoute(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId txnId, Timestamp executeAt, long lowEpoch, long highEpoch, Participants<?> fetchKeys) + static void fetchRoute(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId txnId, Participants<?> contactable) { // TODO (desired): fetch only the route // we MUSt allocate before calling withEpoch to register cancellation, as async - BiConsumer<FetchData.FetchResult, Throwable> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchRouteCallback); - FetchData.fetch(blockedUntil.unblockedFrom.known, owner.node(), txnId, executeAt, fetchKeys, lowEpoch, highEpoch, invoker); + BiConsumer<Route<?>, Throwable> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchRouteCallback); + FetchSomeRoute.fetchSomeRoute(owner.node(), txnId, contactable, new IncludingSpecificStoreSelector(owner.commandStore.id()), invoker); } - static void fetch(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId txnId, Timestamp executeAt, long lowEpoch, long highEpoch, Route<?> slicedRoute, Route<?> fetchRoute, Route<?> maxRoute) + static void fetch(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId txnId, Timestamp executeAt, Route<?> slicedRoute, Route<?> fetchRoute, Route<?> maxRoute) { Invariants.require(!slicedRoute.isEmpty()); // we MUSt allocate before calling withEpoch to register cancellation, as async BiConsumer<FetchData.FetchResult, Throwable> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchCallback); - FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, owner.node(), txnId, executeAt, fetchRoute, maxRoute, slicedRoute, lowEpoch, highEpoch, invoker); + FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, owner.node(), txnId, executeAt, fetchRoute, maxRoute, slicedRoute, new IncludingSpecificStoreSelector(owner.commandStore.id()), invoker); } void awaitHomeKey(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId txnId, Timestamp executeAt, Route<?> route) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 0bbc4bbf..e7e457d6 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -20,6 +20,8 @@ package accord.local; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -27,7 +29,6 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -52,7 +53,6 @@ import accord.primitives.EpochSupplier; import accord.primitives.Participants; import accord.primitives.Range; import accord.primitives.Ranges; -import accord.primitives.Route; import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -73,6 +73,7 @@ import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResults; import accord.utils.async.Cancellable; import org.agrona.collections.Hashing; +import org.agrona.collections.Int2IntHashMap; import org.agrona.collections.Int2ObjectHashMap; import static accord.api.ConfigurationService.EpochReady.done; @@ -89,6 +90,158 @@ public abstract class CommandStores @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(CommandStores.class); + public interface LatentStoreSelector + { + StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants<?> participants); + + class StandardLatentStoreSelector implements LatentStoreSelector + { + private static final StandardLatentStoreSelector INSTANCE = new StandardLatentStoreSelector(); + + @Override + public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants<?> participants) + { + return snapshot -> StoreFinder.find(snapshot, participants) + .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch()) + .iterator(snapshot); + } + } + + static LatentStoreSelector standard() + { + return StandardLatentStoreSelector.INSTANCE; + } + } + + public interface StoreSelector extends LatentStoreSelector + { + default StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants<?> participants) { return this; } + Iterator<CommandStore> select(Snapshot snapshot); + } + + public static class IncludingSpecificStoreSelector implements StoreSelector + { + final int storeId; + + public IncludingSpecificStoreSelector(int storeId) + { + this.storeId = storeId; + } + + @Override + public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants<?> participants) + { + return snapshot -> { + StoreFinder finder = StoreFinder.find(snapshot, participants) + .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch()); + finder.set(snapshot.byId.get(storeId)); + return finder.iterator(snapshot); + }; + } + + @Override + public Iterator<CommandStore> select(Snapshot snapshot) + { + return Collections.singletonList(snapshot.byId(storeId)).iterator(); + } + } + + + public static class StoreFinder extends SimpleBitSet implements IndexedQuadConsumer<Object, Object, Object, Object>, IndexedRangeQuadConsumer<Object, Object, Object, Object> + { + final int[] indexMap; + + private StoreFinder(int size, int[] indexMap) + { + super(size); + this.indexMap = indexMap; + } + + public StoreFinder(Snapshot snapshot) + { + this(snapshot.shards.length, snapshot.indexForRange); + } + + public static StoreSelector selector(Unseekables<?> unseekables, long minEpoch, long maxEpoch) + { + return snapshot -> { + StoreFinder finder = StoreFinder.find(snapshot, unseekables); + finder.filter(snapshot, unseekables, minEpoch, maxEpoch); + return finder.iterator(snapshot); + }; + } + + public static StoreFinder find(Snapshot snapshot, Unseekables<?> unseekables) + { + StoreFinder finder = new StoreFinder(snapshot); + switch (unseekables.domain()) + { + default: throw new UnhandledEnum(unseekables.domain()); + case Range: + { + int minIndex = 0; + for (Range range : (AbstractRanges)unseekables) + minIndex = snapshot.lookupByRange.forEachRange(range, finder, finder, null, null, null, null, minIndex); + break; + } + case Key: + { + int minIndex = 0; + for (RoutingKey key : (AbstractUnseekableKeys)unseekables) + minIndex = snapshot.lookupByRange.forEachKey(key, finder, finder, null, null, null, null, minIndex); + break; + } + } + return finder; + } + + public StoreFinder filter(Snapshot snapshot, Unseekables<?> unseekables, long minEpoch, long maxEpoch) + { + for (int i = firstSetBit(); i >= 0 ; i = nextSetBit(i + 1, -1)) + { + ShardHolder shard = snapshot.shards[i]; + Ranges shardRanges = shard.ranges().allBetween(minEpoch, maxEpoch); + if (shardRanges != shard.ranges.all() && !shardRanges.intersects(unseekables)) + unset(i); + } + return this; + } + + public Iterator<CommandStore> iterator(Snapshot snapshot) + { + return new Iterator<>() + { + int i = firstSetBit(); + @Override + public boolean hasNext() + { + return i >= 0; + } + + @Override + public CommandStore next() + { + CommandStore next = snapshot.shards[i].store; + i = nextSetBit(i + 1, -1); + return next; + } + }; + } + + @Override + public void accept(Object p1, Object p2, Object p3, Object p4, int index) + { + set(indexMap[index]); + } + + @Override + public void accept(Object p1, Object p2, Object p3, Object p4, int fromIndex, int toIndex) + { + for (int i = fromIndex ; i < toIndex ; ++i) + set(indexMap[i]); + } + } + public interface Factory { CommandStores create(NodeCommandStoreService time, @@ -130,7 +283,7 @@ public abstract class CommandStores } } - protected static class ShardHolder + public static class ShardHolder { public final CommandStore store; RangesForEpoch ranges; @@ -360,22 +513,27 @@ public abstract class CommandStores return Math.max(sinceEpoch, epochs[0]); } - public long latestEarlierEpochThatFullyCovers(SafeCommandStore safeStore, long beforeEpoch, Unseekables<?> keysOrRanges) + public long latestEarlierEpochThatFullyCovers(long beforeEpoch, Unseekables<?> keysOrRanges) { int i = ceilIndex(beforeEpoch); - if (i == 0 || i <= epochs.length) + if (i == 0) return beforeEpoch; + long latest = beforeEpoch; - Ranges existing = i >= ranges.length ? Ranges.EMPTY : ranges[i]; - long minEpoch = safeStore.node().topology().minEpoch(); - while (--i >= 0 && minEpoch < epochs[i]) + Ranges existing = Ranges.EMPTY; + long next = beforeEpoch; + if (i < epochs.length) + { + existing = ranges[i]; + next = Math.min(next, epochs[i]); + } + while (--i >= 0) { if (ranges[i].without(existing).intersects(keysOrRanges)) - latest = epochs[i + 1] - 1; + latest = next - 1; existing = existing.with(ranges[i]); + next = epochs[i]; } - if (latest < beforeEpoch) - latest = Math.max(latest, safeStore.node().topology().minEpoch()); return latest; } @@ -398,22 +556,23 @@ public abstract class CommandStores current = toLoad; } - protected static class Snapshot extends Journal.TopologyUpdate + public static class Snapshot extends Journal.TopologyUpdate implements Iterable<ShardHolder> { - public final ShardHolder[] shards; - public final Int2ObjectHashMap<CommandStore> byId; + final ShardHolder[] shards; + final Int2IntHashMap byId; private final int[] indexForRange; - public final SearchableRangeList lookupByRange; + final SearchableRangeList lookupByRange; public Snapshot(ShardHolder[] shards, Topology local, Topology global) { super(asMap(shards), local, global); this.shards = shards; - this.byId = new Int2ObjectHashMap<>(shards.length, Hashing.DEFAULT_LOAD_FACTOR, true); + this.byId = new Int2IntHashMap(shards.length, Hashing.DEFAULT_LOAD_FACTOR, -1); int count = 0; - for (ShardHolder shard : shards) + for (int i = 0 ; i < shards.length ; ++i) { - byId.put(shard.store.id(), shard.store); + ShardHolder shard = shards[i]; + byId.put(shard.store.id(), i); count += shard.ranges.all().size(); } class RangeAndIndex @@ -461,6 +620,17 @@ public abstract class CommandStores commandStores.put(shard.store.id, shard.ranges); return commandStores; } + + public CommandStore byId(int id) + { + return shards[byId.get(id)].store; + } + + @Override + public Iterator<ShardHolder> iterator() + { + return Arrays.asList(shards).iterator(); + } } final StoreSupplier supplier; @@ -666,7 +836,7 @@ public abstract class CommandStores private AsyncChain<Void> forEach(PreLoadContext context, Unseekables<?> keys, long minEpoch, long maxEpoch, Consumer<SafeCommandStore> forEach, boolean matchesMultiple) { - return this.mapReduce(context, keys, minEpoch, maxEpoch, new MapReduce<SafeCommandStore, Void>() + return this.mapReduce(context, keys, minEpoch, maxEpoch, new MapReduce<>() { @Override public Void apply(SafeCommandStore in) @@ -692,6 +862,22 @@ public abstract class CommandStores }); } + public AsyncChain<Void> forEach(PreLoadContext context, StoreSelector selector, Consumer<SafeCommandStore> forEach) + { + return this.mapReduce(context, selector, new MapReduce<>() + { + @Override + public Void apply(SafeCommandStore in) + { + forEach.accept(in); + return null; + } + @Override + public Void reduce(Void o1, Void o2) { return null; } + @Override public String toString() { return forEach.getClass().getName(); } + }); + } + /** * See {@link #mapReduceConsume(PreLoadContext, Unseekables, long, long, MapReduceConsume)} */ @@ -715,79 +901,33 @@ public abstract class CommandStores return reduced.begin(mapReduceConsume); } + protected <O> Cancellable mapReduceConsume(PreLoadContext context, StoreSelector selector, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume) + { + AsyncChain<O> reduced = mapReduce(context, selector, mapReduceConsume); + return reduced.begin(mapReduceConsume); + } + public <O> Cancellable mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume) { AsyncChain<O> reduced = mapReduce(context, commandStoreIds, mapReduceConsume); return reduced.begin(mapReduceConsume); } - private static class StoreSelector extends SimpleBitSet implements IndexedQuadConsumer<Object, Object, Object, Object>, IndexedRangeQuadConsumer<Object, Object, Object, Object> + // TODO (required): as we get more tables this will become expensive to allocate; we need to index first by prefix + public <O> AsyncChain<O> mapReduce(PreLoadContext context, Unseekables<?> unseekables, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, O> mapReduce) { - final int[] indexMap; - - private StoreSelector(int size, int[] indexMap) - { - super(size); - this.indexMap = indexMap; - } - - public StoreSelector(Snapshot snapshot) - { - this(snapshot.shards.length, snapshot.indexForRange); - } - - static StoreSelector select(Snapshot snapshot, Unseekables<?> unseekables) - { - StoreSelector selector = new StoreSelector(snapshot); - switch (unseekables.domain()) - { - default: throw new UnhandledEnum(unseekables.domain()); - case Range: - { - int minIndex = 0; - for (Range range : (AbstractRanges)unseekables) - minIndex = snapshot.lookupByRange.forEachRange(range, selector, selector, null, null, null, null, minIndex); - break; - } - case Key: - { - int minIndex = 0; - for (RoutingKey key : (AbstractUnseekableKeys)unseekables) - minIndex = snapshot.lookupByRange.forEachKey(key, selector, selector, null, null, null, null, minIndex); - break; - } - } - return selector; - } - - @Override - public void accept(Object p1, Object p2, Object p3, Object p4, int index) - { - set(indexMap[index]); - } - - @Override - public void accept(Object p1, Object p2, Object p3, Object p4, int fromIndex, int toIndex) - { - for (int i = fromIndex ; i < toIndex ; ++i) - set(indexMap[i]); - } + return mapReduce(context, StoreFinder.selector(unseekables, minEpoch, maxEpoch), mapReduce); } - public <O> AsyncChain<O> mapReduce(PreLoadContext context, Unseekables<?> unseekables, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, O> mapReduce) + public <O> AsyncChain<O> mapReduce(PreLoadContext context, StoreSelector selector, MapReduce<? super SafeCommandStore, O> mapReduce) { AsyncChain<O> chain = null; BiFunction<O, O, O> reducer = mapReduce::reduce; Snapshot snapshot = current; - StoreSelector selector = StoreSelector.select(snapshot, unseekables); - for (int i = selector.nextSetBit(0, -1); i >= 0 ; i = selector.nextSetBit(i + 1, -1)) + Iterator<CommandStore> stores = selector.select(snapshot); + while (stores.hasNext()) { - ShardHolder shard = snapshot.shards[i]; - Ranges shardRanges = shard.ranges().allBetween(minEpoch, maxEpoch); - if (shardRanges != shard.ranges.all() && !shardRanges.intersects(unseekables)) - continue; - - AsyncChain<O> next = shard.store.build(context, mapReduce); + AsyncChain<O> next = stores.next().build(context, mapReduce); chain = chain != null ? AsyncChains.reduce(chain, next, reducer) : next; } return chain == null ? AsyncChains.success(null) : chain; @@ -811,17 +951,7 @@ public abstract class CommandStores protected <O> AsyncChain<O> mapReduce(PreLoadContext context, IntStream commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce) { - // TODO (low priority, efficiency): avoid using an array, or use a scratch buffer - int[] ids = commandStoreIds.toArray(); - AsyncChain<O> chain = null; - BiFunction<O, O, O> reducer = mapReduce::reduce; - for (int id : ids) - { - CommandStore commandStore = forId(id); - AsyncChain<O> next = commandStore.build(context, mapReduce); - chain = chain != null ? AsyncChains.reduce(chain, next, reducer) : next; - } - return chain == null ? AsyncChains.success(null) : chain; + return mapReduce(context, snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce); } public <O> Cancellable mapReduceConsume(PreLoadContext context, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume) @@ -900,8 +1030,8 @@ public abstract class CommandStores public CommandStore select(Participants<?> participants) { Snapshot snapshot = current; - StoreSelector selector = StoreSelector.select(snapshot, participants); - int i = selector.firstSetBit(); + StoreFinder stores = StoreFinder.find(snapshot, participants); + int i = stores.firstSetBit(); if (i < 0) return any(); return snapshot.shards[i].store; } @@ -938,7 +1068,7 @@ public abstract class CommandStores public CommandStore forId(int id) { Snapshot snapshot = current; - return snapshot.byId.get(id); + return snapshot.shards[snapshot.byId.get(id)].store; } public int[] ids() diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index f0cd7360..25ac1688 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -59,6 +59,7 @@ import accord.coordinate.CoordinationAdapter.Factory.Kind; import accord.coordinate.Infer.InvalidIf; import accord.coordinate.Outcome; import accord.coordinate.RecoverWithRoute; +import accord.local.CommandStores.StoreSelector; import accord.local.durability.DurabilityService; import accord.messages.Callback; import accord.messages.Reply; @@ -522,6 +523,11 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ return commandStores.forEach(context, unseekables, minEpoch, maxEpoch, forEach); } + public AsyncChain<Void> forEachLocal(PreLoadContext context, StoreSelector selector, Consumer<SafeCommandStore> forEach) + { + return commandStores.forEach(context, selector, forEach); + } + public AsyncChain<Void> forEachLocalSince(PreLoadContext context, Unseekables<?> unseekables, Timestamp since, Consumer<SafeCommandStore> forEach) { return commandStores.forEach(context, unseekables, since.epoch(), Long.MAX_VALUE, forEach); @@ -552,6 +558,11 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ return commandStores.mapReduceConsume(context, keys, minEpoch, maxEpoch, mapReduceConsume); } + public <T> Cancellable mapReduceConsumeLocal(PreLoadContext context, StoreSelector selector, MapReduceConsume<SafeCommandStore, T> mapReduceConsume) + { + return commandStores.mapReduceConsume(context, selector, mapReduceConsume); + } + public <T> Cancellable mapReduceConsumeAllLocal(PreLoadContext context, MapReduceConsume<SafeCommandStore, T> mapReduceConsume) { return commandStores.mapReduceConsume(context, mapReduceConsume); @@ -856,7 +867,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ } } - public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf invalidIf, FullRoute<?> route, long reportLowEpoch, long reportHighEpoch) + public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf invalidIf, FullRoute<?> route, StoreSelector reportTo) { { AsyncResult<? extends Outcome> result = coordinating.get(txnId); @@ -866,7 +877,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ AsyncResult<Outcome> result = withEpochExact(txnId.epoch(), () -> { RecoverFuture<Outcome> future = new RecoverFuture<>(); - RecoverWithRoute.recover(this, txnId, invalidIf, route, null, reportLowEpoch, reportHighEpoch, future); + RecoverWithRoute.recover(this, txnId, invalidIf, route, null, reportTo, future); return future; }).beginAsResult(); coordinating.putIfAbsent(txnId, result); diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java b/accord-core/src/main/java/accord/local/StoreParticipants.java index 5f286beb..b401d674 100644 --- a/accord-core/src/main/java/accord/local/StoreParticipants.java +++ b/accord-core/src/main/java/accord/local/StoreParticipants.java @@ -423,7 +423,7 @@ public class StoreParticipants public final StoreParticipants supplement(Route<?> route) { - route = Route.merge(this.route(), (Route)route); + route = this.route != null ? this.route.with((Route)route) : route.with((Participants) owns); if (route == this.route()) return this; return create(route, owns(), executes(), waitsOn(), touches(), hasTouched()); } @@ -688,7 +688,7 @@ public class StoreParticipants private static long computeCoveringEpoch(SafeCommandStore safeStore, long txnIdEpoch, Participants<?> participants) { - long lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, txnIdEpoch, participants); + long lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(txnIdEpoch, participants); return Math.min(lowEpoch, txnIdEpoch); } } diff --git a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java index 9f209e52..8c28a445 100644 --- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java +++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java @@ -317,7 +317,7 @@ public class DurabilityQueue long retryDelay = node.agent().retryDurabilityDelay(node, attempt, MICROSECONDS); Invariants.require(retryDelay > 0); if (request != null) logger.info("{}: Retrying durability for {} requested by {} in {}s.", syncPoint.syncId, syncPoint.route.toRanges(), request.requestedBy, String.format("%.3f", retryDelay/1000_000.0)); - else logger.debug("{}: Retrying durability for {} in {}.", syncPoint.syncId, syncPoint.route.toRanges(), String.format("%.3f", retryDelay/1000_000.0)); + else logger.debug("{}: Retrying durability for {} in {}s.", syncPoint.syncId, syncPoint.route.toRanges(), String.format("%.3f", retryDelay/1000_000.0)); node.scheduler().selfRecurring(() -> submit(syncPoint, request, attempt), retryDelay, MICROSECONDS); } diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java index 70d0e10c..688b2f91 100644 --- a/accord-core/src/main/java/accord/messages/CheckStatus.java +++ b/accord-core/src/main/java/accord/messages/CheckStatus.java @@ -321,7 +321,7 @@ public class CheckStatus extends AbstractRequest<CheckStatus.CheckStatusReply> return null; } - public CheckStatusOk finish(Unseekables<?> queried, Unseekables<?> propagatingTo, Unseekables<?> routeOrParticipants, WithQuorum withQuorum, InvalidIf previouslyKnownToBeInvalidIf) + public CheckStatusOk finish(Unseekables<?> queried, Unseekables<?> requestedFor, Unseekables<?> routeOrParticipants, WithQuorum withQuorum, InvalidIf previouslyKnownToBeInvalidIf) { CheckStatusOk finished = this; if (withQuorum == HasQuorum) @@ -343,7 +343,7 @@ public class CheckStatus extends AbstractRequest<CheckStatus.CheckStatusReply> } Known validForAll = map.computeValidForAll(routeOrParticipants); - if (withQuorum == HasQuorum && (invalidIf == IfUncommitted || previouslyKnownToBeInvalidIf == IfUncommitted) && queried.containsAll(propagatingTo)) + if (withQuorum == HasQuorum && (invalidIf == IfUncommitted || previouslyKnownToBeInvalidIf == IfUncommitted) && queried.containsAll(requestedFor)) { Known minKnown = finished.minMaxKnown(queried), maxKnown = finished.maxKnown(queried); InvalidIf invalidIf = this.invalidIf.inferWithQuorum(minKnown, maxKnown); @@ -534,9 +534,9 @@ public class CheckStatus extends AbstractRequest<CheckStatus.CheckStatusReply> this.result = result; } - public CheckStatusOkFull finish(Unseekables<?> queried, Unseekables<?> propagatingTo, Unseekables<?> routeOrParticipants, WithQuorum withQuorum, InvalidIf previouslyKnownToBeInvalidIf) + public CheckStatusOkFull finish(Unseekables<?> queried, Unseekables<?> requestedFor, Unseekables<?> routeOrParticipants, WithQuorum withQuorum, InvalidIf previouslyKnownToBeInvalidIf) { - return (CheckStatusOkFull) super.finish(queried, propagatingTo, routeOrParticipants, withQuorum, previouslyKnownToBeInvalidIf); + return (CheckStatusOkFull) super.finish(queried, requestedFor, routeOrParticipants, withQuorum, previouslyKnownToBeInvalidIf); } public CheckStatusOkFull merge(@Nonnull Route<?> route) diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java index fd77d790..3b02d3ae 100644 --- a/accord-core/src/main/java/accord/messages/Propagate.java +++ b/accord-core/src/main/java/accord/messages/Propagate.java @@ -24,6 +24,8 @@ import accord.coordinate.FetchData.FetchResult; import accord.coordinate.Infer.InvalidIf; import accord.local.Cleanup; import accord.local.Command; +import accord.local.CommandStores.LatentStoreSelector; +import accord.local.CommandStores.StoreSelector; import accord.local.Commands; import accord.local.Node; import accord.local.PreLoadContext; @@ -80,7 +82,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt final Node node; final TxnId txnId; final Route<?> route; - final Unseekables<?> propagateTo; + final Unseekables<?> requestedFor; final Known target; final InvalidIf invalidIf; @@ -96,7 +98,6 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt final WithQuorum withQuorum; @Nullable final PartialTxn partialTxn; @Nullable final PartialDeps stableDeps; - final long lowEpoch, highEpoch; @Nullable final Timestamp committedExecuteAt; @Nullable final Writes writes; @Nullable final Result result; @@ -108,7 +109,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt Propagate( Node node, TxnId txnId, Route<?> route, - Unseekables<?> propagateTo, Known target, InvalidIf invalidIf, + Unseekables<?> requestedFor, Known target, InvalidIf invalidIf, SaveStatus maxKnowledgeSaveStatus, SaveStatus maxSaveStatus, Ballot promised, Ballot acceptedOrCommitted, @@ -117,8 +118,6 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt KnownMap known, WithQuorum withQuorum, @Nullable PartialTxn partialTxn, @Nullable PartialDeps stableDeps, - long lowEpoch, - long highEpoch, @Nullable Timestamp committedExecuteAt, @Nullable Writes writes, @Nullable Result result, @@ -127,7 +126,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt this.node = node; this.txnId = txnId; this.route = route; - this.propagateTo = propagateTo; + this.requestedFor = requestedFor; this.target = target; this.invalidIf = invalidIf; this.maxKnowledgeSaveStatus = maxKnowledgeSaveStatus; @@ -140,19 +139,17 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt this.withQuorum = withQuorum; this.partialTxn = partialTxn; this.stableDeps = stableDeps; - this.lowEpoch = lowEpoch; - this.highEpoch = highEpoch; this.committedExecuteAt = committedExecuteAt; this.writes = writes; this.result = result; this.callback = callback; } - public static void propagate(Node node, TxnId txnId, InvalidIf previouslyKnownToBeInvalidIf, long sourceEpoch, long lowEpoch, long highEpoch, WithQuorum withQuorum, Route<?> queried, Unseekables<?> propagateTo, @Nullable Known target, CheckStatusOkFull full, BiConsumer<? super FetchResult, Throwable> callback) + public static void propagate(Node node, TxnId txnId, InvalidIf previouslyKnownToBeInvalidIf, long sourceEpoch, WithQuorum withQuorum, Route<?> queried, Participants<?> requestedFor, LatentStoreSelector reportTo, @Nullable Known target, CheckStatusOkFull full, BiConsumer<? super FetchResult, Throwable> callback) { if (full.maxKnowledgeSaveStatus.status == NotDefined && full.invalidIf == NotKnownToBeInvalid) { - callback.accept(new FetchResult(Nothing, propagateTo.slice(0, 0), propagateTo), null); + callback.accept(new FetchResult(Nothing, requestedFor.slice(0, 0), requestedFor), null); return; } @@ -160,18 +157,19 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt // TODO (required): consider and document whether it is safe to infer that we are stale if we have not received responses from all shards we know of // (in principle, we should at least require responses from our own shard, and the home shard if we know it); if we only hear from a remote shard it may have fully Erased - full = full.finish(queried, propagateTo, queried.with((Unseekables) propagateTo), withQuorum, previouslyKnownToBeInvalidIf); + full = full.finish(queried, requestedFor, queried.with((Unseekables) requestedFor), withQuorum, previouslyKnownToBeInvalidIf); Route<?> route = Invariants.nonNull(full.route); + Timestamp committedExecuteAt = full.executeAtIfKnown(); Propagate propagate = - new Propagate(node, txnId, route, propagateTo, target, full.invalidIf, full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.maxPromised, full.acceptedOrCommitted, full.durability, full.homeKey, full.map, withQuorum, full.partialTxn, full.stableDeps, lowEpoch, highEpoch, full.executeAtIfKnown(), full.writes, full.result, callback); + new Propagate(node, txnId, route, requestedFor, target, full.invalidIf, full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.maxPromised, full.acceptedOrCommitted, full.durability, full.homeKey, full.map, withQuorum, full.partialTxn, full.stableDeps, committedExecuteAt, full.writes, full.result, callback); - if (full.executeAt != null && full.executeAt.epoch() > highEpoch) - highEpoch = full.executeAt.epoch(); - long untilEpoch = full.executeAt == null ? highEpoch : Math.max(highEpoch, full.executeAt.epoch()); + long untilEpoch = txnId.epoch(); + if (committedExecuteAt != null) + untilEpoch = Math.max(untilEpoch, committedExecuteAt.epoch()); - Route<?> finalRoute = queried; - node.withEpochAtLeast(highEpoch, propagate, () -> node.mapReduceConsumeLocal(propagate, finalRoute, lowEpoch, untilEpoch, propagate)); + StoreSelector selector = reportTo.refine(txnId, committedExecuteAt, route); + node.withEpochAtLeast(untilEpoch, propagate, () -> node.mapReduceConsumeLocal(propagate, selector, propagate)); } @Override @@ -184,8 +182,8 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt public Void apply(SafeCommandStore safeStore) { long executeAtEpoch = committedExecuteAt == null ? txnId.epoch() : committedExecuteAt.epoch(); - long lowEpoch = Math.min(this.lowEpoch, StoreParticipants.computePropagateLowEpoch(safeStore, txnId, route)); - StoreParticipants participants = StoreParticipants.update(safeStore, route, lowEpoch, txnId, executeAtEpoch, highEpoch, committedExecuteAt != null); + long lowEpoch = StoreParticipants.computePropagateLowEpoch(safeStore, txnId, route); + StoreParticipants participants = StoreParticipants.update(safeStore, route, lowEpoch, txnId, executeAtEpoch, executeAtEpoch, committedExecuteAt != null); // TODO (expected): can we come up with a better more universal pattern for avoiding updating a command we don't intersect with? // ideally integrated with safeStore.get() if (participants.owns().isEmpty() && safeStore.ifInitialised(txnId) == null) @@ -198,7 +196,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt if (participants.executes() == null && executeAtIfKnown != null) { executeAtEpoch = executeAtIfKnown.epoch(); - participants = StoreParticipants.update(safeStore, route, lowEpoch, txnId, executeAtEpoch, highEpoch, true); + participants = StoreParticipants.update(safeStore, route, lowEpoch, txnId, executeAtEpoch, executeAtEpoch, true); } switch (command.saveStatus().phase) @@ -347,9 +345,9 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt { FetchResult current = fetchResult; if (current == null) - return new FetchResult(Nothing, propagateTo.slice(0, 0), propagateTo); + return new FetchResult(Nothing, requestedFor.slice(0, 0), requestedFor); - Unseekables<?> missed = propagateTo.without(current.achievedTarget); + Unseekables<?> missed = requestedFor.without(current.achievedTarget); if (missed.isEmpty()) return current; diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java b/accord-core/src/main/java/accord/utils/async/AsyncResult.java index fae3bc9c..de4ccd6a 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java @@ -27,6 +27,8 @@ import static accord.utils.Invariants.illegalState; /** * Handle for async computations that supports multiple listeners and registering * listeners after the computation has started + * + * TODO (expected): by default AsyncResult methods should be started immediately; should introduce newChain() for building a chain. */ public interface AsyncResult<V> extends AsyncChain<V> { diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java index ce6f18e1..8ad08799 100644 --- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java +++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java @@ -51,7 +51,7 @@ public class BurnTestConfigurationService extends AbstractConfigurationService.M public BurnTestConfigurationService(Node.Id node, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id, Node> lookup, TopologyUpdates topologyUpdates) { - super(node); + super(node, executor.agent()); this.executor = executor; this.randomSupplier = randomSupplier; this.lookup = lookup; @@ -176,13 +176,9 @@ public class BurnTestConfigurationService extends AbstractConfigurationService.M { synchronized (this) { - if (epoch <= maxRequestedEpoch) - return; - - maxRequestedEpoch = epoch; + while (maxRequestedEpoch < epoch) + pendingEpochs.computeIfAbsent(++maxRequestedEpoch, FetchTopology::new); } - - pendingEpochs.computeIfAbsent(epoch, FetchTopology::new); } @Override diff --git a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java index a26c2181..37fdc0cf 100644 --- a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java +++ b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import accord.api.Agent; import accord.api.ConfigurationService.EpochReady; import accord.primitives.Ranges; import accord.utils.SortedArrays.SortedArrayList; @@ -130,9 +131,9 @@ public class AbstractConfigurationServiceTest final Set<Long> syncStarted = new HashSet<>(); final Set<Long> epochsFetched = new HashSet<>(); - public TestableConfigurationService(Id node) + public TestableConfigurationService(Id node, Agent agent) { - super(node); + super(node, agent); } @Override @@ -206,7 +207,7 @@ public class AbstractConfigurationServiceTest @Test public void getTopologyTest() { - TestableConfigurationService service = new TestableConfigurationService(ID1); + TestableConfigurationService service = new TestableConfigurationService(ID1, new TestAgent()); TestListener listener = new TestListener(service, false); service.registerListener(listener); service.reportTopology(TOPOLOGY1); @@ -229,7 +230,7 @@ public class AbstractConfigurationServiceTest @Test public void loadAfterTruncate() { - TestableConfigurationService service = new TestableConfigurationService(ID1); + TestableConfigurationService service = new TestableConfigurationService(ID1, new TestAgent()); TestListener listener = new TestListener(service, false); service.registerListener(listener); service.reportTopology(TOPOLOGY3); @@ -248,7 +249,7 @@ public class AbstractConfigurationServiceTest @Test public void awaitOutOfOrderTopologies() { - TestableConfigurationService service = new TestableConfigurationService(ID1); + TestableConfigurationService service = new TestableConfigurationService(ID1, new TestAgent()); TestListener listener = new TestListener(service, false); service.registerListener(listener); diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 9fa044f8..51f3c1b0 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -103,7 +103,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread Snapshot current = current(); RangesForEpoch ranges = e.getValue(); CommandStore commandStore = null; - for (ShardHolder shard : current.shards) + for (ShardHolder shard : current) { if (shard.ranges().equals(ranges)) commandStore = shard.store; @@ -122,10 +122,10 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread { Snapshot current = current(); // These checks are only applicable to delayed command stores. - for (Integer id : current.byId.keySet()) + for (ShardHolder shard : current) { - CommandStore prev = current.byId.get(id); - CommandStore next = nextSnapshot.byId.get(id); + CommandStore prev = current.byId(shard.store.id()); + CommandStore next = nextSnapshot.byId(shard.store.id()); { RedundantBefore orig = prev.unsafeGetRedundantBefore(); RedundantBefore loaded = next.unsafeGetRedundantBefore(); @@ -468,7 +468,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread public List<DelayedCommandStore> unsafeStores() { List<DelayedCommandStore> stores = new ArrayList<>(); - for (ShardHolder holder : current().shards) + for (ShardHolder holder : current()) stores.add((DelayedCommandStore) holder.store); return stores; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
