This is an automated email from the ASF dual-hosted git repository.
belliottsmith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new e8c70e09 CASSANDRA-21361 follow-up: recontact forbids self-addressed
messages, and we ensure self-addressed messages send full information to avoid
having multiple LocalDelivery contexts
e8c70e09 is described below
commit e8c70e097f305a5097084079f136d8be7c085b29
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed May 13 21:02:46 2026 +0100
CASSANDRA-21361 follow-up: recontact forbids self-addressed messages, and we
ensure self-addressed messages send full information to avoid having multiple
LocalDelivery contexts
---
.../main/java/accord/api/ProtocolModifiers.java | 8 ++---
.../accord/coordinate/AbstractCoordination.java | 4 +--
.../main/java/accord/coordinate/ExecuteTxn.java | 34 +++++++++++++---------
.../src/main/java/accord/coordinate/Stabilise.java | 8 ++---
4 files changed, 31 insertions(+), 23 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/ProtocolModifiers.java
b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
index 1eeb4249..e0c6fb5f 100644
--- a/accord-core/src/main/java/accord/api/ProtocolModifiers.java
+++ b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
@@ -144,8 +144,8 @@ public class ProtocolModifiers
private static SendStableMessages sendStableMessages =
FOR_READS_OR_NONE_IF_FASTEXEC;
public static synchronized void
setSendStableMessages(SendStableMessages newSendStableMessages) { pre();
sendStableMessages = newSendStableMessages; }
- private static boolean sendMinimalCommits = true;
- public static synchronized void setSendMinimalCommits(boolean
newSendMinimalCommits) { pre(); sendMinimalCommits = newSendMinimalCommits; }
+ private static boolean sendMinimal = true;
+ public static synchronized void setSendMinimal(boolean newSendMinimal)
{ pre(); sendMinimal = newSendMinimal; }
private static boolean permitCoordinatorLocalExecution = true;
public static synchronized void
setPermitCoordinatorLocalExecution(boolean newPermitCoordinatorLocalExecution)
{ pre(); permitCoordinatorLocalExecution = newPermitCoordinatorLocalExecution; }
@@ -261,8 +261,8 @@ public class ProtocolModifiers
public static boolean sendOnlyReadStableMessages(TxnId txnId) { return
sendStableMessages.compareTo(FOR_READS) >= 0 || (sendStableMessages ==
TO_ALL_REPLICA_EXECUTABLE_ELSE_FOR_READS && (!txnId.is(SingleKey) ||
!txnId.is(Txn.Kind.Write))); }
public static boolean sendNoStableIfFastExec() { return sendStableMessages
== FOR_READS_OR_NONE_IF_FASTEXEC; }
- private static final boolean sendMinimalCommits =
Configure.sendMinimalCommits;
- public static boolean sendMinimal() { return sendMinimalCommits; }
+ private static final boolean sendMinimal = Configure.sendMinimal;
+ public static boolean sendMinimal() { return sendMinimal; }
private static final boolean dataStoreRequiresUniqueHlcs =
Configure.dataStoreRequiresUniqueHlcs;
public static boolean dataStoreRequiresUniqueHlcs() { return
dataStoreRequiresUniqueHlcs; }
diff --git
a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
index 3effa95f..55f327bb 100644
--- a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
+++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
@@ -236,8 +236,8 @@ public abstract class AbstractCoordination<P extends
Participants<?>, Result, Re
void recontact(Node.Id to, Request send)
{
node.maybeTraceRemote(to, send, tracing);
- if (permitLocalDelivery() && to.equals(node.id())) new
LocalDelivery<>(node, this).deliver(send);
- else node.send(to, send, tracing);
+ if (permitLocalDelivery()) Invariants.expect(!to.equals(node.id()));
// we should never have to recontact ourselves, and callbacks/timeouts will not
work correctly if we do
+ node.send(to, send, tracing);
}
@Override
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
index e2429df0..9f813631 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
@@ -86,7 +86,9 @@ import static
accord.coordinate.CoordinationAdapter.Factory.Kind.Standard;
import static accord.coordinate.ExecuteFlag.READY_TO_EXECUTE;
import static accord.coordinate.ExecutePath.EPHEMERAL;
import static accord.coordinate.ExecutePath.FAST;
+import static accord.coordinate.ExecutePath.MEDIUM;
import static accord.coordinate.ExecutePath.RECOVER;
+import static accord.coordinate.ExecutePath.SLOW;
import static accord.coordinate.ReadCoordinator.Action.Approve;
import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
import static accord.local.CommandSummaries.SummaryStatus.STABLE;
@@ -277,15 +279,18 @@ public class ExecuteTxn extends ReadCoordinator<Result,
ReadReply>
{
// TODO (desired): migrate to SortedListSet; or introduce a
specialised version for integer keys; or introduce a hash equivalent
Topologies all = allTopologies;
- Commit.Kind kind = commitKind();
for (int i = 0, size = sendReadTo.size() ; i < size ; ++i)
{
Node.Id to = sendReadTo.get(i);
ExecuteFlags flags = this.flags.get(to);
- Invariants.require(kind.compareTo(StableFastPath) >= 0);
boolean sendUnstable = flags.contains(READY_TO_EXECUTE) &&
sendNoStableIfFastExec() && path != RECOVER;
if (sendUnstable) sendUnstableRead(to, flags);
- else sendStableRead(to, kind);
+ else
+ {
+ Commit.Kind kind = commitKind(to);
+ Invariants.require(kind.compareTo(StableFastPath) >= 0);
+ sendStableRead(to, kind);
+ }
}
boolean sendOnlyReadStableMessages = flags.isAnyReadyToExecute() &&
sendOnlyReadStableMessages(txnId);
@@ -303,13 +308,13 @@ public class ExecuteTxn extends ReadCoordinator<Result,
ReadReply>
if (sendOnlyReadStableMessages && all.current().contains(to))
continue;
- sendStableOnly(to, kind);
+ sendStableOnly(to);
}
}
- private void sendStableOnly(Node.Id to, Commit.Kind kind)
+ private void sendStableOnly(Node.Id to)
{
- Commit send = new Commit(kind, to, allTopologies, txnId, txn, route,
ballot, executeAt, stableDeps);
+ Commit send = new Commit(commitKind(to), to, allTopologies, txnId,
txn, route, ballot, executeAt, stableDeps);
boolean addCallback = allTopologies.size() == 1 ||
stable.nodes().contains(to);
if (addCallback) node.send(to, send, executor, stable, tracing);
else node.send(to, send, tracing);
@@ -346,9 +351,10 @@ public class ExecuteTxn extends ReadCoordinator<Result,
ReadReply>
node.send(to, send, executor, stable, tracing);
}
- private Commit.Kind commitKind()
+ private Commit.Kind commitKind(Node.Id to)
{
- if (!sendMinimal())
+ // we MUST send StableFastPath to self to ensure we use the privileged
coordinator fast commit machinery that rejects if we have witnessed a future
ballot
+ if (!sendMinimal() && (path != FAST || !to.equals(node.id())))
return StableWithTxnAndDeps;
switch (path)
@@ -369,7 +375,7 @@ public class ExecuteTxn extends ReadCoordinator<Result,
ReadReply>
ExecuteFlags flags = this.flags.get(to);
boolean sendUnstable = !sendOnlyReadStableMessages(txnId) || path ==
RECOVER || flags.contains(READY_TO_EXECUTE);
if (sendUnstable) sendUnstableRead(to, flags);
- else sendStableRead(to, commitKind());
+ else sendStableRead(to, commitKind(to));
}
@Override
@@ -467,16 +473,18 @@ public class ExecuteTxn extends ReadCoordinator<Result,
ReadReply>
if (sendOnlyReadStableMessages(txnId))
{
// send additional stable messages to record the
transaction outcome
- Commit.Kind kind = commitKind();
if (!candidates.isEmpty())
{
for (int i = 0, size = candidates.size() ; i <
size ; ++i)
- sendStableOnly(candidates.get(i), kind);
+ {
+ Node.Id to = candidates.get(i);
+ sendStableOnly(to);
+ }
}
if (unstableFastReads != null)
{
for (Node.Id to : unstableFastReads)
- sendStableOnly(to, kind);
+ sendStableOnly(to);
}
}
}
@@ -591,7 +599,7 @@ public class ExecuteTxn extends ReadCoordinator<Result,
ReadReply>
@Override
protected CommitOrReadNack applyInternal(SafeCommandStore safeStore,
SafeCommand safeCommand, StoreParticipants participants)
{
- if (CommitOutcome.Rejected == Commands.commit(safeStore,
safeCommand, participants, Stable, Ballot.ZERO, txnId, route, txn, executeAt,
stableDeps, commitKind()))
+ if (CommitOutcome.Rejected == Commands.commit(safeStore,
safeCommand, participants, Stable, Ballot.ZERO, txnId, route, txn, executeAt,
stableDeps, commitKind(node.id())))
return CommitOrReadNack.Rejected;
if (coordinatorBacklogExecution(ballot) && txnId.is(SingleKey) &&
txnId.is(Key))
diff --git a/accord-core/src/main/java/accord/coordinate/Stabilise.java
b/accord-core/src/main/java/accord/coordinate/Stabilise.java
index f78c6b79..bd8c7cd4 100644
--- a/accord-core/src/main/java/accord/coordinate/Stabilise.java
+++ b/accord-core/src/main/java/accord/coordinate/Stabilise.java
@@ -77,12 +77,12 @@ public abstract class Stabilise<R> extends
AbstractCoordination<FullRoute<?>, R,
void start()
{
super.start();
- contact(to -> new Commit(commitKind(), to, allTopologies, txnId, txn,
scope, ballot, executeAt, stabiliseDeps));
+ contact(to -> new Commit(commitKind(to), to, allTopologies, txnId,
txn, scope, ballot, executeAt, stabiliseDeps));
if (allTopologies.size() > 1)
{
SortedArrayList<Node.Id> extra =
allTopologies.nodes().without(tracker.nodes()).without(allTopologies::isFaulty);
for (Node.Id to : extra)
- node.send(to, new Commit(commitKind(), to, allTopologies,
txnId, txn, scope, ballot, executeAt, stabiliseDeps), tracing);
+ node.send(to, new Commit(commitKind(to), to, allTopologies,
txnId, txn, scope, ballot, executeAt, stabiliseDeps), tracing);
}
}
@@ -126,9 +126,9 @@ public abstract class Stabilise<R> extends
AbstractCoordination<FullRoute<?>, R,
}
}
- private static Commit.Kind commitKind()
+ private Commit.Kind commitKind(Node.Id to)
{
- return sendMinimal() ? CommitSlowPath : CommitWithTxn;
+ return sendMinimal() && !to.equals(node.id()) ? CommitSlowPath :
CommitWithTxn;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]