This is an automated email from the ASF dual-hosted git repository.

belliottsmith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8c70e09 CASSANDRA-21361 follow-up: recontact forbids self-addressed 
messages and we ensure self-addressed messages send full information to avoid 
having multiple LocalDelivery contexts
e8c70e09 is described below

commit e8c70e097f305a5097084079f136d8be7c085b29
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed May 13 21:02:46 2026 +0100

    CASSANDRA-21361 follow-up: recontact forbids self-addressed messages and we 
ensure self-addressed messages send full information to avoid having multiple 
LocalDelivery contexts
---
 .../main/java/accord/api/ProtocolModifiers.java    |  8 ++---
 .../accord/coordinate/AbstractCoordination.java    |  4 +--
 .../main/java/accord/coordinate/ExecuteTxn.java    | 34 +++++++++++++---------
 .../src/main/java/accord/coordinate/Stabilise.java |  8 ++---
 4 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ProtocolModifiers.java 
b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
index 1eeb4249..e0c6fb5f 100644
--- a/accord-core/src/main/java/accord/api/ProtocolModifiers.java
+++ b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
@@ -144,8 +144,8 @@ public class ProtocolModifiers
         private static SendStableMessages sendStableMessages = 
FOR_READS_OR_NONE_IF_FASTEXEC;
         public static synchronized void 
setSendStableMessages(SendStableMessages newSendStableMessages) { pre(); 
sendStableMessages = newSendStableMessages; }
 
-        private static boolean sendMinimalCommits = true;
-        public static synchronized void setSendMinimalCommits(boolean 
newSendMinimalCommits) { pre(); sendMinimalCommits = newSendMinimalCommits; }
+        private static boolean sendMinimal = true;
+        public static synchronized void setSendMinimal(boolean newSendMinimal) 
{ pre(); sendMinimal = newSendMinimal; }
 
         private static boolean permitCoordinatorLocalExecution = true;
         public static synchronized void 
setPermitCoordinatorLocalExecution(boolean newPermitCoordinatorLocalExecution) 
{ pre(); permitCoordinatorLocalExecution = newPermitCoordinatorLocalExecution; }
@@ -261,8 +261,8 @@ public class ProtocolModifiers
     public static boolean sendOnlyReadStableMessages(TxnId txnId) { return 
sendStableMessages.compareTo(FOR_READS) >= 0 || (sendStableMessages == 
TO_ALL_REPLICA_EXECUTABLE_ELSE_FOR_READS && (!txnId.is(SingleKey) || 
!txnId.is(Txn.Kind.Write))); }
     public static boolean sendNoStableIfFastExec() { return sendStableMessages 
== FOR_READS_OR_NONE_IF_FASTEXEC; }
 
-    private static final boolean sendMinimalCommits = 
Configure.sendMinimalCommits;
-    public static boolean sendMinimal() { return sendMinimalCommits; }
+    private static final boolean sendMinimal = Configure.sendMinimal;
+    public static boolean sendMinimal() { return sendMinimal; }
 
     private static final boolean dataStoreRequiresUniqueHlcs = 
Configure.dataStoreRequiresUniqueHlcs;
     public static boolean dataStoreRequiresUniqueHlcs() { return 
dataStoreRequiresUniqueHlcs; }
diff --git 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
index 3effa95f..55f327bb 100644
--- a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
+++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
@@ -236,8 +236,8 @@ public abstract class AbstractCoordination<P extends 
Participants<?>, Result, Re
     void recontact(Node.Id to, Request send)
     {
         node.maybeTraceRemote(to, send, tracing);
-        if (permitLocalDelivery() && to.equals(node.id())) new 
LocalDelivery<>(node, this).deliver(send);
-        else node.send(to, send, tracing);
+        if (permitLocalDelivery()) Invariants.expect(!to.equals(node.id())); 
// we should never have to recontact ourselves, and callbacks/timeouts will not 
work correctly if we do
+        node.send(to, send, tracing);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
index e2429df0..9f813631 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
@@ -86,7 +86,9 @@ import static 
accord.coordinate.CoordinationAdapter.Factory.Kind.Standard;
 import static accord.coordinate.ExecuteFlag.READY_TO_EXECUTE;
 import static accord.coordinate.ExecutePath.EPHEMERAL;
 import static accord.coordinate.ExecutePath.FAST;
+import static accord.coordinate.ExecutePath.MEDIUM;
 import static accord.coordinate.ExecutePath.RECOVER;
+import static accord.coordinate.ExecutePath.SLOW;
 import static accord.coordinate.ReadCoordinator.Action.Approve;
 import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
 import static accord.local.CommandSummaries.SummaryStatus.STABLE;
@@ -277,15 +279,18 @@ public class ExecuteTxn extends ReadCoordinator<Result, 
ReadReply>
     {
         // TODO (desired): migrate to SortedListSet; or introduce a 
specialised version for integer keys; or introduce a hash equivalent
         Topologies all = allTopologies;
-        Commit.Kind kind = commitKind();
         for (int i = 0, size = sendReadTo.size() ; i < size ; ++i)
         {
             Node.Id to = sendReadTo.get(i);
             ExecuteFlags flags = this.flags.get(to);
-            Invariants.require(kind.compareTo(StableFastPath) >= 0);
             boolean sendUnstable = flags.contains(READY_TO_EXECUTE) && 
sendNoStableIfFastExec() && path != RECOVER;
             if (sendUnstable) sendUnstableRead(to, flags);
-            else sendStableRead(to, kind);
+            else
+            {
+                Commit.Kind kind = commitKind(to);
+                Invariants.require(kind.compareTo(StableFastPath) >= 0);
+                sendStableRead(to, kind);
+            }
         }
 
         boolean sendOnlyReadStableMessages = flags.isAnyReadyToExecute() && 
sendOnlyReadStableMessages(txnId);
@@ -303,13 +308,13 @@ public class ExecuteTxn extends ReadCoordinator<Result, 
ReadReply>
             if (sendOnlyReadStableMessages && all.current().contains(to))
                 continue;
 
-            sendStableOnly(to, kind);
+            sendStableOnly(to);
         }
     }
 
