This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8ccce745 Submit ShardScheduler success to DurabilityQueue without holding lock to avoid deadlock (when DurabilityQueue queues a retry)
8ccce745 is described below

commit 8ccce745818cf80c7cff82c3554e4a88e9e540db
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Dec 1 20:05:00 2025 +0000

    Submit ShardScheduler success to DurabilityQueue without holding lock to avoid deadlock (when DurabilityQueue queues a retry)
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21053
---
 .../java/accord/impl/DefaultRemoteListeners.java   |  8 +++-
 .../accord/local/durability/ShardDurability.java   | 46 ++++++++++++++--------
 2 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java b/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
index e6abb05e..4bf35380 100644
--- a/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java
@@ -23,6 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.api.RemoteListeners;
 import accord.local.Command;
 import accord.local.Node;
@@ -38,6 +41,8 @@ import accord.utils.Invariants;
 
 public class DefaultRemoteListeners implements RemoteListeners
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(DefaultRemoteListeners.class);
+
     public interface NotifySink
     {
         void notify(TxnId txnId, SaveStatus saveStatus, Route<?> participants, 
long[] listeners, int listenerCount);
@@ -60,7 +65,8 @@ public class DefaultRemoteListeners implements RemoteListeners
                 // we could in theory reply to the same node multiple times 
here, but should not be common enough to optimise
                 Node.Id replyTo = new Node.Id(listenerNodeId(listener));
                 int callbackId = listenerCallbackId(listener);
-                node.send(replyTo, new AsyncAwaitComplete(txnId, route, 
saveStatus, callbackId));
+                if (route == null) logger.warn("{}/{} attempting to notify {} 
with callbackId {} but no route", txnId, saveStatus, replyTo, callbackId);
+                else node.send(replyTo, new AsyncAwaitComplete(txnId, route, 
saveStatus, callbackId));
             }
         }
     }
diff --git a/accord-core/src/main/java/accord/local/durability/ShardDurability.java b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index fb287db4..4cbeec68 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -235,28 +235,40 @@ public class ShardDurability
                 start();
         }
 
-        synchronized void success(SyncPoint success, Ranges ranges)
+        void success(SyncPoint success, Ranges ranges)
         {
-            Object requestedBy = null;
-            if (activeRequest != null)
-                requestedBy = activeRequest.requestedBy;
+            DurabilityRequest request = null;
+            try
+            {
+                synchronized (this)
+                {
+                    request = activeRequest;
 
-            durabilityQueue.submit(success, activeRequest);
+                    Object requestedBy = null;
+                    if (request != null)
+                        requestedBy = request.requestedBy;
 
-            decrementBackoff();
-            int index = activeIndex;
-            active = active.without(ranges);
-            if (!active.isEmpty())
-            {
-                start();
+                    decrementBackoff();
+                    int index = activeIndex;
+                    active = active.without(ranges);
+                    if (!active.isEmpty())
+                    {
+                        start();
+                    }
+                    else
+                    {
+                        if (index >= 0) logCycleProgress(index);
+                        else logger.info("Successfully agreed RX requested by 
{} for {} with {}. Remaining ranges: {}.", requestedBy, ranges, success.syncId, 
active);
+                        active = null;
+                        activeRequest = null;
+                        updateActive();
+                    }
+                }
             }
-            else
+            finally
             {
-                if (index >= 0) logCycleProgress(index);
-                else logger.info("Successfully agreed RX requested by {} for 
{} with {}. Remaining ranges: {}.", requestedBy, ranges, success.syncId, 
active);
-                active = null;
-                activeRequest = null;
-                updateActive();
+                // submit without holding lock to avoid deadlock
+                durabilityQueue.submit(success, request);
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to