This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3aaec75  CEP-15 (C*) Integrate accord with repair
3aaec75 is described below

commit 3aaec7566e389a0037b93b748867886fb68a0fd0
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Mon Apr 8 14:32:50 2024 -0700

    CEP-15 (C*) Integrate accord with repair
    
    Patch by Blake Eggleston; Reviewed by Ariel Weisberg and David Capwell for CASSANDRA-19472
---
 .../src/main/java/accord/coordinate/Barrier.java   | 34 ++++++++++++++++++----
 .../accord/coordinate/CoordinateSyncPoint.java     |  2 +-
 .../accord/coordinate/CoordinationAdapter.java     |  6 ++--
 .../java/accord/coordinate/ExecuteSyncPoint.java   | 18 ++++++++----
 .../coordinate/tracking/AbstractSimpleTracker.java |  4 +--
 .../coordinate/tracking/AbstractTracker.java       |  9 +++---
 accord-core/src/main/java/accord/local/Node.java   | 22 --------------
 .../accord/messages/ApplyThenWaitUntilApplied.java | 14 +++++----
 8 files changed, 62 insertions(+), 47 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/Barrier.java b/accord-core/src/main/java/accord/coordinate/Barrier.java
index 8ce342f..fba57e2 100644
--- a/accord-core/src/main/java/accord/coordinate/Barrier.java
+++ b/accord-core/src/main/java/accord/coordinate/Barrier.java
@@ -18,6 +18,7 @@
 
 package accord.coordinate;
 
+import java.util.function.BiFunction;
 import javax.annotation.Nonnull;
 
 import accord.local.*;
@@ -71,8 +72,11 @@ public class Barrier<S extends Seekables<?, ?>> extends 
AsyncResults.AbstractRes
     @VisibleForTesting
     ExistingTransactionCheck existingTransactionCheck;
 
