This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new eedd13ac Follow-up to: Do not contact faulty replicas, and support
reporting slow replies for preaccept/read. Do not wait for stale or left nodes
for durability.
eedd13ac is described below
commit eedd13ac4b7b9f61d574badb4bfc47611a838739
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Oct 11 09:45:06 2024 +0100
Follow-up to: Do not contact faulty replicas, and support reporting slow
replies for preaccept/read. Do not wait for stale or left nodes for durability.
---
.../main/java/accord/impl/RequestCallbacks.java | 34 ++++++++---
.../src/main/java/accord/local/CommandStore.java | 2 +-
.../src/main/java/accord/utils/LogGroupTimers.java | 66 +++++++++++-----------
3 files changed, 58 insertions(+), 44 deletions(-)
diff --git a/accord-core/src/main/java/accord/impl/RequestCallbacks.java b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
index fa44ab96..37e0f5fc 100644
--- a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
+++ b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
@@ -85,6 +85,7 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
if (deadline() == reportFailAt)
{
callbacks.remove(callbackId);
+ cancelInFlight = true;
return this;
}
@@ -194,23 +195,38 @@ public class RequestCallbacks extends AbstractRequestTimeouts<RequestCallbacks.C
private <T, P> RegisteredCallback<T> safeInvoke(long callbackId,
Node.Id from, P param, BiConsumer<RegisteredCallback<T>, P> invoker, boolean
remove)
{
+ RegisteredCallback<T> registered = null;
long now = time.elapsed(MICROSECONDS);
lock();
try
{
- RegisteredCallback<T> registered = remove ?
callbacks.remove(callbackId) : callbacks.get(callbackId);
- if (registered == null)
- return null;
+ try
+ {
+ registered = remove ? callbacks.remove(callbackId) :
callbacks.get(callbackId);
+ if (registered == null)
+ return null;
- if (remove) registered.cancelUnsafe();
- Invariants.checkState(registered.to.equals(from));
- registered.safeInvoke(invoker, param);
- return registered;
+ if (remove)
+ registered.cancelUnsafe();
+ Invariants.checkState(registered.to.equals(from));
+ }
+ finally
+ {
+ unlock(now);
+ }
}
- finally
+ catch (Throwable t)
{
- unlock(now);
+ // we don't want to hold the lock when we invoke the callback,
+ // but we also want to make sure we invoke the callback even
+ // if some other callback throws an exception
+ try { if (registered != null) registered.safeInvoke(invoker,
param); }
+ catch (Throwable t2) { t.addSuppressed(t2); }
+ throw t;
}
+
+ registered.safeInvoke(invoker, param);
+ return registered;
}
}
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index 6a26b825..4fdbcc09 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -487,8 +487,8 @@ public abstract class CommandStore implements AgentExecutor
CollectCalculatedDeps.withCalculatedDeps(node, id, route, route,
before, (deps, fail) -> {
if (fail != null)
{
+ node.scheduler().once(() -> fetchMajorityDeps(coordination,
node, epoch, ranges), 10L, TimeUnit.SECONDS);
node.agent().onUncaughtException(fail);
- node.scheduler().once(() -> fetchMajorityDeps(coordination,
node, epoch, ranges), 1L, TimeUnit.MINUTES);
}
else
{
diff --git a/accord-core/src/main/java/accord/utils/LogGroupTimers.java b/accord-core/src/main/java/accord/utils/LogGroupTimers.java
index 15274140..9d83a87d 100644
--- a/accord-core/src/main/java/accord/utils/LogGroupTimers.java
+++ b/accord-core/src/main/java/accord/utils/LogGroupTimers.java
@@ -345,46 +345,44 @@ public class LogGroupTimers<T extends LogGroupTimers.Timer>
head.heapify();
Timer next = head.peekNode();
- if (next == null)
+ if (next != null)
{
- if (head.end() >= curEpoch)
- {
- wakeAt = head.end();
- }
- else
+ wakeAt = next.deadline;
+ }
+ else if (head.end() >= curEpoch)
+ {
+ wakeAt = head.end();
+ }
+ else
+ {
+ while (true)
{
- while (true)
+ buckets[bucketsStart++] = null;
+ if (head == addFinger) addFinger = null;
+ if (bucketsStart == bucketsEnd)
+ {
+ wakeAt = Long.MAX_VALUE;
+ return;
+ }
+ head = buckets[bucketsStart];
+ if (head.epoch >= curEpoch)
+ {
+ wakeAt = head.epoch;
+ return;
+ }
+ head.heapify();
+ if (!head.isEmpty())
{
- buckets[bucketsStart++] = null;
- if (head == addFinger) addFinger = null;
- if (bucketsStart == bucketsEnd)
- {
- wakeAt = Long.MAX_VALUE;
- return;
- }
- head = buckets[bucketsStart];
- if (head.end() >= curEpoch)
- {
- if (head.epoch >= curEpoch)
- {
- wakeAt = head.epoch;
- return;
- }
- else
- {
- head.heapify();
- if (!head.isEmpty())
- {
- wakeAt = head.peekNode().deadline();
- return;
- }
- }
- }
- Invariants.checkState(head.isEmpty());
+ wakeAt = head.peekNode().deadline();
+ return;
+ }
+ else if (head.end() < curEpoch)
+ {
+ wakeAt = head.end();
+ return;
}
}
}
- else wakeAt = next.deadline;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]