This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk-tmp
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 22e47364cd7c4bc81724ce6d90352c34ecea12c9
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Sep 25 10:21:46 2025 +0100

    filter and record faulty in AbstractCoordination
---
 .../coordinate/AbstractCoordinatePreAccept.java    |  7 ------
 .../accord/coordinate/AbstractCoordination.java    | 27 ++++++++++++++++++----
 .../java/accord/coordinate/CollectLatestDeps.java  | 19 ++++-----------
 .../main/java/accord/coordinate/Coordination.java  | 11 +++------
 .../java/accord/coordinate/ExecuteSyncPoint.java   |  9 ++------
 .../main/java/accord/coordinate/Invalidate.java    | 12 ++--------
 .../src/main/java/accord/coordinate/Persist.java   | 12 ++--------
 .../src/main/java/accord/coordinate/Propose.java   | 12 ++--------
 .../src/main/java/accord/coordinate/Stabilise.java | 15 ++++--------
 .../accord/impl/list/ListFetchCoordinator.java     | 10 ++++++++
 10 files changed, 53 insertions(+), 81 deletions(-)

diff --git 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
index dcb60592..6b7fe396 100644
--- 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+++ 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
@@ -28,7 +28,6 @@ import accord.messages.Callback;
 import accord.primitives.FullRoute;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
-import accord.utils.SortedList;
 
 import static accord.api.ProtocolModifiers.QuorumEpochIntersections;
 import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
@@ -75,10 +74,4 @@ abstract class AbstractCoordinatePreAccept<Result, Reply 
extends accord.messages
     {
         return CoordinationKind.PreAccept;
     }
-
-    @Override
-    public SortedList<Id> nodes()
-    {
-        return topologies.nodes();
-    }
 }
diff --git 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
index d5e2da68..1d92fc79 100644
--- a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
+++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 
 import accord.api.Tracing;
 import accord.coordinate.tracking.AbstractTracker;
+import accord.coordinate.tracking.RequestStatus;
 import accord.local.Node;
 import accord.local.SequentialAsyncExecutor;
 import accord.messages.Callback;
@@ -36,6 +37,7 @@ import accord.messages.Request;
 import accord.primitives.Participants;
 import accord.primitives.Route;
 import accord.primitives.TxnId;
+import accord.topology.Topologies;
 import accord.utils.DebugMap;
 import accord.utils.Invariants;
 import accord.utils.SimpleBitSet;
@@ -73,6 +75,8 @@ public abstract class AbstractCoordination<P extends 
Participants<?>, Result, Re
     abstract void onSuccessInternal(Node.Id from, int fromIndex, Reply reply);
     abstract void onFailureInternal(Node.Id from, int fromIndex, Throwable 
fail);
     void onSlowResponseInternal(Node.Id from) {}