-    Barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType)
+    private final BiFunction<Node, S, AsyncResult<SyncPoint<S>>> syncPoint;
+
+    Barrier(Node node, S keysOrRanges, long minEpoch, BarrierType barrierType, 
BiFunction<Node, S, AsyncResult<SyncPoint<S>>> syncPoint)
     {
+        this.syncPoint = syncPoint;
         checkArgument(keysOrRanges.domain() == Domain.Key || 
barrierType.global, "Ranges are only supported with global barriers");
         checkArgument(keysOrRanges.size() == 1 || barrierType.global, "Only a 
single key is supported with local barriers");
         this.node = node;
@@ -81,9 +85,25 @@ public class Barrier<S extends Seekables<?, ?>> extends 
AsyncResults.AbstractRes
         this.barrierType = barrierType;
     }
 
-    public static <S extends Seekables<?, ?>> Barrier<S> barrier(Node node, S 
keysOrRanges, long minEpoch, BarrierType barrierType)
+
+    /**
+     * Trigger one of several different kinds of barrier transactions on a key 
or range with different properties. Barriers ensure that all prior transactions
+     * have their side effects visible up to some point.
+     *
+     * Local barriers will look for a local transaction that was applied in 
minEpoch or later and returns when one exists or completes.
+     * It may, but it is not guaranteed to, trigger a global barrier 
transaction that effects the barrier at all replicas.
+     *
+     * A global barrier is guaranteed to create a distributed barrier 
transaction, and if it is synchronous will not return until the
+     * transaction has applied at a quorum globally (meaning all dependencies 
and their side effects are already visible). If it is asynchronous
+     * it will return once the barrier has been applied locally.
+     *
+     * Ranges are only supported for global barriers.
+     *
+     * Returns the Timestamp the barrier actually ended up occurring at. Keep 
in mind for local barriers it doesn't mean a new transaction was created.
+     */
+    public static <S extends Seekables<?, ?>> Barrier<S> barrier(Node node, S 
keysOrRanges, long minEpoch, BarrierType barrierType, BiFunction<Node, S, 
AsyncResult<SyncPoint<S>>> syncPoint)
     {
-        Barrier<S> barrier = new Barrier(node, keysOrRanges, minEpoch, 
barrierType);
+        Barrier<S> barrier = new Barrier(node, keysOrRanges, minEpoch, 
barrierType, syncPoint);
         node.topology().awaitEpoch(minEpoch).begin((ignored, failure) -> {
             if (failure != null)
             {
@@ -95,6 +115,11 @@ public class Barrier<S extends Seekables<?, ?>> extends 
AsyncResults.AbstractRes
         return barrier;
     }
 
+    public static <S extends Seekables<?, ?>> Barrier<S> barrier(Node node, S 
keysOrRanges, long minEpoch, BarrierType barrierType)
+    {
+        return barrier(node, keysOrRanges, minEpoch, barrierType, 
barrierType.async ? CoordinateSyncPoint::inclusive : 
CoordinateSyncPoint::inclusiveAndAwaitQuorum);
+    }
+
     private void start()
     {
         // It may be possible to use local state to determine that the barrier 
is already satisfied or
@@ -135,8 +160,7 @@ public class Barrier<S extends Seekables<?, ?>> extends 
AsyncResults.AbstractRes
 
     private void createSyncPoint()
     {
-        coordinateSyncPoint = barrierType.async ? 
CoordinateSyncPoint.inclusive(node, seekables)
-                                                : 
CoordinateSyncPoint.inclusiveAndAwaitQuorum(node, seekables);
+        coordinateSyncPoint = syncPoint.apply(node, seekables);
         coordinateSyncPoint.addCallback((syncPoint, syncPointFailure) -> {
             if (syncPointFailure != null)
             {
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index 49e170b..0b849a8 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -88,7 +88,7 @@ public class CoordinateSyncPoint<S extends Seekables<?, ?>> 
extends CoordinatePr
         return coordinate(node, Kind.SyncPoint, keysOrRanges, 
Adapters.inclusiveSyncPointBlocking());
     }
 
-    private static <S extends Seekables<?, ?>> AsyncResult<SyncPoint<S>> 
coordinate(Node node, Kind kind, S keysOrRanges, 
CoordinationAdapter<SyncPoint<S>> adapter)
+    public static <S extends Seekables<?, ?>> AsyncResult<SyncPoint<S>> 
coordinate(Node node, Kind kind, S keysOrRanges, 
CoordinationAdapter<SyncPoint<S>> adapter)
     {
         checkArgument(kind == Kind.SyncPoint || kind == ExclusiveSyncPoint);
         TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index 00d3460..0b08dd2 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -23,7 +23,7 @@ import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
 
 import accord.api.Result;
-import accord.coordinate.ExecuteSyncPoint.ExecuteAtQuorum;
+import accord.coordinate.ExecuteSyncPoint.ExecuteBlocking;
 import accord.local.Node;
 import accord.messages.Apply;
 import accord.primitives.Ballot;
@@ -201,7 +201,7 @@ public interface CoordinationAdapter<R>
             }
         }
 
-        static abstract class AbstractSyncPointAdapter<S extends Seekables<?, 
?>> implements CoordinationAdapter<SyncPoint<S>>
+        public static abstract class AbstractSyncPointAdapter<S extends 
Seekables<?, ?>> implements CoordinationAdapter<SyncPoint<S>>
         {
             void invokeSuccess(Node node, FullRoute<?> route, TxnId txnId, Txn 
txn, Deps deps, BiConsumer<? super SyncPoint<S>, Throwable> callback)
             {
@@ -268,7 +268,7 @@ public interface CoordinationAdapter<R>
             @Override
             public void execute(Node node, Topologies all, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
BiConsumer<? super SyncPoint<S>, Throwable> callback)
             {
-                ExecuteAtQuorum<S> execute = new ExecuteAtQuorum<S>(node, all, 
new SyncPoint<>(txnId, deps, (S)txn.keys(), route));
+                ExecuteBlocking<S> execute = ExecuteBlocking.atQuorum(node, 
all, new SyncPoint<>(txnId, deps, (S)txn.keys(), route), executeAt);
                 execute.addCallback(callback);
                 execute.start();
             }
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index 713dd54..f46b2ba 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -30,6 +30,7 @@ import accord.messages.ReadData.ReadReply;
 import accord.primitives.Participants;
 import accord.primitives.Seekables;
 import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.Writes;
 import accord.topology.Topologies;
@@ -39,15 +40,22 @@ public abstract class ExecuteSyncPoint<S extends 
Seekables<?, ?>> extends Settab
 {
     public static class SyncPointErased extends Throwable {}
 
-    public static class ExecuteAtQuorum<S extends Seekables<?, ?>> extends 
ExecuteSyncPoint<S>
+    public static class ExecuteBlocking<S extends Seekables<?, ?>> extends 
ExecuteSyncPoint<S>
     {
-        ExecuteAtQuorum(Node node, Topologies topologies, SyncPoint<S> 
syncPoint)
+        private final Timestamp executeAt;
+        public ExecuteBlocking(Node node, AbstractSimpleTracker<?> tracker, 
SyncPoint<S> syncPoint, Timestamp executeAt)
         {
-            super(node, new QuorumTracker(topologies), syncPoint);
+            super(node, tracker, syncPoint);
+            this.executeAt = executeAt;
+        }
+
+        public static <S extends Seekables<?, ?>> ExecuteBlocking<S> 
atQuorum(Node node, Topologies topologies, SyncPoint<S> syncPoint, Timestamp 
executeAt)
+        {
+            return  new ExecuteBlocking<>(node, new QuorumTracker(topologies), 
syncPoint, executeAt);
         }
 
         @Override
-        protected void start()
+        public void start()
         {
             Txn txn = node.agent().emptyTxn(syncPoint.syncId.kind(), 
syncPoint.keysOrRanges);
             Writes writes = txn.execute(syncPoint.syncId, syncPoint.syncId, 
null);
@@ -55,7 +63,7 @@ public abstract class ExecuteSyncPoint<S extends Seekables<?, 
?>> extends Settab
             node.send(tracker.topologies().nodes(), to -> {
                 Seekables<?, ?> notify = to.equals(node.id()) ? null : 
syncPoint.keysOrRanges;
                 Participants<?> participants = 
syncPoint.keysOrRanges.toParticipants();
-                return new ApplyThenWaitUntilApplied(to, tracker.topologies(), 
syncPoint.route(), syncPoint.syncId, txn, syncPoint.waitFor, participants, 
syncPoint.syncId.epoch(), writes, result, notify);
+                return new ApplyThenWaitUntilApplied(to, tracker.topologies(), 
executeAt, syncPoint.route(), syncPoint.syncId, txn, syncPoint.waitFor, 
participants, syncPoint.syncId.epoch(), writes, result, notify);
             }, this);
         }
     }
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractSimpleTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractSimpleTracker.java
index 845f334..b5c1e8c 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractSimpleTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractSimpleTracker.java
@@ -27,12 +27,12 @@ import accord.topology.Topologies;
 
 public abstract class AbstractSimpleTracker<ST extends ShardTracker> extends 
AbstractTracker<ST>
 {
-    AbstractSimpleTracker(Topologies topologies, IntFunction<ST[]> 
arrayFactory, Function<Shard, ST> trackerFactory)
+    public AbstractSimpleTracker(Topologies topologies, IntFunction<ST[]> 
arrayFactory, Function<Shard, ST> trackerFactory)
     {
         super(topologies, arrayFactory, trackerFactory);
     }
 
-    AbstractSimpleTracker(Topologies topologies, IntFunction<ST[]> 
arrayFactory, ShardFactory<ST> trackerFactory)
+    public AbstractSimpleTracker(Topologies topologies, IntFunction<ST[]> 
arrayFactory, ShardFactory<ST> trackerFactory)
     {
         super(topologies, arrayFactory, trackerFactory);
     }
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
index 13880b2..f74bcee 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
@@ -85,11 +85,12 @@ public abstract class AbstractTracker<ST extends 
ShardTracker>
     protected final int maxShardsPerEpoch;
     protected int waitingOnShards;
 
-    AbstractTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, 
Function<Shard, ST> trackerFactory)
+    public AbstractTracker(Topologies topologies, IntFunction<ST[]> 
arrayFactory, Function<Shard, ST> trackerFactory)
     {
         this(topologies, arrayFactory, (ignore, shard) -> 
trackerFactory.apply(shard));
     }
-    AbstractTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, 
ShardFactory<ST> trackerFactory)
+
+    public AbstractTracker(Topologies topologies, IntFunction<ST[]> 
arrayFactory, ShardFactory<ST> trackerFactory)
     {
         Invariants.checkArgument(topologies.totalShards() > 0);
         int topologyCount = topologies.size();
@@ -126,13 +127,13 @@ public abstract class AbstractTracker<ST extends 
ShardTracker>
 
     protected RequestStatus trySendMore() { throw new 
UnsupportedOperationException(); }
 
-    <T extends AbstractTracker<ST>, P>
+    protected <T extends AbstractTracker<ST>, P>
     RequestStatus recordResponse(T self, Id node, BiFunction<? super ST, P, ? 
extends ShardOutcome<? super T>> function, P param)
     {
         return recordResponse(self, node, function, param, topologies.size());
     }
 
-    <T extends AbstractTracker<ST>, P>
+    protected <T extends AbstractTracker<ST>, P>
     RequestStatus recordResponse(T self, Id node, BiFunction<? super ST, P, ? 
extends ShardOutcome<? super T>> function, P param, int topologyLimit)
     {
         Invariants.checkState(self == this); // we just accept self as 
parameter for type safety
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 3bf2186..1605f43 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -47,7 +47,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
-import accord.api.BarrierType;
 import accord.api.ConfigurationService;
 import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
@@ -58,7 +57,6 @@ import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.api.Scheduler;
 import accord.api.TopologySorter;
-import accord.coordinate.Barrier;
 import accord.config.LocalConfig;
 import accord.coordinate.CoordinateTransaction;
 import accord.coordinate.MaybeRecover;
@@ -560,26 +558,6 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
         return new TxnId(uniqueNow(), rw, domain);
     }
 
-    /**
-     * Trigger one of several different kinds of barrier transactions on a key 
or range with different properties. Barriers ensure that all prior transactions
-     * have their side effects visible up to some point.
-     *
-     * Local barriers will look for a local transaction that was applied in 
minEpoch or later and returns when one exists or completes.
-     * It may, but it is not guaranteed to, trigger a global barrier 
transaction that effects the barrier at all replicas.
-     *
-     * A global barrier is guaranteed to create a distributed barrier 
transaction, and if it is synchronous will not return until the
-     * transaction has applied at a quorum globally (meaning all dependencies 
and their side effects are already visible). If it is asynchronous
-     * it will return once the barrier has been applied locally.
-     *
-     * Ranges are only supported for global barriers.
-     *
-     * Returns the Timestamp the barrier actually ended up occurring at. Keep 
in mind for local barriers it doesn't mean a new transaction was created.
-     */
-    public AsyncResult<Timestamp> barrier(Seekables keysOrRanges, long 
minEpoch, BarrierType barrierType)
-    {
-        return Barrier.barrier(this, keysOrRanges, minEpoch, barrierType);
-    }
-
     public AsyncResult<Result> coordinate(Txn txn)
     {
         return coordinate(nextTxnId(txn.kind(), txn.keys().domain()), txn);
diff --git a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
index ec06085..21d08cd 100644
--- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -38,6 +38,7 @@ import accord.primitives.PartialTxn;
 import accord.primitives.Participants;
 import accord.primitives.Ranges;
 import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
@@ -59,12 +60,13 @@ public class ApplyThenWaitUntilApplied extends 
WaitUntilApplied
     @SuppressWarnings("unused")
     public static class SerializerSupport
     {
-        public static ApplyThenWaitUntilApplied create(TxnId txnId, 
Participants<?> readScope, long executeAtEpoch, FullRoute<?> route, PartialTxn 
txn, PartialDeps deps, Writes writes, Result result, Seekables<?, ?> notify)
+        public static ApplyThenWaitUntilApplied create(TxnId txnId, 
Participants<?> readScope, long executeAtEpoch, Timestamp executeAt, 
FullRoute<?> route, PartialTxn txn, PartialDeps deps, Writes writes, Result 
result, Seekables<?, ?> notify)
         {
-            return new ApplyThenWaitUntilApplied(txnId, readScope, 
executeAtEpoch, route, txn, deps, writes, result, notify);
+            return new ApplyThenWaitUntilApplied(txnId, readScope, 
executeAtEpoch, executeAt, route, txn, deps, writes, result, notify);
         }
     }
 
+    public final Timestamp executeAt;
     public final FullRoute<?> route;
     public final PartialTxn txn;
     public final PartialDeps deps;
@@ -72,9 +74,10 @@ public class ApplyThenWaitUntilApplied extends 
WaitUntilApplied
     public final Result result;
     public final Seekables<?, ?> notify;
 
-    public ApplyThenWaitUntilApplied(Node.Id to, Topologies topologies, 
FullRoute<?> route, TxnId txnId, Txn txn, Deps deps, Participants<?> readScope, 
long executeAtEpoch, Writes writes, Result result, Seekables<?, ?> notify)
+    public ApplyThenWaitUntilApplied(Node.Id to, Topologies topologies, 
Timestamp executeAt, FullRoute<?> route, TxnId txnId, Txn txn, Deps deps, 
Participants<?> readScope, long executeAtEpoch, Writes writes, Result result, 
Seekables<?, ?> notify)
     {
         super(to, topologies, txnId, readScope, executeAtEpoch);
+        this.executeAt = executeAt;
         Ranges slice = computeScope(to, topologies, null, 0, (i,r)->r, 
Ranges::with);
         this.route = route;
         this.txn = txn.slice(slice, true);
@@ -84,9 +87,10 @@ public class ApplyThenWaitUntilApplied extends 
WaitUntilApplied
         this.notify = notify == null ? null : notify.slice(slice);
     }
 
-    protected ApplyThenWaitUntilApplied(TxnId txnId, Participants<?> 
readScope, long executeAtEpoch, FullRoute<?> route, PartialTxn txn, PartialDeps 
deps, Writes writes, Result result, Seekables<?, ?> notify)
+    protected ApplyThenWaitUntilApplied(TxnId txnId, Participants<?> 
readScope, long executeAtEpoch, Timestamp executeAt, FullRoute<?> route, 
PartialTxn txn, PartialDeps deps, Writes writes, Result result, Seekables<?, ?> 
notify)
     {
         super(txnId, readScope, executeAtEpoch);
+        this.executeAt = executeAt;
         this.route = route;
         this.txn = txn;
         this.deps = deps;
@@ -105,7 +109,7 @@ public class ApplyThenWaitUntilApplied extends 
WaitUntilApplied
     public CommitOrReadNack apply(SafeCommandStore safeStore)
     {
         RoutingKey progressKey = TxnRequest.progressKey(node, txnId.epoch(), 
txnId, route);
-        ApplyReply applyReply = Apply.apply(safeStore, txn, txnId, txnId, 
deps, route, writes, result, progressKey);
+        ApplyReply applyReply = Apply.apply(safeStore, txn, txnId, executeAt, 
deps, route, writes, result, progressKey);
         switch (applyReply)
         {
             default:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to