This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 65c96dde52545097d9ad853d5812601b55d7192c
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 9 19:39:06 2024 +0100

    Fix reportSlow race, and CoordinateShardDurable NPE
---
 .../src/main/java/accord/coordinate/CoordinateShardDurable.java    | 5 +++++
 accord-core/src/main/java/accord/impl/RequestCallbacks.java        | 5 ++++-
 accord-core/src/main/java/accord/messages/ReadData.java            | 7 ++++---
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index f5193a56..63a0c3eb 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -56,6 +56,11 @@ public class CoordinateShardDurable extends ExecuteExclusiveSyncPoint implements
     protected void start()
     {
         SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
+        if (contact == null)
+        {
+            tryFailure(new Exhausted(syncPoint.syncId, 
syncPoint.route.homeKey(), null));
+            return;
+        }
         SortedArrayList<Node.Id> allStaleNodes = 
tracker.topologies().staleNodes();
         SortedArrayList<Node.Id> allCurrentNodes = 
tracker.topologies().current().nodes();
         SortedArrayList<Node.Id> staleNodes = contact.without(id -> 
!allStaleNodes.contains(id))
diff --git a/accord-core/src/main/java/accord/impl/RequestCallbacks.java b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
index f3644061..fa44ab96 100644
--- a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
+++ b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
@@ -53,6 +53,7 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
             final long registeredAt;
             final long reportSlowAt;
             final long reportFailAt;
+            boolean cancelInFlight;
 
             public RegisteredCallback(AgentExecutor executor, long callbackId, 
Callback<T> callback, Node.Id to, long registeredAt, long reportSlowAt, long 
reportFailAt)
             {
@@ -75,6 +76,7 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
             {
                 if (isInHeap())
                     timeouts.remove(this);
+                cancelInFlight = true;
             }
 
             @Override
@@ -122,7 +124,8 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
 
             private void unsafeOnSlow(Object ignore)
             {
-                callback.onSlowResponse(to);
+                if (!cancelInFlight)
+                    callback.onSlowResponse(to);
             }
 
             <P> void safeInvoke(BiConsumer<RegisteredCallback<T>, P> invoker, 
P param)
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index d403f48e..0d4423de 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -325,7 +325,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.CommitOrRea
         Runnable clear;
         synchronized(this)
         {
-            if (state == State.OBSOLETE)
+            if (state != State.PENDING)
                 return;
 
             logger.trace("{}: read completed on {}", txnId, commandStore);
@@ -401,8 +401,8 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.CommitOrRea
             if (state != State.PENDING)
                 return false;
 
-            clear = clearUnsafe();
             state = State.OBSOLETE;
+            clear = clearUnsafe();
         }
         cleanup(clear);
         return true;
@@ -426,6 +426,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.CommitOrRea
 
     @Nullable Runnable clearUnsafe()
     {
+        Invariants.checkState(state != State.PENDING);
         RegisteredTimeout cancelTimeout = timeout;
         Int2ObjectHashMap<LocalListeners.Registered> cancelRegistrations = 
registrations;
         timeout = null;
@@ -449,7 +450,7 @@ public abstract class ReadData extends AbstractEpochRequest<ReadData.CommitOrRea
     public void timeout()
     {
         timeout = null;
-        cleanup();
+        cancel();
     }
 
     public int stripe()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to