This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 291cbe70 Follow-up to: Do not contact faulty replicas, and support
reporting slow replies for preaccept/read. Do not wait for stale or left nodes
for durability.
291cbe70 is described below
commit 291cbe70ad82b0d5f875101f541d9f841912802f
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 9 12:50:32 2024 +0100
Follow-up to: Do not contact faulty replicas, and support reporting slow
replies for preaccept/read. Do not wait for stale or left nodes for durability.
---
.../accord/coordinate/CoordinateTransaction.java | 3 -
.../src/main/java/accord/coordinate/Stabilise.java | 8 +-
.../coordinate/tracking/AbstractTracker.java | 4 +-
.../accord/coordinate/tracking/ReadTracker.java | 13 +-
.../java/accord/impl/AbstractFetchCoordinator.java | 6 +-
.../java/accord/impl/AbstractRequestTimeouts.java | 128 +++++-----
.../java/accord/impl/DefaultLocalListeners.java | 3 +-
.../main/java/accord/impl/RequestCallbacks.java | 43 ++--
.../src/main/java/accord/local/Commands.java | 1 +
accord-core/src/main/java/accord/local/Node.java | 2 +-
.../accord/messages/ApplyThenWaitUntilApplied.java | 10 -
.../src/main/java/accord/messages/Await.java | 88 +++----
.../src/main/java/accord/messages/Commit.java | 50 ++--
.../src/main/java/accord/messages/ReadData.java | 261 +++++++++++++--------
.../java/accord/messages/WaitUntilApplied.java | 2 +-
.../main/java/accord/topology/TopologyManager.java | 5 +
16 files changed, 348 insertions(+), 279 deletions(-)
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index 5cc380c1..8f8cb2b1 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -47,12 +47,9 @@ import static
accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
*/
public class CoordinateTransaction extends CoordinatePreAccept<Result>
{
- final Txn txn;
-
private CoordinateTransaction(Node node, TxnId txnId, Txn txn,
FullRoute<?> route)
{
super(node, txnId, txn, route);
- this.txn = txn;
}
public static AsyncResult<Result> coordinate(Node node, FullRoute<?>
route, TxnId txnId, Txn txn)
diff --git a/accord-core/src/main/java/accord/coordinate/Stabilise.java
b/accord-core/src/main/java/accord/coordinate/Stabilise.java
index b6145100..05993692 100644
--- a/accord-core/src/main/java/accord/coordinate/Stabilise.java
+++ b/accord-core/src/main/java/accord/coordinate/Stabilise.java
@@ -36,6 +36,7 @@ import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topologies;
+import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.SortedListMap;
import static accord.coordinate.ExecutePath.SLOW;
@@ -77,7 +78,12 @@ public abstract class Stabilise<R> implements
Callback<ReadReply>
void start()
{
- Commit.commitMinimal(node, stableTracker.topologies(), ballot, txnId,
txn, route, executeAt, stabiliseDeps, this);
+ SortedArrayList<Node.Id> contact =
stableTracker.filterAndRecordFaulty();
+ if (allTopologies.size() > 1)
+ contact =
contact.with(allTopologies.nodes().without(stableTracker.nodes()).without(allTopologies::isFaulty));
+
+ if (contact == null) callback.accept(null, new Exhausted(txnId,
route.homeKey(), null));
+ else Commit.commitMinimal(contact, node, stableTracker.topologies(),
allTopologies, ballot, txnId, txn, route, executeAt, stabiliseDeps, this);
}
@Override
diff --git
a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
index d156e286..846621b5 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
@@ -120,6 +120,8 @@ public abstract class AbstractTracker<ST extends
ShardTracker>
}
public abstract RequestStatus recordFailure(Id from);
+ // record failure before starting up; can skip any in-flight validation,
and perhaps be cheaper
+ public RequestStatus prerecordFailure(Id from) { return
recordFailure(from); }
protected int topologyOffset(int topologyIdx)
{
@@ -230,7 +232,7 @@ public abstract class AbstractTracker<ST extends
ShardTracker>
Id node = nodes.get(i);
if (sorter.isFaulty(node))
{
- if (Failed == reportTo.recordFailure(node))
+ if (Failed == reportTo.prerecordFailure(node))
return null;
if (buffer == null)
diff --git
a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
index 5e6c55ec..3dee3651 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -150,6 +150,12 @@ public class ReadTracker extends
AbstractTracker<ReadTracker.ReadShardTracker>
return ensureProgressOrFail();
}
+ public ShardOutcomes prerecordReadFailure(Object ignore)
+ {
+ ++contacted;
+ return ensureProgressOrFail();
+ }
+
private ShardOutcomes ensureProgressOrFail()
{
if (!shouldRead())
@@ -236,7 +242,7 @@ public class ReadTracker extends
AbstractTracker<ReadTracker.ReadShardTracker>
private boolean receiveResponseIsSlow(Id node)
{
- if (!inflight.remove(node.id))
+ if (!inflight.isEmpty() && !inflight.remove(node.id))
throw illegalState("Nothing in flight for " + node);
return slow != null && slow.remove(node.id);
@@ -292,6 +298,11 @@ public class ReadTracker extends
AbstractTracker<ReadTracker.ReadShardTracker>
return recordResponse(from, ReadShardTracker::recordReadFailure);
}
+ public RequestStatus prerecordFailure(Id from)
+ {
+ return recordResponse(this, from,
ReadShardTracker::prerecordReadFailure, null);
+ }
+
protected RequestStatus recordResponse(Id from, BiFunction<? super
ReadShardTracker, Boolean, ? extends ShardOutcome<? super ReadTracker>>
function)
{
boolean isSlow = receiveResponseIsSlow(from);
diff --git
a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index 2380a7f0..7bc1c4fa 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -276,11 +276,9 @@ public abstract class AbstractFetchCoordinator extends
FetchCoordinator
}
@Override
- protected void onAllSuccess(@Nullable Ranges unavailable, @Nullable
Data data, @Nullable Throwable fail)
+ protected ReadOk constructReadOk(Ranges unavailable, Data data)
{
- // TODO (review): If the fetch response actually does some
streaming, but we send back the error
- // it is a lot of work and data that might move and be unaccounted
for at the coordinator
- node.reply(replyTo, replyContext, fail == null ? new
FetchResponse(unavailable, data, maxApplied()) : null, fail);
+ return new FetchResponse(unavailable, data, maxApplied());
}
protected Timestamp maxApplied()
diff --git a/accord-core/src/main/java/accord/impl/AbstractRequestTimeouts.java
b/accord-core/src/main/java/accord/impl/AbstractRequestTimeouts.java
index e909fe2c..dd3c55e9 100644
--- a/accord-core/src/main/java/accord/impl/AbstractRequestTimeouts.java
+++ b/accord-core/src/main/java/accord/impl/AbstractRequestTimeouts.java
@@ -34,8 +34,8 @@ public class AbstractRequestTimeouts<S extends
AbstractRequestTimeouts.Stripe> i
{
protected interface Expiring
{
- Expiring prepareToExpire(Stripe stripe, long now);
- void onExpire(long now);
+ Expiring prepareToExpire();
+ void onExpire();
}
protected static class Stripe implements Runnable,
Function<Stripe.AbstractRegistered, Runnable>
@@ -69,13 +69,13 @@ public class AbstractRequestTimeouts<S extends
AbstractRequestTimeouts.Stripe> i
@Override
- public void onExpire(long now)
+ public void onExpire()
{
timeout.timeout();
}
@Override
- public Expiring prepareToExpire(Stripe stripe, long now)
+ public Expiring prepareToExpire()
{
return this;
}
@@ -93,57 +93,9 @@ public class AbstractRequestTimeouts<S extends
AbstractRequestTimeouts.Stripe> i
@Override
public void run()
{
- long now = 0;
- try (BufferList<Expiring> collect = new BufferList<>())
- {
- int i = 0;
- try
- {
- lock.lock();
- try
- {
- now = time.elapsed(MICROSECONDS);
- timeouts.advance(now, collect, BufferList::add);
-
- // prepare expiration while we hold the lock - this is
to permit expiring objects to
- // reschedule themselves while returning some
immediate expiry work to do
- for (int j = 0 ; j < collect.size() ; ++j)
- {
- Expiring in = collect.get(j);
- Expiring out = in.prepareToExpire(this, now);
- if (in != out)
- collect.set(j, out);
- }
- }
- finally
- {
- lock.unlock();
- }
-
- while (i < collect.size())
- collect.get(i++).onExpire(now);
- }
- catch (Throwable t)
- {
- while (i < collect.size())
- {
- try
- {
- collect.get(i++).onExpire(now);
- }
- catch (Throwable t2)
- {
- t.addSuppressed(t2);
- }
- }
- throw t;
- }
- }
- }
-
- protected void register(AbstractRegistered register, long deadline)
- {
- timeouts.add(deadline, register);
+ long now = time.elapsed(MICROSECONDS);
+ lock();
+ unlock(now);
}
@Override
@@ -160,7 +112,7 @@ public class AbstractRequestTimeouts<S extends
AbstractRequestTimeouts.Stripe> i
try
{
Registered registered = new Registered(timeout);
- register(registered, deadline);
+ timeouts.add(deadline, registered);
return registered;
}
finally
@@ -171,6 +123,7 @@ public class AbstractRequestTimeouts<S extends
AbstractRequestTimeouts.Stripe> i
protected void lock()
{
+ //noinspection LockAcquiredButNotSafelyReleased
lock.lock();
}
@@ -179,29 +132,60 @@ public class AbstractRequestTimeouts<S extends
AbstractRequestTimeouts.Stripe> i
return lock.tryLock();
}
- protected void unlock()
+ protected void unlock(long now)
{
+ int i = 0;
+ BufferList<Expiring> expire = null;
try
{
- if (timeouts.shouldWake(time.elapsed(MICROSECONDS)))
- run();
- }
- finally
- {
- lock.unlock();
- }
- }
+ try
+ {
+ if (!timeouts.shouldWake(now))
+ return;
- protected void unlock(long now)
- {
- try
+ expire = new BufferList<>();
+ timeouts.advance(now, expire, BufferList::add);
+
+ // prepare expiration while we hold the lock - this is to
permit expiring objects to
+ // reschedule themselves while returning some immediate
expiry work to do
+ for (int j = 0; j < expire.size(); ++j)
+ {
+ Expiring in = expire.get(j);
+ Expiring out = in.prepareToExpire();
+ if (in != out)
+ expire.set(j, out);
+ }
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ // we want to process these without the lock
+ while (i < expire.size())
+ expire.get(i++).onExpire();
+ }
+ catch (Throwable t)
{
- if (timeouts.shouldWake(now))
- run();
+ if (expire != null)
+ {
+ while (i < expire.size())
+ {
+ try
+ {
+ expire.get(i++).onExpire();
+ }
+ catch (Throwable t2)
+ {
+ t.addSuppressed(t2);
+ }
+ }
+ }
}
finally
{
- lock.unlock();
+ if (expire != null)
+ expire.close();
}
}
}
@@ -228,7 +212,7 @@ public class AbstractRequestTimeouts<S extends
AbstractRequestTimeouts.Stripe> i
public RegisteredTimeout register(Timeout timeout, long delay, TimeUnit
units)
{
long now = time.elapsed(MICROSECONDS);
- long deadline = now + Math.max(1, MICROSECONDS.convert(delay, units));
+ long deadline = now + Math.max(1, units.toMicros(delay));
int i = timeout.stripe() & (stripes.length - 1);
while (true)
{
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index 071987c6..bfcdb8f5 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -39,7 +39,7 @@ import accord.utils.Invariants;
import accord.utils.btree.BTree;
import accord.utils.btree.BTreeRemoval;
-// TODO (required): evict to disk
+// TODO (expected): evict to disk
public class DefaultLocalListeners implements LocalListeners
{
public static class Factory implements LocalListeners.Factory
@@ -462,6 +462,7 @@ public class DefaultLocalListeners implements LocalListeners
private void notifyComplexListeners(SafeCommandStore safeStore,
SafeCommand safeCommand)
{
+ // TODO (expected): potential for lock inversion on notify calls;
consider buffering notifies as we do elsewhere
complexListeners.compute(safeCommand.txnId(), (id, cur) -> {
if (cur == null)
return null;
diff --git a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
index f6ea93cc..f3644061 100644
--- a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
+++ b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
@@ -21,6 +21,9 @@ package accord.impl;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import accord.local.AgentExecutor;
import accord.local.Node;
import accord.local.TimeService;
@@ -32,6 +35,8 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.CallbackStripe>
{
+ private static final Logger logger =
LoggerFactory.getLogger(RequestCallbacks.class);
+
public interface CallbackEntry
{
long registeredAt(TimeUnit units);
@@ -73,26 +78,26 @@ public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.C
}
@Override
- public Expiring prepareToExpire(Stripe stripe, long now)
+ public Expiring prepareToExpire()
{
-
Invariants.checkState(((CallbackStripe)stripe).callbacks.containsKey(callbackId));
- if (now >= reportFailAt)
+ if (deadline() == reportFailAt)
{
- ((CallbackStripe)stripe).callbacks.remove(callbackId);
+ callbacks.remove(callbackId);
return this;
}
- stripe.register(this, reportFailAt);
+ Invariants.checkState(callbacks.containsKey(callbackId));
+ timeouts.add(reportFailAt, this);
return new Expiring()
{
@Override
- public Expiring prepareToExpire(Stripe stripe, long now)
+ public Expiring prepareToExpire()
{
return this;
}
@Override
- public void onExpire(long now)
+ public void onExpire()
{
safeInvoke(RegisteredCallback::unsafeOnSlow, null);
}
@@ -100,7 +105,7 @@ public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.C
}
@Override
- public void onExpire(long now)
+ public void onExpire()
{
safeInvoke(RegisteredCallback::unsafeOnFailure, new
accord.coordinate.Timeout(null, null));
}
@@ -122,6 +127,8 @@ public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.C
<P> void safeInvoke(BiConsumer<RegisteredCallback<T>, P> invoker,
P param)
{
+ // TODO (expected): have executor provide inStore() function
so can invoke immediately
+ // BUT need to be careful no callers fail if we invok to
refactor a little as we cannot safely invoke callbacks before we have marked
them in-flight
executor.execute(() -> {
try
{
@@ -162,8 +169,8 @@ public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.C
{
RegisteredCallback<T> registered = new
RegisteredCallback<>(executor, callbackId, callback, to, now, reportSlowAt,
reportFailAt);
Object existing = callbacks.putIfAbsent(callbackId,
registered);
- timeouts.add(Math.min(reportSlowAt, reportFailAt), registered);
Invariants.checkState(existing == null);
+ timeouts.add(Math.min(reportSlowAt, reportFailAt), registered);
return registered;
}
finally
@@ -184,6 +191,7 @@ public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.C
private <T, P> RegisteredCallback<T> safeInvoke(long callbackId,
Node.Id from, P param, BiConsumer<RegisteredCallback<T>, P> invoker, boolean
remove)
{
+ long now = time.elapsed(MICROSECONDS);
lock();
try
{
@@ -198,7 +206,7 @@ public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.C
}
finally
{
- unlock();
+ unlock(now);
}
}
}
@@ -213,14 +221,12 @@ public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.C
super(time, stripeCount, CallbackStripe[]::new, CallbackStripe::new);
}
- public <T> void register(long callbackId, AgentExecutor executor,
Callback<T> callback, Node.Id to, long failDelay, TimeUnit units)
+ public <T> void registerWithDelay(long callbackId, AgentExecutor executor,
Callback<T> callback, Node.Id to, long failDelay, TimeUnit units)
{
- long now = time.elapsed(MICROSECONDS);
- long failDeadline = now + units.toMicros(failDelay);
- stripes[(int)callbackId & (stripes.length - 1)].register(callbackId,
executor, callback, to, now, failDeadline);
+ registerWithDelay(callbackId, executor, callback, to, Long.MAX_VALUE,
failDelay, units);
}
- public <T> void register(long callbackId, AgentExecutor executor,
Callback<T> callback, Node.Id to, long slowDelay, long failDelay, TimeUnit
units)
+ public <T> void registerWithDelay(long callbackId, AgentExecutor executor,
Callback<T> callback, Node.Id to, long slowDelay, long failDelay, TimeUnit
units)
{
long now = time.elapsed(MICROSECONDS);
long reportFailAt = now + units.toMicros(failDelay);
@@ -228,7 +234,12 @@ public class RequestCallbacks extends
AbstractRequestTimeouts<RequestCallbacks.C
stripes[(int)callbackId & (stripes.length - 1)].register(callbackId,
executor, callback, to, now, reportSlowAt, reportFailAt);
}
- public <T> void register(long callbackId, AgentExecutor executor,
Callback<T> callback, Node.Id to, long now, long reportSlowAt, long
reportFailAt, TimeUnit units)
+ public <T> void registerAt(long callbackId, AgentExecutor executor,
Callback<T> callback, Node.Id to, long now, long reportFailAt, TimeUnit units)
+ {
+ registerAt(callbackId, executor, callback, to, now, Long.MAX_VALUE,
reportFailAt, units);
+ }
+
+ public <T> void registerAt(long callbackId, AgentExecutor executor,
Callback<T> callback, Node.Id to, long now, long reportSlowAt, long
reportFailAt, TimeUnit units)
{
if (units != MICROSECONDS)
{
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index 3b57f409..ab0b4235 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -508,6 +508,7 @@ public class Commands
CommandStore unsafeStore = safeStore.commandStore();
// TODO (required, API): do we care about tracking the write
persistence latency, when this is just a memtable write?
// the only reason it will be slow is because Memtable flushes are
backed-up (which will be reported elsewhere)
+ // TODO (required): this is anyway non-monotonic and milliseconds
granularity
long t0 = safeStore.node().now();
return command.writes().apply(safeStore, applyRanges(safeStore,
command.executeAt()), command.partialTxn())
.flatMap(unused -> unsafeStore.submit(context, ss -> {
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index ee48e239..23b4728f 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -375,7 +375,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
public void withEpoch(long epoch, BiConsumer<Void, Throwable> callback)
{
- if (topology.hasEpoch(epoch))
+ if (topology.hasAtLeastEpoch(epoch))
{
callback.accept(null, null);
}
diff --git
a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
index 1340c882..e71660c7 100644
--- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -18,12 +18,9 @@
package accord.messages;
-import javax.annotation.Nullable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Data;
import accord.api.Result;
import accord.local.Node;
import accord.local.SafeCommandStore;
@@ -34,7 +31,6 @@ import accord.primitives.FullRoute;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
-import accord.primitives.Ranges;
import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
@@ -122,12 +118,6 @@ public class ApplyThenWaitUntilApplied extends
WaitUntilApplied
return super.apply(safeStore);
}
- @Override
- protected void onAllSuccess(@Nullable Ranges unavailable, @Nullable Data
data, @Nullable Throwable fail)
- {
- super.onAllSuccess(unavailable, data, fail);
- }
-
@Override
public MessageType type()
{
diff --git a/accord-core/src/main/java/accord/messages/Await.java
b/accord-core/src/main/java/accord/messages/Await.java
index fbece1e6..dfb7ee7e 100644
--- a/accord-core/src/main/java/accord/messages/Await.java
+++ b/accord-core/src/main/java/accord/messages/Await.java
@@ -30,6 +30,7 @@ import accord.api.LocalListeners;
import accord.api.ProgressLog.BlockedUntil;
import accord.api.RemoteListeners;
import accord.api.RequestTimeouts;
+import accord.api.RequestTimeouts.RegisteredTimeout;
import accord.local.Command;
import accord.local.Commands;
import accord.local.Node;
@@ -86,7 +87,7 @@ public class Await implements Request,
MapReduceConsume<SafeCommandStore, Void>,
private transient volatile Collection<LocalListeners.Registered>
syncRegistrations;
private static final AtomicReferenceFieldUpdater<Await,
Collection<LocalListeners.Registered>> syncRegistrationsUpdater =
AtomicReferenceFieldUpdater.newUpdater(Await.class, (Class)Collection.class,
"syncRegistrations");
- private transient RequestTimeouts.RegisteredTimeout timeout;
+ private transient RegisteredTimeout timeout;
// we use exactly -1 to make serialization easy (can increment by 1 and
store a non-negative integer)
private static final int SYNCHRONOUS_CALLBACKID = -1;
@@ -184,22 +185,28 @@ public class Await implements Request,
MapReduceConsume<SafeCommandStore, Void>,
}
else if (callbackId >= 0)
{
+ RemoteListeners.Registration asyncRegistration =
this.asyncRegistration;
AwaitOk reply = asyncRegistration == null || 0 ==
asyncRegistration.done() ? AwaitOk.Ready : AwaitOk.NotReady;
node.reply(replyTo, replyContext, reply, null);
}
else
{
- if (synchronouslyWaitingOn > 0)
+ int waitingOn =
synchronouslyWaitingOnUpdater.decrementAndGet(this);
+ if (waitingOn >= 0)
{
long replyTimeout = node.agent().replyTimeout(replyContext,
MILLISECONDS);
timeout = node.requestTimeouts().register(this, replyTimeout,
MILLISECONDS);
+ if (-1 == synchronouslyWaitingOn)
+ timeout.cancel(); // we could leave a dangling timeout in
this rare race condition
}
- if (-1 == synchronouslyWaitingOnUpdater.decrementAndGet(this))
+ else
+ {
node.reply(replyTo, replyContext, AwaitOk.Ready, null);
+ }
}
}
- synchronized public void timeout()
+ public void timeout()
{
timeout = null;
cancel();
@@ -210,20 +217,17 @@ public class Await implements Request,
MapReduceConsume<SafeCommandStore, Void>,
return txnId.hashCode();
}
- synchronized void cancel()
+ void cancel()
{
- if (timeout != null)
- {
- timeout.cancel();
- timeout = null;
- }
- if (syncRegistrations != null)
- {
- syncRegistrations.forEach(LocalListeners.Registered::cancel);
- syncRegistrations = null;
- }
- if (asyncRegistration != null)
- asyncRegistration = null;
+ RegisteredTimeout cancelTimeout = timeout;
+ Collection<LocalListeners.Registered> cancelRegistrations =
syncRegistrations;
+ if (cancelTimeout != null)
+ cancelTimeout.cancel();
+ if (cancelRegistrations != null)
+ cancelRegistrations.forEach(LocalListeners.Registered::cancel);
+ timeout = null;
+ syncRegistrations = null;
+ asyncRegistration = null;
}
@Override
@@ -249,6 +253,30 @@ public class Await implements Request,
MapReduceConsume<SafeCommandStore, Void>,
}
}
+ @Override
+ public long waitForEpoch()
+ {
+ return txnId.epoch();
+ }
+
+ @Override
+ public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand)
+ {
+ Command command = safeCommand.current();
+ if (command.saveStatus().compareTo(blockedUntil.minSaveStatus) >= 0)
+ return true;
+
+ if (-1 == synchronouslyWaitingOnUpdater.decrementAndGet(this))
+ {
+ node.reply(replyTo, replyContext, AwaitOk.Ready, null);
+ if (timeout != null)
+ timeout.cancel();
+ syncRegistrations = null;
+ }
+
+ return false;
+ }
+
public static class AsyncAwaitComplete implements Request, PreLoadContext,
Consumer<SafeCommandStore>
{
public final TxnId txnId;
@@ -292,32 +320,8 @@ public class Await implements Request,
MapReduceConsume<SafeCommandStore, Void>,
if (safeCommand == null || safeCommand.current().saveStatus() ==
SaveStatus.Uninitialised)
return;
- Command command = Commands.updateRoute(safeStore, safeCommand,
route);
+ Commands.updateRoute(safeStore, safeCommand, route);
safeStore.progressLog().remoteCallback(safeStore, safeCommand,
newStatus, callbackId, from);
}
}
-
- @Override
- public long waitForEpoch()
- {
- return txnId.epoch();
- }
-
- @Override
- public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand)
- {
- Command command = safeCommand.current();
- if (command.saveStatus().compareTo(blockedUntil.minSaveStatus) >= 0)
- return true;
-
- if (-1 == synchronouslyWaitingOnUpdater.decrementAndGet(this))
- {
- node.reply(replyTo, replyContext, AwaitOk.Ready, null);
- if (timeout != null)
- timeout.cancel();
- syncRegistrations = null;
- }
-
- return false;
- }
}
diff --git a/accord-core/src/main/java/accord/messages/Commit.java
b/accord-core/src/main/java/accord/messages/Commit.java
index 96930eba..f2b29a65 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -46,6 +46,7 @@ import accord.primitives.Unseekables;
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.TriFunction;
import org.agrona.collections.IntHashSet;
@@ -156,16 +157,13 @@ public class Commit extends
TxnRequest.WithUnsynced<CommitOrReadNack>
this.readData = readData;
}
- public static void commitMinimal(Node node, Topologies
coordinateEpochOnly, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,
Timestamp executeAt, Deps unstableDeps, Callback<ReadReply> callback)
+ public static void commitMinimal(SortedArrayList<Id> contact, Node node,
Topologies stabilise, Topologies all, Ballot ballot, TxnId txnId, Txn txn,
FullRoute<?> route, Timestamp executeAt, Deps unstableDeps, Callback<ReadReply>
callback)
{
- Invariants.checkArgument(coordinateEpochOnly.size() == 1, "Invalid
coordinate epochs: %s", coordinateEpochOnly);
+ Invariants.checkArgument(stabilise.size() == 1, "Invalid coordinate
epochs: %s", stabilise);
// we want to send to everyone, and we want to include all of the
relevant data, but we stabilise on the coordination epoch replica responses
- Topology coordinates = coordinateEpochOnly.forEpoch(txnId.epoch());
- Topologies all = coordinateEpochOnly;
- if (txnId.epoch() != executeAt.epoch())
- all = node.topology().preciseEpochs(route, txnId.epoch(),
executeAt.epoch());
+ Topology coordinates = stabilise.forEpoch(txnId.epoch());
- send(null, (i1, i2) -> true, null, node, coordinates, coordinates,
all, Kind.CommitSlowPath, ballot,
+ send(contact, null, (i1, i2) -> true, null, node, coordinates,
coordinates, all, Kind.CommitSlowPath, ballot,
txnId, txn, route, executeAt, unstableDeps, callback);
}
@@ -179,35 +177,33 @@ public class Commit extends
TxnRequest.WithUnsynced<CommitOrReadNack>
Topology executes = executeEpochOnly.forEpoch(executeAt.epoch());
Topology coordinates = all.forEpoch(txnId.epoch());
- send(readSet, (set, id) -> set.contains(id.id), readScope, node,
coordinates, executes, all, kind, Ballot.ZERO,
+ SortedArrayList<Id> contact = all.nodes().without(all::isFaulty);
+ send(contact, readSet, (set, id) -> set.contains(id.id), readScope,
node, coordinates, executes, all, kind, Ballot.ZERO,
txnId, txn, route, executeAt, stableDeps, callback);
}
- private static <P> void send(P param, BiPredicate<P, Id>
shouldRegisterCallback, @Nullable Participants<?> readScopeIfCallback,
+ private static <P> void send(SortedArrayList<Id> contact, P param,
BiPredicate<P, Id> shouldRegisterCallback, @Nullable Participants<?>
readScopeIfCallback,
Node node, Topology coordinates, Topology
primary, Topologies all, Kind kind, Ballot ballot,
TxnId txnId, @Nullable Txn txn, FullRoute<?>
route, Timestamp executeAt, @Nullable Deps deps,
Callback<ReadReply> callback)
{
- for (Node.Id to : primary.nodes())
- {
- boolean registerCallback = shouldRegisterCallback.test(param, to);
- // if we register a callback, supply the provided readScope (which
may be null)
- Participants<?> readScope = registerCallback ? readScopeIfCallback
: null;
- Commit send = new Commit(kind, to, coordinates, all, txnId, txn,
route, ballot, executeAt, deps, readScope);
- if (registerCallback) node.send(to, send, callback);
- else node.send(to, send);
- }
- if (all.size() > 1)
+ for (Node.Id to : contact)
{
- for (Node.Id to : all.nodes())
+ if (all.size() == 1 || primary.contains(to))
+ {
+ boolean registerCallback = shouldRegisterCallback.test(param,
to);
+ // if we register a callback, supply the provided readScope
(which may be null)
+ Participants<?> readScope = registerCallback ?
readScopeIfCallback : null;
+ Commit send = new Commit(kind, to, coordinates, all, txnId,
txn, route, ballot, executeAt, deps, readScope);
+ if (registerCallback) node.send(to, send, callback);
+ else node.send(to, send);
+ }
+ else
{
- if (!primary.contains(to))
- {
- boolean registerCallback =
shouldRegisterCallback.test(param, to);
- Commit send = new Commit(kind, to, coordinates, all,
txnId, txn, route, ballot, executeAt, deps, (ReadTxnData) null);
- if (registerCallback) node.send(to, send, callback);
- else node.send(to, send);
- }
+ boolean registerCallback = shouldRegisterCallback.test(param,
to);
+ Commit send = new Commit(kind, to, coordinates, all, txnId,
txn, route, ballot, executeAt, deps, (ReadTxnData) null);
+ if (registerCallback) node.send(to, send, callback);
+ else node.send(to, send);
}
}
}
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java
b/accord-core/src/main/java/accord/messages/ReadData.java
index 5d75f0af..d403f48e 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -60,7 +60,7 @@ public abstract class ReadData extends
AbstractEpochRequest<ReadData.CommitOrRea
private static final Logger logger =
LoggerFactory.getLogger(ReadData.class);
private enum State { PENDING, RETURNED, OBSOLETE }
- protected enum Action { WAIT, EXECUTE, OBSOLETE }
+ protected enum StoreAction { WAIT, EXECUTE, OBSOLETE }
public enum ReadType
{
@@ -165,108 +165,120 @@ public abstract class ReadData extends
AbstractEpochRequest<ReadData.CommitOrRea
@Override
- public synchronized CommitOrReadNack apply(SafeCommandStore safeStore)
+ public CommitOrReadNack apply(SafeCommandStore safeStore)
{
StoreParticipants participants = StoreParticipants.execute(safeStore,
readScope, txnId, executeAtEpoch);
SafeCommand safeCommand = safeStore.get(txnId, participants);
return apply(safeStore, safeCommand, participants);
}
- protected synchronized CommitOrReadNack apply(SafeCommandStore safeStore,
SafeCommand safeCommand, StoreParticipants participants)
+ protected CommitOrReadNack apply(SafeCommandStore safeStore, SafeCommand
safeCommand, StoreParticipants participants)
{
- if (state != State.PENDING)
- return null;
-
- Command command = safeCommand.current();
- SaveStatus status = command.saveStatus();
- int storeId = safeStore.commandStore().id();
-
- logger.trace("{}: setting up read with status {} on {}", txnId,
status, safeStore);
- switch (actionForStatus(status))
+ synchronized (this)
{
- default: throw new AssertionError();
- case WAIT:
- registrations.put(storeId, safeStore.register(txnId, this));
- waitingOn.set(storeId);
- ++waitingOnCount;
+ if (state != State.PENDING)
+ return null;
- int c = status.compareTo(SaveStatus.Stable);
- if (c < 0) safeStore.progressLog().waiting(HasStableDeps,
safeStore, safeCommand, participants.route(), participants.owns());
- else if (c > 0 && status.compareTo(executeOn().min) >= 0 &&
status.compareTo(SaveStatus.PreApplied) < 0)
safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null,
readScope);
- return status.compareTo(SaveStatus.Stable) >= 0 ? null :
Insufficient;
+ Command command = safeCommand.current();
+ SaveStatus status = command.saveStatus();
+ int storeId = safeStore.commandStore().id();
- case OBSOLETE:
- state = State.OBSOLETE;
- return Redundant;
-
- case EXECUTE:
- registrations.put(storeId, safeStore.register(txnId, this));
- waitingOn.set(storeId);
- ++waitingOnCount;
- reading.set(storeId);
- read(safeStore, safeCommand.current());
- return null;
+ logger.trace("{}: setting up read with status {} on {}", txnId,
status, safeStore);
+ switch (actionForStatus(status))
+ {
+ default: throw new AssertionError();
+ case WAIT:
+ registrations.put(storeId, safeStore.register(txnId,
this));
+ waitingOn.set(storeId);
+ ++waitingOnCount;
+
+ int c = status.compareTo(SaveStatus.Stable);
+ if (c < 0) safeStore.progressLog().waiting(HasStableDeps,
safeStore, safeCommand, participants.route(), participants.owns());
+ else if (c > 0 && status.compareTo(executeOn().min) >= 0
&& status.compareTo(SaveStatus.PreApplied) < 0)
safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null,
readScope);
+ return status.compareTo(SaveStatus.Stable) >= 0 ? null :
Insufficient;
+
+ case OBSOLETE:
+ state = State.OBSOLETE;
+ return Redundant;
+
+ case EXECUTE:
+ registrations.put(storeId, safeStore.register(txnId,
this));
+ waitingOn.set(storeId);
+ ++waitingOnCount;
+ reading.set(storeId);
+ }
}
+
+ read(safeStore, safeCommand.current());
+ return null;
}
- protected final Action actionForStatus(SaveStatus status)
+ protected final StoreAction actionForStatus(SaveStatus status)
{
ExecuteOn executeOn = executeOn();
- if (status.compareTo(executeOn.min) < 0) return Action.WAIT;
- if (status.compareTo(executeOn.max) > 0) return Action.OBSOLETE;
- return Action.EXECUTE;
+ if (status.compareTo(executeOn.min) < 0) return StoreAction.WAIT;
+ if (status.compareTo(executeOn.max) > 0) return StoreAction.OBSOLETE;
+ return StoreAction.EXECUTE;
}
@Override
- public synchronized boolean notify(SafeCommandStore safeStore, SafeCommand
safeCommand)
+ public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand)
{
Command command = safeCommand.current();
logger.trace("{}: updating as listener in response to change on {}
with status {} ({})",
this, command.txnId(), command.status(), command);
- int storeId = safeStore.commandStore().id();
- if (state != State.PENDING)
- return false;
-
- switch (actionForStatus(command.saveStatus()))
+ boolean execute;
+ synchronized (this)
{
- default: throw new AssertionError("Unhandled Action: " +
actionForStatus(command.saveStatus()));
- case WAIT:
- return true;
-
- case OBSOLETE:
- onFailure(Redundant, null);
+ int storeId = safeStore.commandStore().id();
+ if (state != State.PENDING)
return false;
- case EXECUTE:
- if (!reading.get(storeId))
- {
+ switch (actionForStatus(command.saveStatus()))
+ {
+ default: throw new AssertionError("Unhandled Action: " +
actionForStatus(command.saveStatus()));
+ case WAIT:
+ return true;
+
+ case OBSOLETE:
+ execute = false;
+ break;
+
+ case EXECUTE:
+ if (reading.get(storeId))
+ return true;
+
if (!waitingOn.get(storeId))
{
waitingOn.set(storeId);
++waitingOnCount;
}
reading.set(storeId);
- logger.trace("{}: executing read", command.txnId());
- read(safeStore, command);
- }
- return true;
+ execute = true;
+ }
+ }
+
+ if (execute)
+ {
+ logger.trace("{}: executing read", command.txnId());
+ read(safeStore, command);
+ return true;
+ }
+ else
+ {
+ onFailure(Redundant, null);
+ return false;
}
}
@Override
- public synchronized void accept(CommitOrReadNack reply, Throwable failure)
+ public void accept(CommitOrReadNack reply, Throwable failure)
{
// Unless failed always ack to indicate setup has completed otherwise
the counter never gets to -1
if ((reply == null || !reply.isFinal()) && failure == null)
{
- onOneSuccess(-1, null);
- if (state == State.PENDING)
- {
- // Time out reads to avoid doing extra work for requests that
will have responses ignored
- long replyTimeout = node.agent().replyTimeout(replyContext,
MILLISECONDS);
- timeout = node.requestTimeouts().register(this, replyTimeout,
MILLISECONDS);
- }
+ onOneSuccess(-1, null, true);
if (reply != null)
node.reply(replyTo, replyContext, reply, null);
}
@@ -307,21 +319,33 @@ public abstract class ReadData extends
AbstractEpochRequest<ReadData.CommitOrRea
});
}
- protected synchronized void readComplete(CommandStore commandStore,
@Nullable Data result, @Nullable Ranges unavailable)
+ protected void readComplete(CommandStore commandStore, @Nullable Data
result, @Nullable Ranges unavailable)
{
- if (state == State.OBSOLETE)
- return;
+ LocalListeners.Registered cancel;
+ Runnable clear;
+ synchronized(this)
+ {
+ if (state == State.OBSOLETE)
+ return;
- logger.trace("{}: read completed on {}", txnId, commandStore);
- if (result != null)
- data = data == null ? result : data.merge(result);
+ logger.trace("{}: read completed on {}", txnId, commandStore);
+ if (result != null)
+ data = data == null ? result : data.merge(result);
- int storeId = commandStore.id();
- registrations.remove(storeId).cancel();
- onOneSuccess(storeId, unavailable);
+ int storeId = commandStore.id();
+ cancel = registrations.remove(storeId);
+ clear = onOneSuccessInternal(storeId, unavailable, false);
+ }
+ cancel.cancel();
+ cleanup(clear);
}
- protected void onOneSuccess(int storeId, @Nullable Ranges newUnavailable)
+ protected void onOneSuccess(int storeId, @Nullable Ranges newUnavailable,
boolean registration)
+ {
+ cleanup(onOneSuccessInternal(storeId, newUnavailable, registration));
+ }
+
+ protected synchronized Runnable onOneSuccessInternal(int storeId,
@Nullable Ranges newUnavailable, boolean registration)
{
if (storeId >= 0)
{
@@ -339,56 +363,93 @@ public abstract class ReadData extends
AbstractEpochRequest<ReadData.CommitOrRea
// wait for -1 to ensure the setup phase has also completed. Setup
calls ack in its callback
// and prevents races where we respond before dispatching all the
required reads (if the reads are
// completing faster than the reads can be setup on all required
shards)
- if (-1 == --waitingOnCount)
- onAllSuccess(this.unavailable, data, null);
- }
+ if (--waitingOnCount >= 0)
+ {
+ if (registration)
+ {
+ // Time out reads to avoid doing extra work for requests that
will have responses ignored
+ long replyTimeout = node.agent().replyTimeout(replyContext,
MILLISECONDS);
+ timeout = node.requestTimeouts().register(this, replyTimeout,
MILLISECONDS);
+ }
+ return null;
+ }
- protected void onAllSuccess(@Nullable Ranges unavailable, @Nullable Data
data, @Nullable Throwable fail)
- {
switch (state)
{
+ default:
+ throw new AssertionError("Unknown state: " + state);
+
case RETURNED:
throw illegalState("ReadOk was sent, yet ack called again");
case OBSOLETE:
- logger.debug("After the read completed for txn {}, the result
was marked obsolete", txnId);
- if (fail != null)
- node.agent().onUncaughtException(fail);
- break;
+ logger.debug("Before the read completed successfully for txn
{}, the result was marked obsolete", txnId);
+ return null;
case PENDING:
state = State.RETURNED;
- node.reply(replyTo, replyContext, fail == null ?
constructReadOk(unavailable, data) : null, fail);
- clear();
- break;
+ node.reply(replyTo, replyContext, constructReadOk(unavailable,
data), null);
+ return clearUnsafe();
+ }
+ }
- default:
- throw new AssertionError("Unknown state: " + state);
+ boolean cancel()
+ {
+ Runnable clear;
+ synchronized (this)
+ {
+ if (state != State.PENDING)
+ return false;
+
+ clear = clearUnsafe();
+ state = State.OBSOLETE;
+ }
+ cleanup(clear);
+ return true;
+ }
+
+ void cleanup()
+ {
+ Runnable clear;
+ synchronized (this)
+ {
+ clear = clearUnsafe();
}
+ cleanup(clear);
}
- void cancel()
+ private static void cleanup(@Nullable Runnable clear)
{
- state = State.OBSOLETE;
- clear();
+ if (clear != null)
+ clear.run();
}
- void clear()
+ @Nullable Runnable clearUnsafe()
{
- if (timeout != null)
- timeout.cancel();
- registrations.forEach((i, r) -> r.cancel());
+ RegisteredTimeout cancelTimeout = timeout;
+ Int2ObjectHashMap<LocalListeners.Registered> cancelRegistrations =
registrations;
+ timeout = null;
+ registrations = null;
waitingOn.clear();
reading.clear();
data = null;
unavailable = null;
- timeout = null;
+
+ if (cancelRegistrations == null && cancelTimeout == null)
+ return null;
+
+ return () -> {
+ if (cancelTimeout != null)
+ cancelTimeout.cancel();
+ if (cancelRegistrations != null)
+ cancelRegistrations.forEach((i, r) -> r.cancel());
+ };
}
- synchronized public void timeout()
+ public void timeout()
{
timeout = null;
- cancel();
+ cleanup();
}
public int stripe()
@@ -396,9 +457,11 @@ public abstract class ReadData extends
AbstractEpochRequest<ReadData.CommitOrRea
return txnId.hashCode();
}
- synchronized void onFailure(CommitOrReadNack failReply, Throwable
throwable)
+ void onFailure(CommitOrReadNack failReply, Throwable throwable)
{
- cancel();
+ if (!cancel())
+ return;
+
if (throwable != null)
{
node.reply(replyTo, replyContext, null, throwable);
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index fe4e210d..2d2e1d96 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -74,7 +74,7 @@ public class WaitUntilApplied extends ReadData
long retryInLaterEpoch =
ReadEphemeralTxnData.retryInLaterEpoch(executeAtEpoch, safeStore, command);
if (retryInLaterEpoch > this.retryInLaterEpoch)
this.retryInLaterEpoch = retryInLaterEpoch;
- onOneSuccess(safeStore.commandStore().id(), unavailable(safeStore,
command));
+ onOneSuccess(safeStore.commandStore().id(), unavailable(safeStore,
command), false);
}
@Override
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index af74ba09..da310314 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -872,6 +872,11 @@ public class TopologyManager
return epochs.get(epoch) != null;
}
+ public boolean hasAtLeastEpoch(long epoch)
+ {
+ return epochs.currentEpoch >= epoch;
+ }
+
public Topology localForEpoch(long epoch)
{
EpochState epochState = epochs.get(epoch);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]