+    public abstract @Nonnull AbstractTracker<?> tracker();
+    public SortedList<Node.Id> nodes() { return nodes; }
 
     void recordOk(int fromIndex, Ok ok)
     {
@@ -171,15 +175,28 @@ public abstract class AbstractCoordination<P extends 
Participants<?>, Result, Re
     void contact(Function<Node.Id, Request> request, @Nullable 
Predicate<Node.Id> include)
     {
         executor.executeMaybeImmediately(() -> {
+            AbstractTracker<?> tracker = tracker();
+            Topologies topologies = tracker.topologies();
             for (int i = 0; i < nodes.size() ; ++i)
             {
                 Node.Id to = nodes.get(i);
                 if (include == null || include.test(to))
                 {
-                    Invariants.require(replyState[i] == null);
-                    expectingReply.set(i);
-                    replyState[i] = node.send(to, request.apply(to), executor, 
this);
-                    Invariants.require(expectingReply.get(i) || replyState[i] 
== null);
+                    if (topologies.isFaulty(to))
+                    {
+                        if (RequestStatus.Failed == 
tracker.prerecordFailure(to))
+                        {
+                            finishOnExaustion();
+                            return;
+                        }
+                    }
+                    else
+                    {
+                        Invariants.require(replyState[i] == null);
+                        expectingReply.set(i);
+                        replyState[i] = node.send(to, request.apply(to), 
executor, this);
+                        Invariants.require(expectingReply.get(i) || 
replyState[i] == null);
+                    }
                 }
             }
         });
@@ -340,7 +357,7 @@ public abstract class AbstractCoordination<P extends 
Participants<?>, Result, Re
         return kind().name() + ':' + txnId
                + " scope:" + scope()
                + " inflight:" + inflight()
-               + (tracker == null ? "" : " tracker:" + 
tracker.summariseTracker())
+               + " tracker:" + tracker.summariseTracker()
                + (describe.isEmpty() ? "" : ' ' + describe)
                + (replies == null ? "" : " replies:" + 
summariseReplies(replies, 60));
     }
diff --git a/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java 
b/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java
index c967c162..8ca9d936 100644
--- a/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java
+++ b/accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java
@@ -24,6 +24,7 @@ import java.util.function.BiConsumer;
 
 import javax.annotation.Nullable;
 
+import accord.coordinate.tracking.AbstractTracker;
 import accord.coordinate.tracking.QuorumTracker;
 import accord.local.Node;
 import accord.local.Node.Id;
@@ -39,8 +40,6 @@ import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
 import accord.topology.Topologies;
 import accord.utils.Invariants;
-import accord.utils.SortedArrays.SortedArrayList;
-import accord.utils.SortedList;
 import accord.utils.SortedListMap;
 
 import static accord.coordinate.tracking.RequestStatus.Failed;
@@ -73,16 +72,8 @@ public class CollectLatestDeps extends 
AbstractCoordination<Route<?>, List<Lates
     @Override
     void start()
     {
-        SortedArrayList<Id> contact = tracker.filterAndRecordFaulty();
-        if (contact == null)
-        {
-            finishOnExaustion();
-        }
-        else
-        {
-            super.start();
-            contact(to -> new GetLatestDeps(to, tracker.topologies(), scope, 
txnId, ballot, executeAt));
-        }
+        super.start();
+        contact(to -> new GetLatestDeps(to, tracker.topologies(), scope, 
txnId, ballot, executeAt));
     }
 
     @Override
@@ -125,8 +116,8 @@ public class CollectLatestDeps extends 
AbstractCoordination<Route<?>, List<Lates
     }
 
     @Override
-    public SortedList<Id> nodes()
+    public AbstractTracker<?> tracker()
     {
-        return tracker.nodes();
+        return tracker;
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Coordination.java 
b/accord-core/src/main/java/accord/coordinate/Coordination.java
index 51c413fb..469c1346 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordination.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordination.java
@@ -18,6 +18,7 @@
 
 package accord.coordinate;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import accord.coordinate.tracking.AbstractTracker;
@@ -55,20 +56,14 @@ public interface Coordination
     }
 
     long coordinationId();
-
     TxnId txnId();
     CoordinationKind kind();
     Participants<?> scope();
+    @Nullable AbstractTracker<?> tracker();
+    @Nullable SortedList<Id> nodes();
 
     default @Nullable Ballot ballot() { return null; }
 
