This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new c319df65 Fix reportSlow race, and CoordinateShardDurable NPE
c319df65 is described below
commit c319df659b319bb6bf060e8492d1ec076011132d
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 9 19:39:06 2024 +0100
Fix reportSlow race, and CoordinateShardDurable NPE
---
.../src/main/java/accord/coordinate/CoordinateShardDurable.java | 5 +++++
accord-core/src/main/java/accord/impl/RequestCallbacks.java | 5 ++++-
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index f5193a56..63a0c3eb 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -56,6 +56,11 @@ public class CoordinateShardDurable extends ExecuteExclusiveSyncPoint implements
protected void start()
{
SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
+ if (contact == null)
+ {
+ tryFailure(new Exhausted(syncPoint.syncId,
syncPoint.route.homeKey(), null));
+ return;
+ }
SortedArrayList<Node.Id> allStaleNodes =
tracker.topologies().staleNodes();
SortedArrayList<Node.Id> allCurrentNodes =
tracker.topologies().current().nodes();
SortedArrayList<Node.Id> staleNodes = contact.without(id ->
!allStaleNodes.contains(id))
diff --git a/accord-core/src/main/java/accord/impl/RequestCallbacks.java b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
index f3644061..fa44ab96 100644
--- a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
+++ b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
@@ -53,6 +53,7 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
final long registeredAt;
final long reportSlowAt;
final long reportFailAt;
+ boolean cancelInFlight;
public RegisteredCallback(AgentExecutor executor, long callbackId,
Callback<T> callback, Node.Id to, long registeredAt, long reportSlowAt, long
reportFailAt)
{
@@ -75,6 +76,7 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
{
if (isInHeap())
timeouts.remove(this);
+ cancelInFlight = true;
}
@Override
@@ -122,7 +124,8 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
private void unsafeOnSlow(Object ignore)
{
- callback.onSlowResponse(to);
+ if (!cancelInFlight)
+ callback.onSlowResponse(to);
}
<P> void safeInvoke(BiConsumer<RegisteredCallback<T>, P> invoker,
P param)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]