This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 3aaec75 CEP-15 (C*) Integrate accord with repair 3aaec75 is described below commit 3aaec7566e389a0037b93b748867886fb68a0fd0 Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Mon Apr 8 14:32:50 2024 -0700 CEP-15 (C*) Integrate accord with repair Patch by Blake Eggleston; Reviewed by Ariel Weisberg and David Capwell for CASSANDRA-19472 --- .../src/main/java/accord/coordinate/Barrier.java | 34 ++++++++++++++++++---- .../accord/coordinate/CoordinateSyncPoint.java | 2 +- .../accord/coordinate/CoordinationAdapter.java | 6 ++-- .../java/accord/coordinate/ExecuteSyncPoint.java | 18 ++++++++---- .../coordinate/tracking/AbstractSimpleTracker.java | 4 +-- .../coordinate/tracking/AbstractTracker.java | 9 +++--- accord-core/src/main/java/accord/local/Node.java | 22 -------------- .../accord/messages/ApplyThenWaitUntilApplied.java | 14 +++++---- 8 files changed, 62 insertions(+), 47 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/Barrier.java b/accord-core/src/main/java/accord/coordinate/Barrier.java index 8ce342f..fba57e2 100644 --- a/accord-core/src/main/java/accord/coordinate/Barrier.java +++ b/accord-core/src/main/java/accord/coordinate/Barrier.java @@ -18,6 +18,7 @@ package accord.coordinate; +import java.util.function.BiFunction; import javax.annotation.Nonnull; import accord.local.*; @@ -71,8 +72,11 @@ public class Barrier<S extends Seekables<?, ?>> extends AsyncResults.AbstractRes @VisibleForTesting ExistingTransactionCheck existingTransactionCheck; - Barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType) + private final BiFunction<Node, S, AsyncResult<SyncPoint<S>>> syncPoint; + + Barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType, BiFunction<Node, S, AsyncResult<SyncPoint<S>>> syncPoint) { + this.syncPoint = syncPoint; checkArgument(keysOrRanges.domain() == Domain.Key || barrierType.global, "Ranges are only supported with global barriers"); checkArgument(keysOrRanges.size() == 1 || barrierType.global, "Only a single key is supported with local barriers"); this.node = node; @@ -81,9 +85,25 @@ public class Barrier<S extends Seekables<?, ?>> extends AsyncResults.AbstractRes this.barrierType = barrierType; } - public static <S extends Seekables<?, ?>> Barrier<S> barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType) + + /** + * Trigger one of several different kinds of barrier transactions on a key or range with different properties. Barriers ensure that all prior transactions + * have their side effects visible up to some point. + * + * Local barriers will look for a local transaction that was applied in minEpoch or later and returns when one exists or completes. + * It may, but it is not guaranteed to, trigger a global barrier transaction that effects the barrier at all replicas. + * + * A global barrier is guaranteed to create a distributed barrier transaction, and if it is synchronous will not return until the + * transaction has applied at a quorum globally (meaning all dependencies and their side effects are already visible). If it is asynchronous + * it will return once the barrier has been applied locally. + * + * Ranges are only supported for global barriers. + * + * Returns the Timestamp the barrier actually ended up occurring at. Keep in mind for local barriers it doesn't mean a new transaction was created. + */ + public static <S extends Seekables<?, ?>> Barrier<S> barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType, BiFunction<Node, S, AsyncResult<SyncPoint<S>>> syncPoint) { - Barrier<S> barrier = new Barrier(node, keysOrRanges, minEpoch, barrierType); + Barrier<S> barrier = new Barrier(node, keysOrRanges, minEpoch, barrierType, syncPoint); node.topology().awaitEpoch(minEpoch).begin((ignored, failure) -> { if (failure != null) { @@ -95,6 +115,11 @@ public class Barrier<S extends Seekables<?, ?>> extends AsyncResults.AbstractRes return barrier; } + public static <S extends Seekables<?, ?>> Barrier<S> barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType) + { + return barrier(node, keysOrRanges, minEpoch, barrierType, barrierType.async ? CoordinateSyncPoint::inclusive : CoordinateSyncPoint::inclusiveAndAwaitQuorum); + } + private void start() { // It may be possible to use local state to determine that the barrier is already satisfied or @@ -135,8 +160,7 @@ public class Barrier<S extends Seekables<?, ?>> extends AsyncResults.AbstractRes private void createSyncPoint() { - coordinateSyncPoint = barrierType.async ? CoordinateSyncPoint.inclusive(node, seekables) - : CoordinateSyncPoint.inclusiveAndAwaitQuorum(node, seekables); + coordinateSyncPoint = syncPoint.apply(node, seekables); coordinateSyncPoint.addCallback((syncPoint, syncPointFailure) -> { if (syncPointFailure != null) { diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java index 49e170b..0b849a8 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java @@ -88,7 +88,7 @@ public class CoordinateSyncPoint<S extends Seekables<?, ?>> extends CoordinatePr return coordinate(node, Kind.SyncPoint, keysOrRanges, Adapters.inclusiveSyncPointBlocking()); } - private static <S extends Seekables<?, ?>> AsyncResult<SyncPoint<S>> coordinate(Node node, Kind kind, S keysOrRanges, CoordinationAdapter<SyncPoint<S>> adapter) + public static <S extends Seekables<?, ?>> AsyncResult<SyncPoint<S>> coordinate(Node node, Kind kind, S keysOrRanges, CoordinationAdapter<SyncPoint<S>> adapter) { checkArgument(kind == Kind.SyncPoint || kind == ExclusiveSyncPoint); TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain()); diff --git a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java index 00d3460..0b08dd2 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java @@ -23,7 +23,7 @@ import java.util.function.BiConsumer; import javax.annotation.Nullable; import accord.api.Result; -import accord.coordinate.ExecuteSyncPoint.ExecuteAtQuorum; +import accord.coordinate.ExecuteSyncPoint.ExecuteBlocking; import accord.local.Node; import accord.messages.Apply; import accord.primitives.Ballot; @@ -201,7 +201,7 @@ public interface CoordinationAdapter<R> } } - static abstract class AbstractSyncPointAdapter<S extends Seekables<?, ?>> implements CoordinationAdapter<SyncPoint<S>> + public static abstract class AbstractSyncPointAdapter<S extends Seekables<?, ?>> implements CoordinationAdapter<SyncPoint<S>> { void invokeSuccess(Node node, FullRoute<?> route, TxnId txnId, Txn txn, Deps deps, BiConsumer<? super SyncPoint<S>, Throwable> callback) { @@ -268,7 +268,7 @@ public interface CoordinationAdapter<R> @Override public void execute(Node node, Topologies all, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<S>, Throwable> callback) { - ExecuteAtQuorum<S> execute = new ExecuteAtQuorum<S>(node, all, new SyncPoint<>(txnId, deps, (S)txn.keys(), route)); + ExecuteBlocking<S> execute = ExecuteBlocking.atQuorum(node, all, new SyncPoint<>(txnId, deps, (S)txn.keys(), route), executeAt); execute.addCallback(callback); execute.start(); } diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java index 713dd54..f46b2ba 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java @@ -30,6 +30,7 @@ import accord.messages.ReadData.ReadReply; import accord.primitives.Participants; import accord.primitives.Seekables; import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.Writes; import accord.topology.Topologies; @@ -39,15 +40,22 @@ public abstract class ExecuteSyncPoint<S extends Seekables<?, ?>> extends Settab { public static class SyncPointErased extends Throwable {} - public static class ExecuteAtQuorum<S extends Seekables<?, ?>> extends ExecuteSyncPoint<S> + public static class ExecuteBlocking<S extends Seekables<?, ?>> extends ExecuteSyncPoint<S> { - ExecuteAtQuorum(Node node, Topologies topologies, SyncPoint<S> syncPoint) + private final Timestamp executeAt; + public ExecuteBlocking(Node node, AbstractSimpleTracker<?> tracker, SyncPoint<S> syncPoint, Timestamp executeAt) { - super(node, new QuorumTracker(topologies), syncPoint); + super(node, tracker, syncPoint); + this.executeAt = executeAt; + } + + public static <S extends Seekables<?, ?>> ExecuteBlocking<S> atQuorum(Node node, Topologies topologies, SyncPoint<S> syncPoint, Timestamp executeAt) + { + return new ExecuteBlocking<>(node, new QuorumTracker(topologies), syncPoint, executeAt); } @Override - protected void start() + public void start() { Txn txn = node.agent().emptyTxn(syncPoint.syncId.kind(), syncPoint.keysOrRanges); Writes writes = txn.execute(syncPoint.syncId, syncPoint.syncId, null); @@ -55,7 +63,7 @@ public abstract class ExecuteSyncPoint<S extends Seekables<?, ?>> extends Settab node.send(tracker.topologies().nodes(), to -> { Seekables<?, ?> notify = to.equals(node.id()) ? null : syncPoint.keysOrRanges; Participants<?> participants = syncPoint.keysOrRanges.toParticipants(); - return new ApplyThenWaitUntilApplied(to, tracker.topologies(), syncPoint.route(), syncPoint.syncId, txn, syncPoint.waitFor, participants, syncPoint.syncId.epoch(), writes, result, notify); + return new ApplyThenWaitUntilApplied(to, tracker.topologies(), executeAt, syncPoint.route(), syncPoint.syncId, txn, syncPoint.waitFor, participants, syncPoint.syncId.epoch(), writes, result, notify); }, this); } } diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractSimpleTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractSimpleTracker.java index 845f334..b5c1e8c 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractSimpleTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractSimpleTracker.java @@ -27,12 +27,12 @@ import accord.topology.Topologies; public abstract class AbstractSimpleTracker<ST extends ShardTracker> extends AbstractTracker<ST> { - AbstractSimpleTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, Function<Shard, ST> trackerFactory) + public AbstractSimpleTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, Function<Shard, ST> trackerFactory) { super(topologies, arrayFactory, trackerFactory); } - AbstractSimpleTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, ShardFactory<ST> trackerFactory) + public AbstractSimpleTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, ShardFactory<ST> trackerFactory) { super(topologies, arrayFactory, trackerFactory); } diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java index 13880b2..f74bcee 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java @@ -85,11 +85,12 @@ public abstract class AbstractTracker<ST extends ShardTracker> protected final int maxShardsPerEpoch; protected int waitingOnShards; - AbstractTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, Function<Shard, ST> trackerFactory) + public AbstractTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, Function<Shard, ST> trackerFactory) { this(topologies, arrayFactory, (ignore, shard) -> trackerFactory.apply(shard)); } - AbstractTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, ShardFactory<ST> trackerFactory) + + public AbstractTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, ShardFactory<ST> trackerFactory) { Invariants.checkArgument(topologies.totalShards() > 0); int topologyCount = topologies.size(); @@ -126,13 +127,13 @@ public abstract class AbstractTracker<ST extends ShardTracker> protected RequestStatus trySendMore() { throw new UnsupportedOperationException(); } - <T extends AbstractTracker<ST>, P> + protected <T extends AbstractTracker<ST>, P> RequestStatus recordResponse(T self, Id node, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param) { return recordResponse(self, node, function, param, topologies.size()); } - <T extends AbstractTracker<ST>, P> + protected <T extends AbstractTracker<ST>, P> RequestStatus recordResponse(T self, Id node, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param, int topologyLimit) { Invariants.checkState(self == this); // we just accept self as parameter for type safety diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 3bf2186..1605f43 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -47,7 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Agent; -import accord.api.BarrierType; import accord.api.ConfigurationService; import accord.api.ConfigurationService.EpochReady; import accord.api.DataStore; @@ -58,7 +57,6 @@ import accord.api.Result; import accord.api.RoutingKey; import accord.api.Scheduler; import accord.api.TopologySorter; -import accord.coordinate.Barrier; import accord.config.LocalConfig; import accord.coordinate.CoordinateTransaction; import accord.coordinate.MaybeRecover; @@ -560,26 +558,6 @@ public class Node implements ConfigurationService.Listener, NodeTimeService return new TxnId(uniqueNow(), rw, domain); } - /** - * Trigger one of several different kinds of barrier transactions on a key or range with different properties. Barriers ensure that all prior transactions - * have their side effects visible up to some point. - * - * Local barriers will look for a local transaction that was applied in minEpoch or later and returns when one exists or completes. - * It may, but it is not guaranteed to, trigger a global barrier transaction that effects the barrier at all replicas. - * - * A global barrier is guaranteed to create a distributed barrier transaction, and if it is synchronous will not return until the - * transaction has applied at a quorum globally (meaning all dependencies and their side effects are already visible). If it is asynchronous - * it will return once the barrier has been applied locally. - * - * Ranges are only supported for global barriers. - * - * Returns the Timestamp the barrier actually ended up occurring at. Keep in mind for local barriers it doesn't mean a new transaction was created. - */ - public AsyncResult<Timestamp> barrier(Seekables keysOrRanges, long minEpoch, BarrierType barrierType) - { - return Barrier.barrier(this, keysOrRanges, minEpoch, barrierType); - } - public AsyncResult<Result> coordinate(Txn txn) { return coordinate(nextTxnId(txn.kind(), txn.keys().domain()), txn); diff --git a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java index ec06085..21d08cd 100644 --- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java +++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java @@ -38,6 +38,7 @@ import accord.primitives.PartialTxn; import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.Seekables; +import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; import accord.primitives.Writes; @@ -59,12 +60,13 @@ public class ApplyThenWaitUntilApplied extends WaitUntilApplied @SuppressWarnings("unused") public static class SerializerSupport { - public static ApplyThenWaitUntilApplied create(TxnId txnId, Participants<?> readScope, long executeAtEpoch, FullRoute<?> route, PartialTxn txn, PartialDeps deps, Writes writes, Result result, Seekables<?, ?> notify) + public static ApplyThenWaitUntilApplied create(TxnId txnId, Participants<?> readScope, long executeAtEpoch, Timestamp executeAt, FullRoute<?> route, PartialTxn txn, PartialDeps deps, Writes writes, Result result, Seekables<?, ?> notify) { - return new ApplyThenWaitUntilApplied(txnId, readScope, executeAtEpoch, route, txn, deps, writes, result, notify); + return new ApplyThenWaitUntilApplied(txnId, readScope, executeAtEpoch, executeAt, route, txn, deps, writes, result, notify); } } + public final Timestamp executeAt; public final FullRoute<?> route; public final PartialTxn txn; public final PartialDeps deps; @@ -72,9 +74,10 @@ public class ApplyThenWaitUntilApplied extends WaitUntilApplied public final Result result; public final Seekables<?, ?> notify; - public ApplyThenWaitUntilApplied(Node.Id to, Topologies topologies, FullRoute<?> route, TxnId txnId, Txn txn, Deps deps, Participants<?> readScope, long executeAtEpoch, Writes writes, Result result, Seekables<?, ?> notify) + public ApplyThenWaitUntilApplied(Node.Id to, Topologies topologies, Timestamp executeAt, FullRoute<?> route, TxnId txnId, Txn txn, Deps deps, Participants<?> readScope, long executeAtEpoch, Writes writes, Result result, Seekables<?, ?> notify) { super(to, topologies, txnId, readScope, executeAtEpoch); + this.executeAt = executeAt; Ranges slice = computeScope(to, topologies, null, 0, (i,r)->r, Ranges::with); this.route = route; this.txn = txn.slice(slice, true); @@ -84,9 +87,10 @@ public class ApplyThenWaitUntilApplied extends WaitUntilApplied this.notify = notify == null ? null : notify.slice(slice); } - protected ApplyThenWaitUntilApplied(TxnId txnId, Participants<?> readScope, long executeAtEpoch, FullRoute<?> route, PartialTxn txn, PartialDeps deps, Writes writes, Result result, Seekables<?, ?> notify) + protected ApplyThenWaitUntilApplied(TxnId txnId, Participants<?> readScope, long executeAtEpoch, Timestamp executeAt, FullRoute<?> route, PartialTxn txn, PartialDeps deps, Writes writes, Result result, Seekables<?, ?> notify) { super(txnId, readScope, executeAtEpoch); + this.executeAt = executeAt; this.route = route; this.txn = txn; this.deps = deps; @@ -105,7 +109,7 @@ public class ApplyThenWaitUntilApplied extends WaitUntilApplied public CommitOrReadNack apply(SafeCommandStore safeStore) { RoutingKey progressKey = TxnRequest.progressKey(node, txnId.epoch(), txnId, route); - ApplyReply applyReply = Apply.apply(safeStore, txn, txnId, txnId, deps, route, writes, result, progressKey); + ApplyReply applyReply = Apply.apply(safeStore, txn, txnId, executeAt, deps, route, writes, result, progressKey); switch (applyReply) { default: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org