-    private void sendStableOnly(Node.Id to, Commit.Kind kind)
+    private void sendStableOnly(Node.Id to)
     {
-        Commit send = new Commit(kind, to, allTopologies, txnId, txn, route, 
ballot, executeAt, stableDeps);
+        Commit send = new Commit(commitKind(to), to, allTopologies, txnId, 
txn, route, ballot, executeAt, stableDeps);
         boolean addCallback = allTopologies.size() == 1 || 
stable.nodes().contains(to);
         if (addCallback) node.send(to, send, executor, stable, tracing);
         else node.send(to, send, tracing);
@@ -346,9 +351,10 @@ public class ExecuteTxn extends ReadCoordinator<Result, 
ReadReply>
         node.send(to, send, executor, stable, tracing);
     }
 
-    private Commit.Kind commitKind()
+    private Commit.Kind commitKind(Node.Id to)
     {
-        if (!sendMinimal())
+        // we MUST send StableFastPath to self to ensure we use the privileged 
coordinator fast commit machinery that rejects if we have witnessed a future 
ballot
+        if (!sendMinimal() && (path != FAST || !to.equals(node.id())))
             return StableWithTxnAndDeps;
 
         switch (path)
@@ -369,7 +375,7 @@ public class ExecuteTxn extends ReadCoordinator<Result, 
ReadReply>
         ExecuteFlags flags = this.flags.get(to);
         boolean sendUnstable = !sendOnlyReadStableMessages(txnId) || path == 
RECOVER || flags.contains(READY_TO_EXECUTE);
         if (sendUnstable) sendUnstableRead(to, flags);
-        else sendStableRead(to, commitKind());
+        else sendStableRead(to, commitKind(to));
     }
 
     @Override
@@ -467,16 +473,18 @@ public class ExecuteTxn extends ReadCoordinator<Result, 
ReadReply>
                     if (sendOnlyReadStableMessages(txnId))
                     {
                         // send additional stable messages to record the 
transaction outcome
-                        Commit.Kind kind = commitKind();
                         if (!candidates.isEmpty())
                         {
                             for (int i = 0, size = candidates.size() ; i < 
size ; ++i)
-                                sendStableOnly(candidates.get(i), kind);
+                            {
+                                Node.Id to = candidates.get(i);
+                                sendStableOnly(to);
+                            }
                         }
                         if (unstableFastReads != null)
                         {
                             for (Node.Id to : unstableFastReads)
-                                sendStableOnly(to, kind);
+                                sendStableOnly(to);
                         }
                     }
                 }
@@ -591,7 +599,7 @@ public class ExecuteTxn extends ReadCoordinator<Result, 
ReadReply>
         @Override
         protected CommitOrReadNack applyInternal(SafeCommandStore safeStore, 
SafeCommand safeCommand, StoreParticipants participants)
         {
-            if (CommitOutcome.Rejected == Commands.commit(safeStore, 
safeCommand, participants, Stable, Ballot.ZERO, txnId, route, txn, executeAt, 
stableDeps, commitKind()))
+            if (CommitOutcome.Rejected == Commands.commit(safeStore, 
safeCommand, participants, Stable, Ballot.ZERO, txnId, route, txn, executeAt, 
stableDeps, commitKind(node.id())))
                 return CommitOrReadNack.Rejected;
 
             if (coordinatorBacklogExecution(ballot) && txnId.is(SingleKey) && 
txnId.is(Key))
diff --git a/accord-core/src/main/java/accord/coordinate/Stabilise.java 
b/accord-core/src/main/java/accord/coordinate/Stabilise.java
index f78c6b79..bd8c7cd4 100644
--- a/accord-core/src/main/java/accord/coordinate/Stabilise.java
+++ b/accord-core/src/main/java/accord/coordinate/Stabilise.java
@@ -77,12 +77,12 @@ public abstract class Stabilise<R> extends 
AbstractCoordination<FullRoute<?>, R,
     void start()
     {
         super.start();
-        contact(to -> new Commit(commitKind(), to, allTopologies, txnId, txn, 
scope, ballot, executeAt, stabiliseDeps));
+        contact(to -> new Commit(commitKind(to), to, allTopologies, txnId, 
txn, scope, ballot, executeAt, stabiliseDeps));
         if (allTopologies.size() > 1)
         {
             SortedArrayList<Node.Id> extra = 
allTopologies.nodes().without(tracker.nodes()).without(allTopologies::isFaulty);
             for (Node.Id to : extra)
-                node.send(to, new Commit(commitKind(), to, allTopologies, 
txnId, txn, scope, ballot, executeAt, stabiliseDeps), tracing);
+                node.send(to, new Commit(commitKind(to), to, allTopologies, 
txnId, txn, scope, ballot, executeAt, stabiliseDeps), tracing);
         }
     }
 
@@ -126,9 +126,9 @@ public abstract class Stabilise<R> extends 
AbstractCoordination<FullRoute<?>, R,
         }
     }
 
-    private static Commit.Kind commitKind()
+    private Commit.Kind commitKind(Node.Id to)
     {
-        return sendMinimal() ? CommitSlowPath : CommitWithTxn;
+        return sendMinimal() && !to.equals(node.id()) ? CommitSlowPath : 
CommitWithTxn;
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to