This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk-tmp in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 22e47364cd7c4bc81724ce6d90352c34ecea12c9 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Thu Sep 25 10:21:46 2025 +0100 filter and record faulty in AbstractCoordination --- .../coordinate/AbstractCoordinatePreAccept.java | 7 ------ .../accord/coordinate/AbstractCoordination.java | 27 ++++++++++++++++++---- .../java/accord/coordinate/CollectLatestDeps.java | 19 ++++----------- .../main/java/accord/coordinate/Coordination.java | 11 +++------ .../java/accord/coordinate/ExecuteSyncPoint.java | 9 ++------ .../main/java/accord/coordinate/Invalidate.java | 12 ++-------- .../src/main/java/accord/coordinate/Persist.java | 12 ++-------- .../src/main/java/accord/coordinate/Propose.java | 12 ++-------- .../src/main/java/accord/coordinate/Stabilise.java | 15 ++++-------- .../accord/impl/list/ListFetchCoordinator.java | 10 ++++++++ 10 files changed, 53 insertions(+), 81 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java index dcb60592..6b7fe396 100644 --- a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java +++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java @@ -28,7 +28,6 @@ import accord.messages.Callback; import accord.primitives.FullRoute; import accord.primitives.TxnId; import accord.topology.Topologies; -import accord.utils.SortedList; import static accord.api.ProtocolModifiers.QuorumEpochIntersections; import static accord.topology.Topologies.SelectNodeOwnership.SHARE; @@ -75,10 +74,4 @@ abstract class AbstractCoordinatePreAccept<Result, Reply extends accord.messages { return CoordinationKind.PreAccept; } - - @Override - public SortedList<Id> nodes() - { - return topologies.nodes(); - } } diff --git a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java index d5e2da68..1d92fc79 100644 --- a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java +++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import accord.api.Tracing; import accord.coordinate.tracking.AbstractTracker; +import accord.coordinate.tracking.RequestStatus; import accord.local.Node; import accord.local.SequentialAsyncExecutor; import accord.messages.Callback; @@ -36,6 +37,7 @@ import accord.messages.Request; import accord.primitives.Participants; import accord.primitives.Route; import accord.primitives.TxnId; +import accord.topology.Topologies; import accord.utils.DebugMap; import accord.utils.Invariants; import accord.utils.SimpleBitSet; @@ -73,6 +75,8 @@ public abstract class AbstractCoordination<P extends Participants<?>, Result, Re abstract void onSuccessInternal(Node.Id from, int fromIndex, Reply reply); abstract void onFailureInternal(Node.Id from, int fromIndex, Throwable fail); void onSlowResponseInternal(Node.Id from) {} + public abstract @Nonnull AbstractTracker<?> tracker(); + public SortedList<Node.Id> nodes() { return nodes; } void recordOk(int fromIndex, Ok ok) { @@ -171,15 +175,28 @@ public abstract class AbstractCoordination<P extends Participants<?>, Result, Re void contact(Function<Node.Id, Request> request, @Nullable Predicate<Node.Id> include) { executor.executeMaybeImmediately(() -> { + AbstractTracker<?> tracker = tracker(); + Topologies topologies = tracker.topologies(); for (int i = 0; i < nodes.size() ; ++i) { Node.Id to = nodes.get(i); if (include == null || include.test(to)) { - Invariants.require(replyState[i] == null); - expectingReply.set(i); - replyState[i] = node.send(to, request.apply(to), executor, this); - Invariants.require(expectingReply.get(i) || replyState[i] == null); + if (topologies.isFaulty(to)) + { + if (RequestStatus.Failed == tracker.prerecordFailure(to)) + { + finishOnExaustion(); + return; + } + } + else + { + Invariants.require(replyState[i] == null); + expectingReply.set(i); + replyState[i] = node.send(to, request.apply(to), executor, this); + Invariants.require(expectingReply.get(i) || replyState[i] == null); + } } } }); @@ -340,7 +357,7 @@ public abstract class AbstractCoordination<P extends Participants<?>, Result, Re return kind().name() + ':' + txnId + " scope:" + scope() + " inflight:" + inflight() - + (tracker == null ? "" : " tracker:" + tracker.summariseTracker()) + + " tracker:" + tracker.summariseTracker() + (describe.isEmpty() ? "" : ' ' + describe) + (replies == null ? "" : " replies:" + summariseReplies(replies, 60)); } diff --git a/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java b/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java index c967c162..8ca9d936 100644 --- a/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java +++ b/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java @@ -24,6 +24,7 @@ import java.util.function.BiConsumer; import javax.annotation.Nullable; +import accord.coordinate.tracking.AbstractTracker; import accord.coordinate.tracking.QuorumTracker; import accord.local.Node; import accord.local.Node.Id; @@ -39,8 +40,6 @@ import accord.primitives.TxnId; import accord.primitives.Unseekables; import accord.topology.Topologies; import accord.utils.Invariants; -import accord.utils.SortedArrays.SortedArrayList; -import accord.utils.SortedList; import accord.utils.SortedListMap; import static accord.coordinate.tracking.RequestStatus.Failed; @@ -73,16 +72,8 @@ public class CollectLatestDeps extends AbstractCoordination<Route<?>, List<Lates @Override void start() { - SortedArrayList<Id> contact = tracker.filterAndRecordFaulty(); - if (contact == null) - { - finishOnExaustion(); - } - else - { - super.start(); - contact(to -> new GetLatestDeps(to, tracker.topologies(), scope, txnId, ballot, executeAt)); - } + super.start(); + contact(to -> new GetLatestDeps(to, tracker.topologies(), scope, txnId, ballot, executeAt)); } @Override @@ -125,8 +116,8 @@ public class CollectLatestDeps extends AbstractCoordination<Route<?>, List<Lates } @Override - public SortedList<Id> nodes() + public AbstractTracker<?> tracker() { - return tracker.nodes(); + return tracker; } } diff --git a/accord-core/src/main/java/accord/coordinate/Coordination.java b/accord-core/src/main/java/accord/coordinate/Coordination.java index 51c413fb..469c1346 100644 --- a/accord-core/src/main/java/accord/coordinate/Coordination.java +++ b/accord-core/src/main/java/accord/coordinate/Coordination.java @@ -18,6 +18,7 @@ package accord.coordinate; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import accord.coordinate.tracking.AbstractTracker; @@ -55,20 +56,14 @@ public interface Coordination } long coordinationId(); - TxnId txnId(); CoordinationKind kind(); Participants<?> scope(); + @Nullable AbstractTracker<?> tracker(); + @Nullable SortedList<Id> nodes(); default @Nullable Ballot ballot() { return null; } - default SortedList<Id> nodes() - { - AbstractTracker<?> tracker = tracker(); - return tracker == null ? null : tracker.nodes(); - } - default @Nullable AbstractTracker<?> tracker() { return null; } - default String describe() { return ""; } default @Nullable SortedList<Id> inflight() { return null; } diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java index c4962126..805bb830 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java @@ -139,16 +139,11 @@ public class ExecuteSyncPoint extends AbstractCoordination<FullRoute<?>, Durabil void start() { node.agent().coordinatorEvents().onExecuting(syncPoint.syncId, null, syncPoint.waitFor, null); - SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); // TODO (desired): special Apply message that doesn't resend deps if path=MEDIUM Txn txn = node.agent().emptySystemTxn(syncPoint.syncId.kind(), syncPoint.syncId.domain()); Result result = txn.result(syncPoint.syncId, syncPoint.executeAt, null); - if (contact == null) finishOnExaustion(); - else - { - super.start(); - contact(to -> new ApplyThenWaitUntilApplied(to, tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, null, result)); - } + super.start(); + contact(to -> new ApplyThenWaitUntilApplied(to, tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, null, result)); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java index 6e4f134e..c26ed340 100644 --- a/accord-core/src/main/java/accord/coordinate/Invalidate.java +++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java @@ -94,16 +94,8 @@ public class Invalidate extends AbstractCoordination<Participants<?>, Outcome, I @Override void start() { - SortedArrays.SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); - if (contact == null) - { - finishOnExaustion(); - } - else - { - super.start(); - contact(to -> new BeginInvalidation(to, tracker.topologies(), txnId, scope, ballot)); - } + super.start(); + contact(to -> new BeginInvalidation(to, tracker.topologies(), txnId, scope, ballot)); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java b/accord-core/src/main/java/accord/coordinate/Persist.java index 80edbc9d..cd57a087 100644 --- a/accord-core/src/main/java/accord/coordinate/Persist.java +++ b/accord-core/src/main/java/accord/coordinate/Persist.java @@ -132,16 +132,8 @@ public abstract class Persist extends AbstractCoordination<FullRoute<?>, Void, A node.agent().coordinatorEvents().onExecuted(txnId, ballot); // applyMinimal is used for transaction execution by the original coordinator so it's important to use // Node's Apply factory in case the factory has to do synchronous Apply. - SortedArrays.SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); - if (contact == null) - { - finishOnExaustion(); - } - else - { - super.start(); - contact(to -> factory.create(applyKind, to, tracker.topologies(), txnId, ballot, sendTo, txn, executeAt, stableDeps, writes, result, scope, flags.get(to))); - } + super.start(); + contact(to -> factory.create(applyKind, to, tracker.topologies(), txnId, ballot, sendTo, txn, executeAt, stableDeps, writes, result, scope, flags.get(to))); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java index 1e0aec04..8d05206a 100644 --- a/accord-core/src/main/java/accord/coordinate/Propose.java +++ b/accord-core/src/main/java/accord/coordinate/Propose.java @@ -86,16 +86,8 @@ abstract class Propose<R> extends AbstractCoordination<FullRoute<?>, R, AcceptRe @Override void start() { - SortedArrays.SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); - if (contact == null) - { - finishOnExaustion(); - } - else - { - super.start(); - contact(to -> new Accept(to, tracker.topologies(), kind, ballot, txnId, scope, executeAt, deps, require != scope)); - } + super.start(); + contact(to -> new Accept(to, tracker.topologies(), kind, ballot, txnId, scope, executeAt, deps, require != scope)); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/Stabilise.java b/accord-core/src/main/java/accord/coordinate/Stabilise.java index cb3c52ab..925d44da 100644 --- a/accord-core/src/main/java/accord/coordinate/Stabilise.java +++ b/accord-core/src/main/java/accord/coordinate/Stabilise.java @@ -74,18 +74,13 @@ public abstract class Stabilise<R> extends AbstractCoordination<FullRoute<?>, R, @Override void start() { - SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); + super.start(); + contact(to -> new Commit(CommitSlowPath, to, allTopologies, txnId, txn, scope, ballot, executeAt, stabiliseDeps)); if (allTopologies.size() > 1) - contact = contact.with(allTopologies.nodes().without(tracker.nodes()).without(allTopologies::isFaulty)); - - if (contact == null) - { - finishOnExaustion(); - } - else { - super.start(); - contact(to -> new Commit(CommitSlowPath, to, allTopologies, txnId, txn, scope, ballot, executeAt, stabiliseDeps)); + SortedArrayList<Node.Id> extra = allTopologies.nodes().without(tracker.nodes()).without(allTopologies::isFaulty); + for (Node.Id to : extra) + node.send(to, new Commit(CommitSlowPath, to, allTopologies, txnId, txn, scope, ballot, executeAt, stabiliseDeps)); } } diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java index e9c9731e..6f011c87 100644 --- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java +++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java @@ -20,8 +20,11 @@ package accord.impl.list; import java.util.function.Function; +import javax.annotation.Nullable; + import accord.api.Data; import accord.api.DataStore; +import accord.coordinate.tracking.AbstractTracker; import accord.impl.AbstractFetchCoordinator; import accord.local.CommandStore; import accord.local.Node; @@ -74,6 +77,13 @@ public class ListFetchCoordinator extends AbstractFetchCoordinator return new ListFetchRequest(sourceEpoch, syncId, ranges, partialDeps, partialTxn); } + @Nullable + @Override + public AbstractTracker<?> tracker() + { + return null; + } + static class ListFetchRequest extends FetchRequest { public ListFetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
