This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eedd13ac Follow-up to: Do not contact faulty replicas, and support reporting slow replies for preaccept/read. Do not wait for stale or left nodes for durability.
eedd13ac is described below

commit eedd13ac4b7b9f61d574badb4bfc47611a838739
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Oct 11 09:45:06 2024 +0100

    Follow-up to: Do not contact faulty replicas, and support reporting slow replies for preaccept/read. Do not wait for stale or left nodes for durability.
---
 .../main/java/accord/impl/RequestCallbacks.java    | 34 ++++++++---
 .../src/main/java/accord/local/CommandStore.java   |  2 +-
 .../src/main/java/accord/utils/LogGroupTimers.java | 66 +++++++++++-----------
 3 files changed, 58 insertions(+), 44 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/RequestCallbacks.java b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
index fa44ab96..37e0f5fc 100644
--- a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
+++ b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
@@ -85,6 +85,7 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
                 if (deadline() == reportFailAt)
                 {
                     callbacks.remove(callbackId);
+                    cancelInFlight = true;
                     return this;
                 }
 
@@ -194,23 +195,38 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
 
         private <T, P> RegisteredCallback<T> safeInvoke(long callbackId, Node.Id from, P param, BiConsumer<RegisteredCallback<T>, P> invoker, boolean remove)
         {
+            RegisteredCallback<T> registered = null;
             long now = time.elapsed(MICROSECONDS);
             lock();
             try
             {
-                RegisteredCallback<T> registered = remove ? callbacks.remove(callbackId) : callbacks.get(callbackId);
-                if (registered == null)
-                    return null;
+                try
+                {
+                    registered = remove ? callbacks.remove(callbackId) : callbacks.get(callbackId);
+                    if (registered == null)
+                        return null;
 
-                if (remove) registered.cancelUnsafe();
-                Invariants.checkState(registered.to.equals(from));
-                registered.safeInvoke(invoker, param);
-                return registered;
+                    if (remove)
+                        registered.cancelUnsafe();
+                    Invariants.checkState(registered.to.equals(from));
+                }
+                finally
+                {
+                    unlock(now);
+                }
             }
-            finally
+            catch (Throwable t)
             {
-                unlock(now);
+                // we don't want to hold the lock when we invoke the callback,
+                // but we also want to make sure we invoke the callback even
+                // if some other callback throws an exception
+                try { if (registered != null) registered.safeInvoke(invoker, param); }
+                catch (Throwable t2) { t.addSuppressed(t2); }
+                throw t;
             }
+
+            registered.safeInvoke(invoker, param);
+            return registered;
         }
     }
 
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index 6a26b825..4fdbcc09 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -487,8 +487,8 @@ public abstract class CommandStore implements AgentExecutor
         CollectCalculatedDeps.withCalculatedDeps(node, id, route, route, before, (deps, fail) -> {
             if (fail != null)
             {
+                node.scheduler().once(() -> fetchMajorityDeps(coordination, node, epoch, ranges), 10L, TimeUnit.SECONDS);
                 node.agent().onUncaughtException(fail);
-                node.scheduler().once(() -> fetchMajorityDeps(coordination, node, epoch, ranges), 1L, TimeUnit.MINUTES);
             }
             else
             {
diff --git a/accord-core/src/main/java/accord/utils/LogGroupTimers.java b/accord-core/src/main/java/accord/utils/LogGroupTimers.java
index 15274140..9d83a87d 100644
--- a/accord-core/src/main/java/accord/utils/LogGroupTimers.java
+++ b/accord-core/src/main/java/accord/utils/LogGroupTimers.java
@@ -345,46 +345,44 @@ public class LogGroupTimers<T extends LogGroupTimers.Timer>
                 head.heapify();
 
             Timer next = head.peekNode();
-            if (next == null)
+            if (next != null)
             {
-                if (head.end() >= curEpoch)
-                {
-                    wakeAt = head.end();
-                }
-                else
+                wakeAt = next.deadline;
+            }
+            else if (head.end() >= curEpoch)
+            {
+                wakeAt = head.end();
+            }
+            else
+            {
+                while (true)
                 {
-                    while (true)
+                    buckets[bucketsStart++] = null;
+                    if (head == addFinger) addFinger = null;
+                    if (bucketsStart == bucketsEnd)
+                    {
+                        wakeAt = Long.MAX_VALUE;
+                        return;
+                    }
+                    head = buckets[bucketsStart];
+                    if (head.epoch >= curEpoch)
+                    {
+                        wakeAt = head.epoch;
+                        return;
+                    }
+                    head.heapify();
+                    if (!head.isEmpty())
                     {
-                        buckets[bucketsStart++] = null;
-                        if (head == addFinger) addFinger = null;
-                        if (bucketsStart == bucketsEnd)
-                        {
-                            wakeAt = Long.MAX_VALUE;
-                            return;
-                        }
-                        head = buckets[bucketsStart];
-                        if (head.end() >= curEpoch)
-                        {
-                            if (head.epoch >= curEpoch)
-                            {
-                                wakeAt = head.epoch;
-                                return;
-                            }
-                            else
-                            {
-                                head.heapify();
-                                if (!head.isEmpty())
-                                {
-                                    wakeAt = head.peekNode().deadline();
-                                    return;
-                                }
-                            }
-                        }
-                        Invariants.checkState(head.isEmpty());
+                        wakeAt = head.peekNode().deadline();
+                        return;
+                    }
+                    else if (head.end() < curEpoch)
+                    {
+                        wakeAt = head.end();
+                        return;
                     }
                 }
             }
-            else wakeAt = next.deadline;
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to