-    default SortedList<Id> nodes()
-    {
-        AbstractTracker<?> tracker = tracker();
-        return tracker == null ? null : tracker.nodes();
-    }
-    default @Nullable AbstractTracker<?> tracker() { return null; }
-
     default String describe() { return ""; }
 
     default @Nullable SortedList<Id> inflight() { return null; }
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index c4962126..805bb830 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -139,16 +139,11 @@ public class ExecuteSyncPoint extends 
AbstractCoordination<FullRoute<?>, Durabil
     void start()
     {
         node.agent().coordinatorEvents().onExecuting(syncPoint.syncId, null, 
syncPoint.waitFor, null);
-        SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
         // TODO (desired): special Apply message that doesn't resend deps if 
path=MEDIUM
         Txn txn = node.agent().emptySystemTxn(syncPoint.syncId.kind(), 
syncPoint.syncId.domain());
         Result result = txn.result(syncPoint.syncId, syncPoint.executeAt, 
null);
-        if (contact == null) finishOnExaustion();
-        else
-        {
-            super.start();
-            contact(to -> new ApplyThenWaitUntilApplied(to, 
tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), 
syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, 
null, result));
-        }
+        super.start();
+        contact(to -> new ApplyThenWaitUntilApplied(to, tracker.topologies(), 
syncPoint.executeAt, tracker.topologies().currentEpoch(), syncPoint.route, 
syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, null, result));
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java 
b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index 6e4f134e..c26ed340 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -94,16 +94,8 @@ public class Invalidate extends 
AbstractCoordination<Participants<?>, Outcome, I
     @Override
     void start()
     {
-        SortedArrays.SortedArrayList<Node.Id> contact = 
tracker.filterAndRecordFaulty();
-        if (contact == null)
-        {
-            finishOnExaustion();
-        }
-        else
-        {
-            super.start();
-            contact(to -> new BeginInvalidation(to, tracker.topologies(), 
txnId, scope, ballot));
-        }
+        super.start();
+        contact(to -> new BeginInvalidation(to, tracker.topologies(), txnId, 
scope, ballot));
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java 
b/accord-core/src/main/java/accord/coordinate/Persist.java
index 80edbc9d..cd57a087 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -132,16 +132,8 @@ public abstract class Persist extends 
AbstractCoordination<FullRoute<?>, Void, A
         node.agent().coordinatorEvents().onExecuted(txnId, ballot);
         // applyMinimal is used for transaction execution by the original 
coordinator so it's important to use
         // Node's Apply factory in case the factory has to do synchronous 
Apply.
-        SortedArrays.SortedArrayList<Node.Id> contact = 
tracker.filterAndRecordFaulty();
-        if (contact == null)
-        {
-            finishOnExaustion();
-        }
-        else
-        {
-            super.start();
-            contact(to -> factory.create(applyKind, to, tracker.topologies(), 
txnId, ballot, sendTo, txn, executeAt, stableDeps, writes, result, scope, 
flags.get(to)));
-        }
+        super.start();
+        contact(to -> factory.create(applyKind, to, tracker.topologies(), 
txnId, ballot, sendTo, txn, executeAt, stableDeps, writes, result, scope, 
flags.get(to)));
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java 
b/accord-core/src/main/java/accord/coordinate/Propose.java
index 1e0aec04..8d05206a 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -86,16 +86,8 @@ abstract class Propose<R> extends 
AbstractCoordination<FullRoute<?>, R, AcceptRe
     @Override
     void start()
     {
-        SortedArrays.SortedArrayList<Node.Id> contact = 
tracker.filterAndRecordFaulty();
-        if (contact == null)
-        {
-            finishOnExaustion();
-        }
-        else
-        {
-            super.start();
-            contact(to -> new Accept(to, tracker.topologies(), kind, ballot, 
txnId, scope, executeAt, deps, require != scope));
-        }
+        super.start();
+        contact(to -> new Accept(to, tracker.topologies(), kind, ballot, 
txnId, scope, executeAt, deps, require != scope));
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/coordinate/Stabilise.java 
b/accord-core/src/main/java/accord/coordinate/Stabilise.java
index cb3c52ab..925d44da 100644
--- a/accord-core/src/main/java/accord/coordinate/Stabilise.java
+++ b/accord-core/src/main/java/accord/coordinate/Stabilise.java
@@ -74,18 +74,13 @@ public abstract class Stabilise<R> extends 
AbstractCoordination<FullRoute<?>, R,
     @Override
     void start()
     {
-        SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
+        super.start();
+        contact(to -> new Commit(CommitSlowPath, to, allTopologies, txnId, 
txn, scope, ballot, executeAt, stabiliseDeps));
         if (allTopologies.size() > 1)
-            contact = 
contact.with(allTopologies.nodes().without(tracker.nodes()).without(allTopologies::isFaulty));
-
-        if (contact == null)
-        {
-            finishOnExaustion();
-        }
-        else
         {
-            super.start();
-            contact(to -> new Commit(CommitSlowPath, to, allTopologies, txnId, 
txn, scope, ballot, executeAt, stabiliseDeps));
+            SortedArrayList<Node.Id> extra = 
allTopologies.nodes().without(tracker.nodes()).without(allTopologies::isFaulty);
+            for (Node.Id to : extra)
+                node.send(to, new Commit(CommitSlowPath, to, allTopologies, 
txnId, txn, scope, ballot, executeAt, stabiliseDeps));
         }
     }
 
diff --git 
a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java 
b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
index e9c9731e..6f011c87 100644
--- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
@@ -20,8 +20,11 @@ package accord.impl.list;
 
 import java.util.function.Function;
 
+import javax.annotation.Nullable;
+
 import accord.api.Data;
 import accord.api.DataStore;
+import accord.coordinate.tracking.AbstractTracker;
 import accord.impl.AbstractFetchCoordinator;
 import accord.local.CommandStore;
 import accord.local.Node;
@@ -74,6 +77,13 @@ public class ListFetchCoordinator extends 
AbstractFetchCoordinator
         return new ListFetchRequest(sourceEpoch, syncId, ranges, partialDeps, 
partialTxn);
     }
 
+    @Nullable
+    @Override
+    public AbstractTracker<?> tracker()
+    {
+        return null;
+    }
+
     static class ListFetchRequest extends FetchRequest
     {
         public ListFetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, 
PartialDeps partialDeps, PartialTxn partialTxn)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to