This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 60dc8166b3a06ba713639ce41e0a7058e290b823 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Tue Nov 19 17:45:40 2024 +0000 Fix infinite loop, and notify progress log of sync point durability while waiting to apply patch by Benedict; reviewed by David for CASSANDRA-20125 --- .../accord/coordinate/CoordinateShardDurable.java | 4 +- .../accord/coordinate/CoordinateSyncPoint.java | 19 ++++- .../accord/coordinate/CoordinationAdapter.java | 40 ++++------ .../java/accord/coordinate/ExecuteSyncPoint.java | 90 +++++++++++++++++++--- .../main/java/accord/coordinate/MaybeRecover.java | 2 +- .../src/main/java/accord/coordinate/Persist.java | 15 +--- .../{QuorumTracker.java => QuorumIdTracker.java} | 51 ++++++------ .../accord/coordinate/tracking/QuorumTracker.java | 2 +- .../java/accord/impl/DurabilityScheduling.java | 1 + .../src/main/java/accord/messages/Apply.java | 12 +-- .../accord/messages/ApplyThenWaitUntilApplied.java | 14 ++++ .../main/java/accord/messages/InformDurable.java | 19 +++++ .../src/main/java/accord/messages/Propagate.java | 2 +- .../src/main/java/accord/messages/ReadData.java | 7 ++ .../accord/coordinate/CoordinateSyncPointTest.java | 2 +- .../src/test/java/accord/impl/list/ListStore.java | 2 +- 16 files changed, 195 insertions(+), 87 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java index e6892529..30b0e9ca 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java @@ -18,7 +18,7 @@ package accord.coordinate; -import accord.coordinate.ExecuteSyncPoint.ExecuteExclusiveSyncPoint; +import accord.coordinate.ExecuteSyncPoint.ExecuteExclusive; import accord.coordinate.tracking.AllTracker; import accord.coordinate.tracking.RequestStatus; import accord.local.Node; @@ -32,7 +32,7 @@ import accord.utils.Invariants; import 
accord.utils.SortedArrays.SortedArrayList; import accord.utils.async.AsyncResult; -public class CoordinateShardDurable extends ExecuteExclusiveSyncPoint implements Callback<ReadReply> +public class CoordinateShardDurable extends ExecuteExclusive implements Callback<ReadReply> { private CoordinateShardDurable(Node node, SyncPoint<Range> exclusiveSyncPoint) { diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java index fc734b44..0c96bd98 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java @@ -20,6 +20,8 @@ package accord.coordinate; import java.util.List; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +29,7 @@ import accord.coordinate.CoordinationAdapter.Adapters; import accord.coordinate.CoordinationAdapter.Adapters.SyncPointAdapter; import accord.local.Node; import accord.messages.Apply; +import accord.messages.Callback; import accord.messages.PreAccept.PreAcceptOk; import accord.primitives.Ballot; import accord.primitives.Deps; @@ -176,15 +179,25 @@ public class CoordinateSyncPoint<U extends Unseekable> extends CoordinatePreAcce { // TODO (required): consider, document and add invariants checking if this topologies is correct in all cases // (notably ExclusiveSyncPoints should execute in earlier epochs for durability, but not for fetching ) - Topologies executes = executes(node, syncPoint.route, syncPoint.syncId); - sendApply(node, to, syncPoint, executes); + sendApply(node, to, syncPoint, executes(node, syncPoint.route, syncPoint.syncId)); } + public static void sendApply(Node node, Node.Id to, SyncPoint<?> syncPoint, Topologies executes) + { + sendApply(node, to, syncPoint, executes, null); + } + + public static void sendApply(Node node, Node.Id to, SyncPoint<?> syncPoint, @Nullable 
Callback<Apply.ApplyReply> callback) + { + sendApply(node, to, syncPoint, executes(node, syncPoint.route, syncPoint.syncId), callback); + } + + public static void sendApply(Node node, Node.Id to, SyncPoint<?> syncPoint, Topologies executes, @Nullable Callback<Apply.ApplyReply> callback) { TxnId txnId = syncPoint.syncId; Timestamp executeAt = txnId; Txn txn = node.agent().emptySystemTxn(txnId.kind(), txnId.domain()); Deps deps = syncPoint.waitFor; - Apply.sendMaximal(node, to, executes, txnId, syncPoint.route(), txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), syncPoint.route()); + Apply.sendMaximal(node, to, executes, txnId, syncPoint.route(), txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), syncPoint.route(), callback); } } diff --git a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java index 66d61cba..5c0aaf9f 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java @@ -24,7 +24,7 @@ import javax.annotation.Nullable; import accord.api.ProtocolModifiers; import accord.api.Result; -import accord.coordinate.ExecuteSyncPoint.ExecuteBlocking; +import accord.coordinate.ExecuteSyncPoint.ExecuteInclusive; import accord.coordinate.tracking.FastPathTracker; import accord.coordinate.tracking.PreAcceptTracker; import accord.coordinate.tracking.QuorumTracker; @@ -71,6 +71,11 @@ public interface CoordinationAdapter<R> @Override public <R> CoordinationAdapter<R> get(TxnId txnId, Step step) { + switch (txnId.kind()) + { + case ExclusiveSyncPoint: return (CoordinationAdapter<R>) Adapters.exclusiveSyncPoint(); + case SyncPoint: return (CoordinationAdapter<R>) Adapters.inclusiveSyncPoint(); + } switch (step) { default: throw new AssertionError("Unhandled step: " + step); @@ -219,18 +224,17 @@ public 
interface CoordinationAdapter<R> @Override public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<U>, Throwable> callback) { - Topologies all = forExecution(node, route, txnId, executeAt, deps); - persist(node, all, route, txnId, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), callback); + persist(node, null, route, txnId, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), callback); } @Override - public void persist(Node node, Topologies any, FullRoute<?> route, Route<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable> callback) + public void persist(Node node, Topologies ignore, FullRoute<?> route, Route<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable> callback) { Topologies all = forExecution(node, route, txnId, executeAt, deps); invokeSuccess(node, route, txnId, txn, deps, callback); new PersistSyncPoint(node, all, txnId, route, txn, executeAt, deps, writes, result) - .start(Apply.FACTORY, Maximal, any, writes, result); + .start(Apply.FACTORY, Maximal, all, writes, result); } } @@ -283,18 +287,6 @@ public interface CoordinationAdapter<R> { return node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch()); } - - @Override - public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? 
super SyncPoint<U>, Throwable> callback) - { - Topologies all = forExecution(node, route, txnId, executeAt, deps); - - ExecuteBlocking<U> execute = ExecuteBlocking.atQuorum(node, all, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), executeAt); - execute.start(); - addOrExecuteCallback(execute, callback); - } - - protected abstract void addOrExecuteCallback(ExecuteBlocking<U> execute, BiConsumer<? super SyncPoint<U>, Throwable> callback); } /* @@ -310,14 +302,6 @@ public interface CoordinationAdapter<R> protected AsyncInclusiveSyncPointAdapter() { super(); } - - @Override - protected void addOrExecuteCallback(ExecuteBlocking<U> execute, BiConsumer<? super SyncPoint<U>, Throwable> callback) - { - // If this is the async adapter then we want to invoke the callback immediately - // and the caller can wait on the txn locally if they want - callback.accept(execute.syncPoint, null); - } } private static class InclusiveSyncPointBlockingAdapter<U extends Unseekable> extends AbstractInclusiveSyncPointAdapter<U> @@ -329,9 +313,13 @@ public interface CoordinationAdapter<R> } @Override - protected void addOrExecuteCallback(ExecuteBlocking<U> execute, BiConsumer<? super SyncPoint<U>, Throwable> callback) + public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? 
super SyncPoint<U>, Throwable> callback) { + Topologies all = forExecution(node, route, txnId, executeAt, deps); + + ExecuteInclusive<U> execute = ExecuteInclusive.atQuorum(node, all, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), executeAt); execute.addCallback(callback); + execute.start(); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java index dc010351..493dae68 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java @@ -24,13 +24,17 @@ import java.util.function.Function; import accord.api.Result; import accord.coordinate.CoordinationAdapter.Adapters; +import accord.coordinate.tracking.QuorumIdTracker; +import accord.coordinate.tracking.SimpleTracker; import accord.coordinate.tracking.QuorumTracker; import accord.coordinate.tracking.RequestStatus; -import accord.coordinate.tracking.SimpleTracker; import accord.local.Node; +import accord.messages.Apply; import accord.messages.ApplyThenWaitUntilApplied; import accord.messages.Callback; +import accord.messages.InformDurable; import accord.messages.ReadData; +import accord.messages.ReadData.CommitOrReadNack; import accord.messages.ReadData.ReadReply; import accord.messages.WaitUntilApplied; import accord.primitives.Participants; @@ -45,25 +49,84 @@ import accord.utils.Invariants; import accord.utils.SortedArrays.SortedArrayList; import accord.utils.async.AsyncResults.SettableResult; +import static accord.messages.Apply.ApplyReply.Insufficient; +import static accord.messages.ReadData.CommitOrReadNack.Waiting; +import static accord.primitives.Status.Durability.Majority; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; public abstract class ExecuteSyncPoint<U extends Unseekable> extends SettableResult<SyncPoint<U>> implements Callback<ReadReply> { public static class SyncPointErased extends Throwable 
{} - public static class ExecuteBlocking<U extends Unseekable> extends ExecuteSyncPoint<U> + public static class ExecuteInclusive<U extends Unseekable> extends ExecuteSyncPoint<U> { private final Timestamp executeAt; - public ExecuteBlocking(Node node, SyncPoint<U> syncPoint, SimpleTracker<?> tracker, Timestamp executeAt) + private final QuorumIdTracker durableTracker; + private Callback<Apply.ApplyReply> insufficientCallback; + + public ExecuteInclusive(Node node, SyncPoint<U> syncPoint, SimpleTracker<?> tracker, Timestamp executeAt) { super(node, syncPoint, tracker); Invariants.checkArgument(!syncPoint.syncId.awaitsOnlyDeps()); this.executeAt = executeAt; + this.durableTracker = new QuorumIdTracker(tracker.topologies()); } - public static <U extends Unseekable> ExecuteBlocking<U> atQuorum(Node node, Topologies topologies, SyncPoint<U> syncPoint, Timestamp executeAt) + public static <U extends Unseekable> ExecuteInclusive<U> atQuorum(Node node, Topologies topologies, SyncPoint<U> syncPoint, Timestamp executeAt) { - return new ExecuteBlocking<>(node, syncPoint, new QuorumTracker(topologies), executeAt); + return new ExecuteInclusive<>(node, syncPoint, new QuorumTracker(topologies), executeAt); + } + + @Override + public void onSuccess(Node.Id from, ReadReply reply) + { + if (isDurableReply(reply)) + onDurableSuccess(from); + + super.onSuccess(from, reply); + } + + private void onDurableSuccess(Node.Id from) + { + if (durableTracker.recordSuccess(from) == RequestStatus.Success) + InformDurable.informHome(node, tracker.topologies(), syncPoint.syncId, syncPoint.route, executeAt, Majority); + } + + private static boolean isDurableReply(ReadReply reply) + { + if (reply.isOk()) + return true; + + switch ((CommitOrReadNack) reply) + { + case Waiting: + case Invalid: + case Redundant: + return true; + case Insufficient: + case Rejected: + return false; + } + return false; + } + + protected void sendApply(Node.Id to) + { + if (insufficientCallback == null) + { + 
insufficientCallback = new Callback<>() + { + @Override + public void onSuccess(Node.Id from, Apply.ApplyReply reply) + { + if (reply != Insufficient) + onDurableSuccess(from); + } + @Override public void onFailure(Node.Id from, Throwable failure) {} + @Override public void onCallbackFailure(Node.Id from, Throwable failure) {} + }; + } + CoordinateSyncPoint.sendApply(node, to, syncPoint, insufficientCallback); } @Override @@ -79,16 +142,16 @@ public abstract class ExecuteSyncPoint<U extends Unseekable> extends SettableRes } } - public static class ExecuteExclusiveSyncPoint extends ExecuteSyncPoint<Range> + public static class ExecuteExclusive extends ExecuteSyncPoint<Range> { private long retryInFutureEpoch; - public ExecuteExclusiveSyncPoint(Node node, SyncPoint<Range> syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier) + public ExecuteExclusive(Node node, SyncPoint<Range> syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier) { super(node, syncPoint, Adapters.exclusiveSyncPoint().forExecution(node, syncPoint.route(), syncPoint.syncId, syncPoint.syncId, syncPoint.waitFor), trackerSupplier); Invariants.checkArgument(syncPoint.syncId.kind() == ExclusiveSyncPoint); } - public ExecuteExclusiveSyncPoint(Node node, SyncPoint<Range> syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier, SimpleTracker<?> tracker) + public ExecuteExclusive(Node node, SyncPoint<Range> syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier, SimpleTracker<?> tracker) { super(node, syncPoint, trackerSupplier, tracker); Invariants.checkArgument(syncPoint.syncId.kind() == ExclusiveSyncPoint); @@ -116,7 +179,7 @@ public abstract class ExecuteSyncPoint<U extends Unseekable> extends SettableRes { if (retryInFutureEpoch > tracker.topologies().currentEpoch()) { - ExecuteExclusiveSyncPoint continuation = new ExecuteExclusiveSyncPoint(node, syncPoint, trackerSupplier, trackerSupplier.apply(node.topology().preciseEpochs(syncPoint.route(), 
tracker.topologies().currentEpoch(), retryInFutureEpoch))); + ExecuteExclusive continuation = new ExecuteExclusive(node, syncPoint, trackerSupplier, trackerSupplier.apply(node.topology().preciseEpochs(syncPoint.route(), tracker.topologies().currentEpoch(), retryInFutureEpoch))); continuation.addCallback((success, failure) -> { if (failure == null) trySuccess(success); else tryFailure(failure); @@ -170,12 +233,12 @@ public abstract class ExecuteSyncPoint<U extends Unseekable> extends SettableRes if (!reply.isOk()) { - switch ((ReadData.CommitOrReadNack)reply) + switch ((CommitOrReadNack)reply) { default: throw new AssertionError("Unhandled: " + reply); case Insufficient: - CoordinateSyncPoint.sendApply(node, from, syncPoint, tracker.topologies()); + sendApply(from); return; case Redundant: @@ -200,6 +263,11 @@ public abstract class ExecuteSyncPoint<U extends Unseekable> extends SettableRes trySuccess(syncPoint); } + protected void sendApply(Node.Id to) + { + CoordinateSyncPoint.sendApply(node, to, syncPoint); + } + @Override public synchronized void onFailure(Node.Id from, Throwable failure) { diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java index 2c7e1fe3..4db35c84 100644 --- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java +++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java @@ -112,7 +112,7 @@ public class MaybeRecover extends CheckShards<Route<?>> if (hasMadeProgress(full)) { if (full.durability.isDurable()) - node.send(topologies.forEpoch(txnId.epoch()).forKey(route.homeKey()).nodes, to -> new InformDurable(to, topologies, route, txnId, full.executeAtIfKnown(), full.durability)); + InformDurable.informHome(node, topologies, txnId, route, full.executeAtIfKnown(), full.durability); callback.accept(full.toProgressToken(), null); } else diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java 
b/accord-core/src/main/java/accord/coordinate/Persist.java index 1082f631..bce9325d 100644 --- a/accord-core/src/main/java/accord/coordinate/Persist.java +++ b/accord-core/src/main/java/accord/coordinate/Persist.java @@ -18,9 +18,6 @@ package accord.coordinate; -import java.util.HashSet; -import java.util.Set; - import accord.api.Result; import accord.coordinate.tracking.QuorumTracker; import accord.local.Node; @@ -37,7 +34,6 @@ import accord.primitives.Txn; import accord.primitives.TxnId; import accord.primitives.Writes; import accord.topology.Topologies; -import accord.topology.Topology; import accord.utils.SortedArrays; import static accord.coordinate.tracking.RequestStatus.Success; @@ -55,8 +51,8 @@ public abstract class Persist implements Callback<ApplyReply> protected final Result result; protected final FullRoute<?> route; protected final Topologies topologies; + // TODO (expected): track separate ALL and Quorum, so we can report Universal durability to permit faster GC protected final QuorumTracker tracker; - protected final Set<Id> persistedOn; boolean isDone; protected Persist(Node node, Topologies all, TxnId txnId, Route<?> sendTo, Txn txn, Timestamp executeAt, Deps stableDeps, Writes writes, Result result, FullRoute<?> route) @@ -72,7 +68,6 @@ public abstract class Persist implements Callback<ApplyReply> this.route = route; this.topologies = all; this.tracker = new QuorumTracker(all); - this.persistedOn = new HashSet<>(); } @Override @@ -83,18 +78,12 @@ public abstract class Persist implements Callback<ApplyReply> default: throw new IllegalStateException(); case Redundant: case Applied: - persistedOn.add(from); if (sendTo == route && tracker.recordSuccess(from) == Success) { if (!isDone) { isDone = true; - Topologies topologies = tracker.topologies(); - Topology topology = topologies.forEpoch(txnId.epoch()); - int homeShardIndex = topology.indexForKey(route.homeKey()); - // we can persist only partially if some shards are already completed; in this case 
the home shard may not participate - if (homeShardIndex >= 0) - node.send(topology.get(homeShardIndex).nodes, to -> new InformDurable(to, topologies, route, txnId, executeAt, Majority)); + InformDurable.informHome(node, topologies, txnId, route, executeAt, Majority); } } break; diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java similarity index 53% copy from accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java copy to accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java index ee29a118..c8e9be55 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java @@ -18,53 +18,60 @@ package accord.coordinate.tracking; +import java.util.Set; + import accord.local.Node; import accord.topology.Shard; import accord.topology.Topologies; +import org.agrona.collections.ObjectHashSet; -import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*; +import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail; +import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange; +import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success; -public class QuorumTracker extends SimpleTracker<QuorumTracker.QuorumShardTracker> implements ResponseTracker +public class QuorumIdTracker extends SimpleTracker<QuorumIdTracker.QuorumIdShardTracker> implements ResponseTracker { - public static class QuorumShardTracker extends ShardTracker + public static class QuorumIdShardTracker extends ShardTracker { - protected int successes; - protected int failures; + protected final Set<Node.Id> successes = new ObjectHashSet<>(); + protected Set<Node.Id> failures; - public QuorumShardTracker(Shard shard) + public QuorumIdShardTracker(Shard shard) { super(shard); } - public ShardOutcomes 
onSuccess(Object ignore) + public ShardOutcomes onSuccess(Node.Id from) { - return ++successes == shard.slowPathQuorumSize ? Success : NoChange; + return successes.add(from) && successes.size() == shard.slowPathQuorumSize ? Success : NoChange; } // return true iff hasFailed() - public ShardOutcomes onFailure(Object ignore) + public ShardOutcomes onFailure(Node.Id from) { - return ++failures > shard.maxFailures ? Fail : NoChange; + if (failures == null) + failures = new ObjectHashSet<>(); + return failures.add(from) && failures.size() == 1 + shard.maxFailures ? Fail : NoChange; } public boolean hasReachedQuorum() { - return successes >= shard.slowPathQuorumSize; + return successes.size() >= shard.slowPathQuorumSize; } boolean hasInFlight() { - return successes + failures < shard.rf(); + return successes.size() + (failures == null ? 0 : failures.size()) < shard.rf(); } boolean hasFailures() { - return failures > 0; + return failures != null; } boolean hasFailed() { - return failures > shard.maxFailures; + return failures != null && failures.size() > shard.maxFailures; } @Override @@ -77,39 +84,39 @@ public class QuorumTracker extends SimpleTracker<QuorumTracker.QuorumShardTracke } } - public QuorumTracker(Topologies topologies) + public QuorumIdTracker(Topologies topologies) { - super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new); + super(topologies, QuorumIdShardTracker[]::new, QuorumIdShardTracker::new); } public RequestStatus recordSuccess(Node.Id node) { - return recordResponse(this, node, QuorumShardTracker::onSuccess, null); + return recordResponse(this, node, QuorumIdShardTracker::onSuccess, node); } // return true iff hasFailed() public RequestStatus recordFailure(Node.Id node) { - return recordResponse(this, node, QuorumShardTracker::onFailure, null); + return recordResponse(this, node, QuorumIdShardTracker::onFailure, node); } public boolean hasFailures() { - return any(QuorumShardTracker::hasFailures); + return 
any(QuorumIdShardTracker::hasFailures); } public boolean hasFailed() { - return any(QuorumShardTracker::hasFailed); + return any(QuorumIdShardTracker::hasFailed); } public boolean hasInFlight() { - return any(QuorumShardTracker::hasInFlight); + return any(QuorumIdShardTracker::hasInFlight); } public boolean hasReachedQuorum() { - return all(QuorumShardTracker::hasReachedQuorum); + return all(QuorumIdShardTracker::hasReachedQuorum); } } diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java index ee29a118..6f3faf4b 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java @@ -44,7 +44,7 @@ public class QuorumTracker extends SimpleTracker<QuorumTracker.QuorumShardTracke // return true iff hasFailed() public ShardOutcomes onFailure(Object ignore) { - return ++failures > shard.maxFailures ? Fail : NoChange; + return failures++ == shard.maxFailures ? 
Fail : NoChange; } public boolean hasReachedQuorum() diff --git a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java index bd370b93..b79a76ae 100644 --- a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java +++ b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java @@ -307,6 +307,7 @@ public class DurabilityScheduling implements ConfigurationService.Listener synchronized (ShardScheduler.this) { // TODO (expected): try to recover or invalidate prior sync point + // TODO (required): do not increase number of splits if due to nodes being *DOWN* (or more quickly recover) retry(); if (numberOfSplits * 2 <= maxNumberOfSplits) { diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java index a8c6e6c5..3d632730 100644 --- a/accord-core/src/main/java/accord/messages/Apply.java +++ b/accord-core/src/main/java/accord/messages/Apply.java @@ -113,14 +113,16 @@ public class Apply extends TxnRequest<ApplyReply> public static void sendMaximal(Node node, Id to, TxnId txnId, Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute) { Topologies executes = executes(node, sendTo, executeAt); - sendMaximal(node, to, executes, txnId, sendTo, txn, executeAt, deps, writes, result, fullRoute); + sendMaximal(node, to, executes, txnId, sendTo, txn, executeAt, deps, writes, result, fullRoute, null); } - public static void sendMaximal(Node node, Id to, Topologies executes, TxnId txnId, Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute) + public static void sendMaximal(Node node, Id to, Topologies executes, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute, @Nullable Callback<ApplyReply> callback) { - Topologies participates = participates(node, 
sendTo, txnId, executeAt, executes); - // TODO (expected): should this FACTORY be user configurable? Perhaps via CoordinationAdapter? - node.send(to, applyMaximal(FACTORY, to, participates, txnId, sendTo, txn, executeAt, deps, writes, result, fullRoute)); + Topologies participates = participates(node, route, txnId, executeAt, executes); + // TODO (required): should this FACTORY be overridden by the implementation??? + Apply apply = applyMaximal(FACTORY, to, participates, txnId, route, txn, executeAt, deps, writes, result, fullRoute); + if (callback == null) node.send(to, apply); + else node.send(to, apply, callback); } public static Topologies executes(Node node, Unseekables<?> route, Timestamp executeAt) diff --git a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java index 34b2dac8..df7f8612 100644 --- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java +++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java @@ -118,6 +118,20 @@ public class ApplyThenWaitUntilApplied extends WaitUntilApplied return super.apply(safeStore); } + @Override + public void accept(CommitOrReadNack reply, Throwable failure) + { + super.accept(reply, failure); + + boolean waiting; + synchronized (this) + { + waiting = waitingOnCount >= 0; + } + if (waiting && reply == null && failure == null) + node.reply(replyTo, replyContext, CommitOrReadNack.Waiting, null); + } + @Override public MessageType type() { diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java index 061849a9..03534eb2 100644 --- a/accord-core/src/main/java/accord/messages/InformDurable.java +++ b/accord-core/src/main/java/accord/messages/InformDurable.java @@ -20,6 +20,7 @@ package accord.messages; import javax.annotation.Nullable; import accord.local.Commands; +import accord.local.Node; import 
accord.local.Node.Id; import accord.local.PreLoadContext; import accord.local.SafeCommand; @@ -30,7 +31,9 @@ import accord.local.StoreParticipants; import accord.primitives.Route; import accord.primitives.Timestamp; import accord.primitives.TxnId; +import accord.topology.Shard; import accord.topology.Topologies; +import accord.topology.Topology; import accord.utils.async.Cancellable; import static accord.local.PreLoadContext.contextFor; @@ -63,6 +66,22 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext this.durability = durability; } + public static void informHome(Node node, Topologies any, TxnId txnId, Route<?> route, Timestamp executeAt, Durability durability) + { + long homeEpoch = txnId.epoch(); + Topology homeEpochTopology = any.forEpoch(homeEpoch); + int homeShardIndex = homeEpochTopology.indexForKey(route.homeKey()); + if (homeShardIndex < 0) + { + homeEpochTopology = node.topology().globalForEpoch(homeEpoch); + homeShardIndex = homeEpochTopology.indexForKey(route.homeKey()); + } + + Shard homeShard = homeEpochTopology.get(homeShardIndex); + Topologies homeTopology = new Topologies.Single(any, new Topology(homeEpoch, homeShard)); + node.send(homeShard.nodes, to -> new InformDurable(to, homeTopology, route.homeKeyOnlyRoute(), txnId, executeAt, durability)); + } + @Override public Cancellable submit() { diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java index 928a97dd..f7f1fa92 100644 --- a/accord-core/src/main/java/accord/messages/Propagate.java +++ b/accord-core/src/main/java/accord/messages/Propagate.java @@ -304,9 +304,9 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt didNotAchieveTarget = owns; } - FetchResult current = fetchResult; while (true) { + FetchResult current = fetchResult; FetchResult next = current == null ? 
new FetchResult(achieved, achievedTarget, didNotAchieveTarget) : new FetchResult(achieved.reduce(current.achieved), achievedTarget.with((Unseekables)current.achievedTarget), diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index cb5ae063..e21ad511 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -520,10 +520,17 @@ public abstract class ReadData implements PreLoadContext, Request, MapReduceCons * The commit has been rejected due to stale ballot. */ Rejected("CommitRejected"), + /** * Either not committed, or not stable */ Insufficient("CommitInsufficient"), + + /** + * PreApplied successfully, but the request is blocking so waiting to reply + */ + Waiting("ApplyWaiting"), + Redundant("CommitOrReadRedundant"); final String fullname; diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java index f9916144..ab40251f 100644 --- a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java +++ b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java @@ -102,7 +102,7 @@ class CoordinateSyncPointTest .flatMap(syncPoint -> // the test uses an executor that runs everything right away, so this gets called outside the CommandStore node.commandStores().forId(0).submit(() -> { - ExecuteSyncPoint.ExecuteExclusiveSyncPoint execute = new ExecuteSyncPoint.ExecuteExclusiveSyncPoint(node, syncPoint, AllTracker::new); + ExecuteSyncPoint.ExecuteExclusive execute = new ExecuteSyncPoint.ExecuteExclusive(node, syncPoint, AllTracker::new); execute.start(); return execute; }) diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index 022b74f4..495383b9 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java 
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -675,7 +675,7 @@ public class ListStore implements DataStore }); } - private static class Await extends ExecuteSyncPoint.ExecuteExclusiveSyncPoint + private static class Await extends ExecuteSyncPoint.ExecuteExclusive { public Await(Node node, SyncPoint<Range> syncPoint) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org
