This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7f1d78c2 Fix infinite loop, and notify progress log of sync point durability while waiting to apply
7f1d78c2 is described below
commit 7f1d78c292c65e992bdb8044e2010634c20536a8
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Nov 19 17:45:40 2024 +0000
Fix infinite loop, and notify progress log of sync point durability while waiting to apply
patch by Benedict; reviewed by David for CASSANDRA-20125
---
.../accord/coordinate/CoordinateShardDurable.java | 4 +-
.../accord/coordinate/CoordinateSyncPoint.java | 19 ++++-
.../accord/coordinate/CoordinationAdapter.java | 40 ++++------
.../java/accord/coordinate/ExecuteSyncPoint.java | 91 +++++++++++++++++++---
.../main/java/accord/coordinate/MaybeRecover.java | 3 +-
.../src/main/java/accord/coordinate/Persist.java | 15 +---
.../{QuorumTracker.java => QuorumIdTracker.java} | 51 ++++++------
.../accord/coordinate/tracking/QuorumTracker.java | 2 +-
.../java/accord/impl/DurabilityScheduling.java | 1 +
.../src/main/java/accord/messages/Apply.java | 12 +--
.../accord/messages/ApplyThenWaitUntilApplied.java | 14 ++++
.../main/java/accord/messages/InformDurable.java | 20 +++++
.../src/main/java/accord/messages/Propagate.java | 2 +-
.../src/main/java/accord/messages/ReadData.java | 7 ++
.../accord/coordinate/CoordinateSyncPointTest.java | 2 +-
.../src/test/java/accord/impl/list/ListStore.java | 2 +-
16 files changed, 198 insertions(+), 87 deletions(-)
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index e6892529..30b0e9ca 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -18,7 +18,7 @@
package accord.coordinate;
-import accord.coordinate.ExecuteSyncPoint.ExecuteExclusiveSyncPoint;
+import accord.coordinate.ExecuteSyncPoint.ExecuteExclusive;
import accord.coordinate.tracking.AllTracker;
import accord.coordinate.tracking.RequestStatus;
import accord.local.Node;
@@ -32,7 +32,7 @@ import accord.utils.Invariants;
import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.async.AsyncResult;
-public class CoordinateShardDurable extends ExecuteExclusiveSyncPoint
implements Callback<ReadReply>
+public class CoordinateShardDurable extends ExecuteExclusive implements
Callback<ReadReply>
{
private CoordinateShardDurable(Node node, SyncPoint<Range>
exclusiveSyncPoint)
{
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index fc734b44..0c96bd98 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -20,6 +20,8 @@ package accord.coordinate;
import java.util.List;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +29,7 @@ import accord.coordinate.CoordinationAdapter.Adapters;
import accord.coordinate.CoordinationAdapter.Adapters.SyncPointAdapter;
import accord.local.Node;
import accord.messages.Apply;
+import accord.messages.Callback;
import accord.messages.PreAccept.PreAcceptOk;
import accord.primitives.Ballot;
import accord.primitives.Deps;
@@ -176,15 +179,25 @@ public class CoordinateSyncPoint<U extends Unseekable>
extends CoordinatePreAcce
{
// TODO (required): consider, document and add invariants checking if
this topologies is correct in all cases
// (notably ExclusiveSyncPoints should execute in earlier epochs for
durability, but not for fetching )
- Topologies executes = executes(node, syncPoint.route,
syncPoint.syncId);
- sendApply(node, to, syncPoint, executes);
+ sendApply(node, to, syncPoint, executes(node, syncPoint.route,
syncPoint.syncId));
}
+
public static void sendApply(Node node, Node.Id to, SyncPoint<?>
syncPoint, Topologies executes)
+ {
+ sendApply(node, to, syncPoint, executes, null);
+ }
+
+ public static void sendApply(Node node, Node.Id to, SyncPoint<?>
syncPoint, @Nullable Callback<Apply.ApplyReply> callback)
+ {
+ sendApply(node, to, syncPoint, executes(node, syncPoint.route,
syncPoint.syncId), callback);
+ }
+
+ public static void sendApply(Node node, Node.Id to, SyncPoint<?>
syncPoint, Topologies executes, @Nullable Callback<Apply.ApplyReply> callback)
{
TxnId txnId = syncPoint.syncId;
Timestamp executeAt = txnId;
Txn txn = node.agent().emptySystemTxn(txnId.kind(), txnId.domain());
Deps deps = syncPoint.waitFor;
- Apply.sendMaximal(node, to, executes, txnId, syncPoint.route(), txn,
executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId,
executeAt, null), syncPoint.route());
+ Apply.sendMaximal(node, to, executes, txnId, syncPoint.route(), txn,
executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId,
executeAt, null), syncPoint.route(), callback);
}
}
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index 66d61cba..5c0aaf9f 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -24,7 +24,7 @@ import javax.annotation.Nullable;
import accord.api.ProtocolModifiers;
import accord.api.Result;
-import accord.coordinate.ExecuteSyncPoint.ExecuteBlocking;
+import accord.coordinate.ExecuteSyncPoint.ExecuteInclusive;
import accord.coordinate.tracking.FastPathTracker;
import accord.coordinate.tracking.PreAcceptTracker;
import accord.coordinate.tracking.QuorumTracker;
@@ -71,6 +71,11 @@ public interface CoordinationAdapter<R>
@Override
public <R> CoordinationAdapter<R> get(TxnId txnId, Step step)
{
+ switch (txnId.kind())
+ {
+ case ExclusiveSyncPoint: return (CoordinationAdapter<R>)
Adapters.exclusiveSyncPoint();
+ case SyncPoint: return (CoordinationAdapter<R>)
Adapters.inclusiveSyncPoint();
+ }
switch (step)
{
default: throw new AssertionError("Unhandled step: " + step);
@@ -219,18 +224,17 @@ public interface CoordinationAdapter<R>
@Override
public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
BiConsumer<? super SyncPoint<U>, Throwable> callback)
{
- Topologies all = forExecution(node, route, txnId, executeAt,
deps);
- persist(node, all, route, txnId, txn, executeAt, deps,
txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null),
callback);
+ persist(node, null, route, txnId, txn, executeAt, deps,
txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null),
callback);
}
@Override
- public void persist(Node node, Topologies any, FullRoute<?> route,
Route<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable>
callback)
+ public void persist(Node node, Topologies ignore, FullRoute<?>
route, Route<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps
deps, Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable>
callback)
{
Topologies all = forExecution(node, route, txnId, executeAt,
deps);
invokeSuccess(node, route, txnId, txn, deps, callback);
new PersistSyncPoint(node, all, txnId, route, txn, executeAt,
deps, writes, result)
- .start(Apply.FACTORY, Maximal, any, writes, result);
+ .start(Apply.FACTORY, Maximal, all, writes, result);
}
}
@@ -283,18 +287,6 @@ public interface CoordinationAdapter<R>
{
return node.topology().preciseEpochs(route, txnId.epoch(),
executeAt.epoch());
}
-
- @Override
- public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
BiConsumer<? super SyncPoint<U>, Throwable> callback)
- {
- Topologies all = forExecution(node, route, txnId, executeAt,
deps);
-
- ExecuteBlocking<U> execute = ExecuteBlocking.atQuorum(node,
all, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), executeAt);
- execute.start();
- addOrExecuteCallback(execute, callback);
- }
-
- protected abstract void addOrExecuteCallback(ExecuteBlocking<U>
execute, BiConsumer<? super SyncPoint<U>, Throwable> callback);
}
/*
@@ -310,14 +302,6 @@ public interface CoordinationAdapter<R>
protected AsyncInclusiveSyncPointAdapter() {
super();
}
-
- @Override
- protected void addOrExecuteCallback(ExecuteBlocking<U> execute,
BiConsumer<? super SyncPoint<U>, Throwable> callback)
- {
- // If this is the async adapter then we want to invoke the
callback immediately
- // and the caller can wait on the txn locally if they want
- callback.accept(execute.syncPoint, null);
- }
}
private static class InclusiveSyncPointBlockingAdapter<U extends
Unseekable> extends AbstractInclusiveSyncPointAdapter<U>
@@ -329,9 +313,13 @@ public interface CoordinationAdapter<R>
}
@Override
- protected void addOrExecuteCallback(ExecuteBlocking<U> execute,
BiConsumer<? super SyncPoint<U>, Throwable> callback)
+ public void execute(Node node, Topologies any, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
BiConsumer<? super SyncPoint<U>, Throwable> callback)
{
+ Topologies all = forExecution(node, route, txnId, executeAt,
deps);
+
+ ExecuteInclusive<U> execute = ExecuteInclusive.atQuorum(node,
all, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), executeAt);
execute.addCallback(callback);
+ execute.start();
}
@Override
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index dc010351..3a76cc85 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -24,17 +24,22 @@ import java.util.function.Function;
import accord.api.Result;
import accord.coordinate.CoordinationAdapter.Adapters;
+import accord.coordinate.tracking.QuorumIdTracker;
+import accord.coordinate.tracking.SimpleTracker;
import accord.coordinate.tracking.QuorumTracker;
import accord.coordinate.tracking.RequestStatus;
-import accord.coordinate.tracking.SimpleTracker;
import accord.local.Node;
+import accord.messages.Apply;
import accord.messages.ApplyThenWaitUntilApplied;
import accord.messages.Callback;
+import accord.messages.InformDurable;
import accord.messages.ReadData;
+import accord.messages.ReadData.CommitOrReadNack;
import accord.messages.ReadData.ReadReply;
import accord.messages.WaitUntilApplied;
import accord.primitives.Participants;
import accord.primitives.Range;
+import accord.primitives.Status;
import accord.primitives.SyncPoint;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
@@ -45,25 +50,84 @@ import accord.utils.Invariants;
import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.async.AsyncResults.SettableResult;
+import static accord.messages.Apply.ApplyReply.Insufficient;
+import static accord.messages.ReadData.CommitOrReadNack.Waiting;
+import static accord.primitives.Status.Durability.Majority;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
public abstract class ExecuteSyncPoint<U extends Unseekable> extends
SettableResult<SyncPoint<U>> implements Callback<ReadReply>
{
public static class SyncPointErased extends Throwable {}
- public static class ExecuteBlocking<U extends Unseekable> extends
ExecuteSyncPoint<U>
+ public static class ExecuteInclusive<U extends Unseekable> extends
ExecuteSyncPoint<U>
{
private final Timestamp executeAt;
- public ExecuteBlocking(Node node, SyncPoint<U> syncPoint,
SimpleTracker<?> tracker, Timestamp executeAt)
+ private final QuorumIdTracker durableTracker;
+ private Callback<Apply.ApplyReply> insufficientCallback;
+
+ public ExecuteInclusive(Node node, SyncPoint<U> syncPoint,
SimpleTracker<?> tracker, Timestamp executeAt)
{
super(node, syncPoint, tracker);
Invariants.checkArgument(!syncPoint.syncId.awaitsOnlyDeps());
this.executeAt = executeAt;
+ this.durableTracker = new QuorumIdTracker(tracker.topologies());
}
- public static <U extends Unseekable> ExecuteBlocking<U> atQuorum(Node
node, Topologies topologies, SyncPoint<U> syncPoint, Timestamp executeAt)
+ public static <U extends Unseekable> ExecuteInclusive<U> atQuorum(Node
node, Topologies topologies, SyncPoint<U> syncPoint, Timestamp executeAt)
{
- return new ExecuteBlocking<>(node, syncPoint, new
QuorumTracker(topologies), executeAt);
+ return new ExecuteInclusive<>(node, syncPoint, new
QuorumTracker(topologies), executeAt);
+ }
+
+ @Override
+ public void onSuccess(Node.Id from, ReadReply reply)
+ {
+ if (isDurableReply(reply))
+ onDurableSuccess(from);
+
+ super.onSuccess(from, reply);
+ }
+
+ private void onDurableSuccess(Node.Id from)
+ {
+ if (durableTracker.recordSuccess(from) == RequestStatus.Success)
+ InformDurable.informHome(node, tracker.topologies(),
syncPoint.syncId, syncPoint.route, executeAt, Majority);
+ }
+
+ private static boolean isDurableReply(ReadReply reply)
+ {
+ if (reply.isOk())
+ return true;
+
+ switch ((CommitOrReadNack) reply)
+ {
+ case Waiting:
+ case Invalid:
+ case Redundant:
+ return true;
+ case Insufficient:
+ case Rejected:
+ return false;
+ }
+ return false;
+ }
+
+ protected void sendApply(Node.Id to)
+ {
+ if (insufficientCallback == null)
+ {
+ insufficientCallback = new Callback<>()
+ {
+ @Override
+ public void onSuccess(Node.Id from, Apply.ApplyReply reply)
+ {
+ if (reply != Insufficient)
+ onDurableSuccess(from);
+ }
+ @Override public void onFailure(Node.Id from, Throwable
failure) {}
+ @Override public void onCallbackFailure(Node.Id from,
Throwable failure) {}
+ };
+ }
+ CoordinateSyncPoint.sendApply(node, to, syncPoint,
insufficientCallback);
}
@Override
@@ -79,16 +143,16 @@ public abstract class ExecuteSyncPoint<U extends
Unseekable> extends SettableRes
}
}
- public static class ExecuteExclusiveSyncPoint extends
ExecuteSyncPoint<Range>
+ public static class ExecuteExclusive extends ExecuteSyncPoint<Range>
{
private long retryInFutureEpoch;
- public ExecuteExclusiveSyncPoint(Node node, SyncPoint<Range>
syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier)
+ public ExecuteExclusive(Node node, SyncPoint<Range> syncPoint,
Function<Topologies, SimpleTracker<?>> trackerSupplier)
{
super(node, syncPoint,
Adapters.exclusiveSyncPoint().forExecution(node, syncPoint.route(),
syncPoint.syncId, syncPoint.syncId, syncPoint.waitFor), trackerSupplier);
Invariants.checkArgument(syncPoint.syncId.kind() ==
ExclusiveSyncPoint);
}
- public ExecuteExclusiveSyncPoint(Node node, SyncPoint<Range>
syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier,
SimpleTracker<?> tracker)
+ public ExecuteExclusive(Node node, SyncPoint<Range> syncPoint,
Function<Topologies, SimpleTracker<?>> trackerSupplier, SimpleTracker<?>
tracker)
{
super(node, syncPoint, trackerSupplier, tracker);
Invariants.checkArgument(syncPoint.syncId.kind() ==
ExclusiveSyncPoint);
@@ -116,7 +180,7 @@ public abstract class ExecuteSyncPoint<U extends
Unseekable> extends SettableRes
{
if (retryInFutureEpoch > tracker.topologies().currentEpoch())
{
- ExecuteExclusiveSyncPoint continuation = new
ExecuteExclusiveSyncPoint(node, syncPoint, trackerSupplier,
trackerSupplier.apply(node.topology().preciseEpochs(syncPoint.route(),
tracker.topologies().currentEpoch(), retryInFutureEpoch)));
+ ExecuteExclusive continuation = new ExecuteExclusive(node,
syncPoint, trackerSupplier,
trackerSupplier.apply(node.topology().preciseEpochs(syncPoint.route(),
tracker.topologies().currentEpoch(), retryInFutureEpoch)));
continuation.addCallback((success, failure) -> {
if (failure == null) trySuccess(success);
else tryFailure(failure);
@@ -170,12 +234,12 @@ public abstract class ExecuteSyncPoint<U extends
Unseekable> extends SettableRes
if (!reply.isOk())
{
- switch ((ReadData.CommitOrReadNack)reply)
+ switch ((CommitOrReadNack)reply)
{
default: throw new AssertionError("Unhandled: " + reply);
case Insufficient:
- CoordinateSyncPoint.sendApply(node, from, syncPoint,
tracker.topologies());
+ sendApply(from);
return;
case Redundant:
@@ -200,6 +264,11 @@ public abstract class ExecuteSyncPoint<U extends
Unseekable> extends SettableRes
trySuccess(syncPoint);
}
+ protected void sendApply(Node.Id to)
+ {
+ CoordinateSyncPoint.sendApply(node, to, syncPoint);
+ }
+
@Override
public synchronized void onFailure(Node.Id from, Throwable failure)
{
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 2c7e1fe3..306e3666 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -30,6 +30,7 @@ import accord.messages.CheckStatus.CheckStatusOk;
import accord.messages.CheckStatus.IncludeInfo;
import static
accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
+import static accord.primitives.Status.Durability.Majority;
/**
* A result of null indicates the transaction is globally persistent
@@ -112,7 +113,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
if (hasMadeProgress(full))
{
if (full.durability.isDurable())
-
node.send(topologies.forEpoch(txnId.epoch()).forKey(route.homeKey()).nodes, to
-> new InformDurable(to, topologies, route, txnId, full.executeAtIfKnown(),
full.durability));
+ InformDurable.informHome(node, topologies, txnId,
route, full.executeAtIfKnown(), full.durability);
callback.accept(full.toProgressToken(), null);
}
else
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java
b/accord-core/src/main/java/accord/coordinate/Persist.java
index 1082f631..bce9325d 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -18,9 +18,6 @@
package accord.coordinate;
-import java.util.HashSet;
-import java.util.Set;
-
import accord.api.Result;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node;
@@ -37,7 +34,6 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.topology.Topologies;
-import accord.topology.Topology;
import accord.utils.SortedArrays;
import static accord.coordinate.tracking.RequestStatus.Success;
@@ -55,8 +51,8 @@ public abstract class Persist implements Callback<ApplyReply>
protected final Result result;
protected final FullRoute<?> route;
protected final Topologies topologies;
+ // TODO (expected): track separate ALL and Quorum, so we can report
Universal durability to permit faster GC
protected final QuorumTracker tracker;
- protected final Set<Id> persistedOn;
boolean isDone;
protected Persist(Node node, Topologies all, TxnId txnId, Route<?> sendTo,
Txn txn, Timestamp executeAt, Deps stableDeps, Writes writes, Result result,
FullRoute<?> route)
@@ -72,7 +68,6 @@ public abstract class Persist implements Callback<ApplyReply>
this.route = route;
this.topologies = all;
this.tracker = new QuorumTracker(all);
- this.persistedOn = new HashSet<>();
}
@Override
@@ -83,18 +78,12 @@ public abstract class Persist implements
Callback<ApplyReply>
default: throw new IllegalStateException();
case Redundant:
case Applied:
- persistedOn.add(from);
if (sendTo == route && tracker.recordSuccess(from) == Success)
{
if (!isDone)
{
isDone = true;
- Topologies topologies = tracker.topologies();
- Topology topology = topologies.forEpoch(txnId.epoch());
- int homeShardIndex =
topology.indexForKey(route.homeKey());
- // we can persist only partially if some shards are
already completed; in this case the home shard may not participate
- if (homeShardIndex >= 0)
- node.send(topology.get(homeShardIndex).nodes, to
-> new InformDurable(to, topologies, route, txnId, executeAt, Majority));
+ InformDurable.informHome(node, topologies, txnId,
route, executeAt, Majority);
}
}
break;
diff --git
a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
similarity index 53%
copy from
accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
copy to
accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
index ee29a118..c8e9be55 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
@@ -18,53 +18,60 @@
package accord.coordinate.tracking;
+import java.util.Set;
+
import accord.local.Node;
import accord.topology.Shard;
import accord.topology.Topologies;
+import org.agrona.collections.ObjectHashSet;
-import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static
accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
-public class QuorumTracker extends
SimpleTracker<QuorumTracker.QuorumShardTracker> implements ResponseTracker
+public class QuorumIdTracker extends
SimpleTracker<QuorumIdTracker.QuorumIdShardTracker> implements ResponseTracker
{
- public static class QuorumShardTracker extends ShardTracker
+ public static class QuorumIdShardTracker extends ShardTracker
{
- protected int successes;
- protected int failures;
+ protected final Set<Node.Id> successes = new ObjectHashSet<>();
+ protected Set<Node.Id> failures;
- public QuorumShardTracker(Shard shard)
+ public QuorumIdShardTracker(Shard shard)
{
super(shard);
}
- public ShardOutcomes onSuccess(Object ignore)
+ public ShardOutcomes onSuccess(Node.Id from)
{
- return ++successes == shard.slowPathQuorumSize ? Success :
NoChange;
+ return successes.add(from) && successes.size() ==
shard.slowPathQuorumSize ? Success : NoChange;
}
// return true iff hasFailed()
- public ShardOutcomes onFailure(Object ignore)
+ public ShardOutcomes onFailure(Node.Id from)
{
- return ++failures > shard.maxFailures ? Fail : NoChange;
+ if (failures == null)
+ failures = new ObjectHashSet<>();
+ return failures.add(from) && failures.size() == 1 +
shard.maxFailures ? Fail : NoChange;
}
public boolean hasReachedQuorum()
{
- return successes >= shard.slowPathQuorumSize;
+ return successes.size() >= shard.slowPathQuorumSize;
}
boolean hasInFlight()
{
- return successes + failures < shard.rf();
+ return successes.size() + (failures == null ? 0 : failures.size())
< shard.rf();
}
boolean hasFailures()
{
- return failures > 0;
+ return failures != null;
}
boolean hasFailed()
{
- return failures > shard.maxFailures;
+ return failures != null && failures.size() > shard.maxFailures;
}
@Override
@@ -77,39 +84,39 @@ public class QuorumTracker extends
SimpleTracker<QuorumTracker.QuorumShardTracke
}
}
- public QuorumTracker(Topologies topologies)
+ public QuorumIdTracker(Topologies topologies)
{
- super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
+ super(topologies, QuorumIdShardTracker[]::new,
QuorumIdShardTracker::new);
}
public RequestStatus recordSuccess(Node.Id node)
{
- return recordResponse(this, node, QuorumShardTracker::onSuccess, null);
+ return recordResponse(this, node, QuorumIdShardTracker::onSuccess,
node);
}
// return true iff hasFailed()
public RequestStatus recordFailure(Node.Id node)
{
- return recordResponse(this, node, QuorumShardTracker::onFailure, null);
+ return recordResponse(this, node, QuorumIdShardTracker::onFailure,
node);
}
public boolean hasFailures()
{
- return any(QuorumShardTracker::hasFailures);
+ return any(QuorumIdShardTracker::hasFailures);
}
public boolean hasFailed()
{
- return any(QuorumShardTracker::hasFailed);
+ return any(QuorumIdShardTracker::hasFailed);
}
public boolean hasInFlight()
{
- return any(QuorumShardTracker::hasInFlight);
+ return any(QuorumIdShardTracker::hasInFlight);
}
public boolean hasReachedQuorum()
{
- return all(QuorumShardTracker::hasReachedQuorum);
+ return all(QuorumIdShardTracker::hasReachedQuorum);
}
}
diff --git
a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
index ee29a118..6f3faf4b 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -44,7 +44,7 @@ public class QuorumTracker extends
SimpleTracker<QuorumTracker.QuorumShardTracke
// return true iff hasFailed()
public ShardOutcomes onFailure(Object ignore)
{
- return ++failures > shard.maxFailures ? Fail : NoChange;
+ return failures++ == shard.maxFailures ? Fail : NoChange;
}
public boolean hasReachedQuorum()
diff --git a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
index bd370b93..b79a76ae 100644
--- a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
+++ b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
@@ -307,6 +307,7 @@ public class DurabilityScheduling implements
ConfigurationService.Listener
synchronized (ShardScheduler.this)
{
// TODO (expected): try to recover or invalidate
prior sync point
+ // TODO (required): do not increase number of
splits if due to nodes being *DOWN* (or more quickly recover)
retry();
if (numberOfSplits * 2 <= maxNumberOfSplits)
{
diff --git a/accord-core/src/main/java/accord/messages/Apply.java
b/accord-core/src/main/java/accord/messages/Apply.java
index a8c6e6c5..3d632730 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -113,14 +113,16 @@ public class Apply extends TxnRequest<ApplyReply>
public static void sendMaximal(Node node, Id to, TxnId txnId, Route<?>
sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result,
FullRoute<?> fullRoute)
{
Topologies executes = executes(node, sendTo, executeAt);
- sendMaximal(node, to, executes, txnId, sendTo, txn, executeAt, deps,
writes, result, fullRoute);
+ sendMaximal(node, to, executes, txnId, sendTo, txn, executeAt, deps,
writes, result, fullRoute, null);
}
- public static void sendMaximal(Node node, Id to, Topologies executes,
TxnId txnId, Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes
writes, Result result, FullRoute<?> fullRoute)
+ public static void sendMaximal(Node node, Id to, Topologies executes,
TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes
writes, Result result, FullRoute<?> fullRoute, @Nullable Callback<ApplyReply>
callback)
{
- Topologies participates = participates(node, sendTo, txnId, executeAt,
executes);
- // TODO (expected): should this FACTORY be user configurable? Perhaps
via CoordinationAdapter?
- node.send(to, applyMaximal(FACTORY, to, participates, txnId, sendTo,
txn, executeAt, deps, writes, result, fullRoute));
+ Topologies participates = participates(node, route, txnId, executeAt,
executes);
+ // TODO (required): should this FACTORY be overridden by the
implementation???
+ Apply apply = applyMaximal(FACTORY, to, participates, txnId, route,
txn, executeAt, deps, writes, result, fullRoute);
+ if (callback == null) node.send(to, apply);
+ else node.send(to, apply, callback);
}
public static Topologies executes(Node node, Unseekables<?> route,
Timestamp executeAt)
diff --git
a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
index 34b2dac8..df7f8612 100644
--- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -118,6 +118,20 @@ public class ApplyThenWaitUntilApplied extends
WaitUntilApplied
return super.apply(safeStore);
}
+ @Override
+ public void accept(CommitOrReadNack reply, Throwable failure)
+ {
+ super.accept(reply, failure);
+
+ boolean waiting;
+ synchronized (this)
+ {
+ waiting = waitingOnCount >= 0;
+ }
+ if (waiting && reply == null && failure == null)
+ node.reply(replyTo, replyContext, CommitOrReadNack.Waiting, null);
+ }
+
@Override
public MessageType type()
{
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java
b/accord-core/src/main/java/accord/messages/InformDurable.java
index 061849a9..07424b1c 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -20,6 +20,7 @@ package accord.messages;
import javax.annotation.Nullable;
import accord.local.Commands;
+import accord.local.Node;
import accord.local.Node.Id;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
@@ -30,11 +31,14 @@ import accord.local.StoreParticipants;
import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.topology.Shard;
import accord.topology.Topologies;
+import accord.topology.Topology;
import accord.utils.async.Cancellable;
import static accord.local.PreLoadContext.contextFor;
import static accord.messages.SimpleReply.Ok;
+import static accord.primitives.Status.Durability.Majority;
public class InformDurable extends TxnRequest<Reply> implements PreLoadContext
{
@@ -63,6 +67,22 @@ public class InformDurable extends TxnRequest<Reply>
implements PreLoadContext
this.durability = durability;
}
+ public static void informHome(Node node, Topologies any, TxnId txnId,
Route<?> route, Timestamp executeAt, Durability durability)
+ {
+ long homeEpoch = txnId.epoch();
+ Topology homeEpochTopology = any.forEpoch(homeEpoch);
+ int homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
+ if (homeShardIndex < 0)
+ {
+ homeEpochTopology = node.topology().globalForEpoch(homeEpoch);
+ homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
+ }
+
+ Shard homeShard = homeEpochTopology.get(homeShardIndex);
+ Topologies homeTopology = new Topologies.Single(any, new
Topology(homeEpoch, homeShard));
+ node.send(homeShard.nodes, to -> new InformDurable(to, homeTopology,
route.homeKeyOnlyRoute(), txnId, executeAt, durability));
+ }
+
@Override
public Cancellable submit()
{
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java
b/accord-core/src/main/java/accord/messages/Propagate.java
index 928a97dd..f7f1fa92 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -304,9 +304,9 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
didNotAchieveTarget = owns;
}
- FetchResult current = fetchResult;
while (true)
{
+ FetchResult current = fetchResult;
FetchResult next = current == null ? new FetchResult(achieved,
achievedTarget, didNotAchieveTarget)
: new
FetchResult(achieved.reduce(current.achieved),
achievedTarget.with((Unseekables)current.achievedTarget),
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java
b/accord-core/src/main/java/accord/messages/ReadData.java
index cb5ae063..e21ad511 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -520,10 +520,17 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
* The commit has been rejected due to stale ballot.
*/
Rejected("CommitRejected"),
+
/**
* Either not committed, or not stable
*/
Insufficient("CommitInsufficient"),
+
+ /**
+ * PreApplied successfully, but the request is blocking so waiting to
reply
+ */
+ Waiting("ApplyWaiting"),
+
Redundant("CommitOrReadRedundant");
final String fullname;
diff --git
a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
index f9916144..ab40251f 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
@@ -102,7 +102,7 @@ class CoordinateSyncPointTest
.flatMap(syncPoint ->
// the test uses an
executor that runs everything right away, so this gets called outside the
CommandStore
node.commandStores().forId(0).submit(() -> {
-
ExecuteSyncPoint.ExecuteExclusiveSyncPoint execute = new
ExecuteSyncPoint.ExecuteExclusiveSyncPoint(node, syncPoint, AllTracker::new);
+
ExecuteSyncPoint.ExecuteExclusive execute = new
ExecuteSyncPoint.ExecuteExclusive(node, syncPoint, AllTracker::new);
execute.start();
return execute;
})
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 022b74f4..495383b9 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -675,7 +675,7 @@ public class ListStore implements DataStore
});
}
- private static class Await extends
ExecuteSyncPoint.ExecuteExclusiveSyncPoint
+ private static class Await extends ExecuteSyncPoint.ExecuteExclusive
{
public Await(Node node, SyncPoint<Range> syncPoint)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]