This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8ccce745 Submit ShardScheduler success to DurabilityQueue without
holding lock to avoid deadlock (when DurabilityQueue queues a retry)
8ccce745 is described below
commit 8ccce745818cf80c7cff82c3554e4a88e9e540db
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Dec 1 20:05:00 2025 +0000
Submit ShardScheduler success to DurabilityQueue without holding lock to
avoid deadlock (when DurabilityQueue queues a retry)
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21053
---
.../java/accord/impl/DefaultRemoteListeners.java | 8 +++-
.../accord/local/durability/ShardDurability.java | 46 ++++++++++++++--------
2 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java b/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
index e6abb05e..4bf35380 100644
--- a/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
@@ -23,6 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import accord.api.RemoteListeners;
import accord.local.Command;
import accord.local.Node;
@@ -38,6 +41,8 @@ import accord.utils.Invariants;
public class DefaultRemoteListeners implements RemoteListeners
{
+ private static final Logger logger = LoggerFactory.getLogger(DefaultRemoteListeners.class);
+
public interface NotifySink
{
void notify(TxnId txnId, SaveStatus saveStatus, Route<?> participants, long[] listeners, int listenerCount);
@@ -60,7 +65,8 @@ public class DefaultRemoteListeners implements RemoteListeners
// we could in theory reply to the same node multiple times here, but should not be common enough to optimise
Node.Id replyTo = new Node.Id(listenerNodeId(listener));
int callbackId = listenerCallbackId(listener);
- node.send(replyTo, new AsyncAwaitComplete(txnId, route, saveStatus, callbackId));
+ if (route == null) logger.warn("{}/{} attempting to notify {} with callbackId {} but no route", txnId, saveStatus, replyTo, callbackId);
+ else node.send(replyTo, new AsyncAwaitComplete(txnId, route, saveStatus, callbackId));
}
}
}
diff --git a/accord-core/src/main/java/accord/local/durability/ShardDurability.java b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index fb287db4..4cbeec68 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -235,28 +235,40 @@ public class ShardDurability
start();
}
- synchronized void success(SyncPoint success, Ranges ranges)
+ void success(SyncPoint success, Ranges ranges)
{
- Object requestedBy = null;
- if (activeRequest != null)
- requestedBy = activeRequest.requestedBy;
+ DurabilityRequest request = null;
+ try
+ {
+ synchronized (this)
+ {
+ request = activeRequest;
- durabilityQueue.submit(success, activeRequest);
+ Object requestedBy = null;
+ if (request != null)
+ requestedBy = request.requestedBy;
- decrementBackoff();
- int index = activeIndex;
- active = active.without(ranges);
- if (!active.isEmpty())
- {
- start();
+ decrementBackoff();
+ int index = activeIndex;
+ active = active.without(ranges);
+ if (!active.isEmpty())
+ {
+ start();
+ }
+ else
+ {
+ if (index >= 0) logCycleProgress(index);
+ else logger.info("Successfully agreed RX requested by {} for {} with {}. Remaining ranges: {}.", requestedBy, ranges, success.syncId, active);
+ active = null;
+ activeRequest = null;
+ updateActive();
+ }
+ }
}
- else
+ finally
{
- if (index >= 0) logCycleProgress(index);
- else logger.info("Successfully agreed RX requested by {} for {} with {}. Remaining ranges: {}.", requestedBy, ranges, success.syncId, active);
- active = null;
- activeRequest = null;
- updateActive();
+ // submit without holding lock to avoid deadlock
+ durabilityQueue.submit(success, request);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]