This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7f1d78c2 Fix infinite loop, and notify progress log of sync point durability while waiting to apply
7f1d78c2 is described below

commit 7f1d78c292c65e992bdb8044e2010634c20536a8
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Nov 19 17:45:40 2024 +0000

    Fix infinite loop, and notify progress log of sync point durability while waiting to apply
    
    patch by Benedict; reviewed by David for CASSANDRA-20125
---
 .../accord/coordinate/CoordinateShardDurable.java  |  4 +-
 .../accord/coordinate/CoordinateSyncPoint.java     | 19 ++++-
 .../accord/coordinate/CoordinationAdapter.java     | 40 ++++------
 .../java/accord/coordinate/ExecuteSyncPoint.java   | 91 +++++++++++++++++++---
 .../main/java/accord/coordinate/MaybeRecover.java  |  3 +-
 .../src/main/java/accord/coordinate/Persist.java   | 15 +---
 .../{QuorumTracker.java => QuorumIdTracker.java}   | 51 ++++++------
 .../accord/coordinate/tracking/QuorumTracker.java  |  2 +-
 .../java/accord/impl/DurabilityScheduling.java     |  1 +
 .../src/main/java/accord/messages/Apply.java       | 12 +--
 .../accord/messages/ApplyThenWaitUntilApplied.java | 14 ++++
 .../main/java/accord/messages/InformDurable.java   | 20 +++++
 .../src/main/java/accord/messages/Propagate.java   |  2 +-
 .../src/main/java/accord/messages/ReadData.java    |  7 ++
 .../accord/coordinate/CoordinateSyncPointTest.java |  2 +-
 .../src/test/java/accord/impl/list/ListStore.java  |  2 +-
 16 files changed, 198 insertions(+), 87 deletions(-)

diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
index e6892529..30b0e9ca 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -18,7 +18,7 @@
 
 package accord.coordinate;
 
-import accord.coordinate.ExecuteSyncPoint.ExecuteExclusiveSyncPoint;
+import accord.coordinate.ExecuteSyncPoint.ExecuteExclusive;
 import accord.coordinate.tracking.AllTracker;
 import accord.coordinate.tracking.RequestStatus;
 import accord.local.Node;
@@ -32,7 +32,7 @@ import accord.utils.Invariants;
 import accord.utils.SortedArrays.SortedArrayList;
 import accord.utils.async.AsyncResult;
 
-public class CoordinateShardDurable extends ExecuteExclusiveSyncPoint 
implements Callback<ReadReply>
+public class CoordinateShardDurable extends ExecuteExclusive implements 
Callback<ReadReply>
 {
     private CoordinateShardDurable(Node node, SyncPoint<Range> 
exclusiveSyncPoint)
     {
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index fc734b44..0c96bd98 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -20,6 +20,8 @@ package accord.coordinate;
 
 import java.util.List;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,6 +29,7 @@ import accord.coordinate.CoordinationAdapter.Adapters;
 import accord.coordinate.CoordinationAdapter.Adapters.SyncPointAdapter;
 import accord.local.Node;
 import accord.messages.Apply;
+import accord.messages.Callback;
 import accord.messages.PreAccept.PreAcceptOk;
 import accord.primitives.Ballot;
 import accord.primitives.Deps;
@@ -176,15 +179,25 @@ public class CoordinateSyncPoint<U extends Unseekable> 
extends CoordinatePreAcce
     {
         // TODO (required): consider, document and add invariants checking if 
this topologies is correct in all cases
         //  (notably ExclusiveSyncPoints should execute in earlier epochs for 
durability, but not for fetching )
-        Topologies executes = executes(node, syncPoint.route, 
syncPoint.syncId);
-        sendApply(node, to, syncPoint, executes);
+        sendApply(node, to, syncPoint, executes(node, syncPoint.route, 
syncPoint.syncId));
     }
+
     public static void sendApply(Node node, Node.Id to, SyncPoint<?> 
syncPoint, Topologies executes)
+    {
+        sendApply(node, to, syncPoint, executes, null);
+    }
+
+    public static void sendApply(Node node, Node.Id to, SyncPoint<?> 
syncPoint, @Nullable Callback<Apply.ApplyReply> callback)
+    {
+        sendApply(node, to, syncPoint, executes(node, syncPoint.route, 
syncPoint.syncId), callback);
+    }
+
+    public static void sendApply(Node node, Node.Id to, SyncPoint<?> 
syncPoint, Topologies executes, @Nullable Callback<Apply.ApplyReply> callback)
     {
         TxnId txnId = syncPoint.syncId;
         Timestamp executeAt = txnId;
         Txn txn = node.agent().emptySystemTxn(txnId.kind(), txnId.domain());
         Deps deps = syncPoint.waitFor;
-        Apply.sendMaximal(node, to, executes, txnId, syncPoint.route(), txn, 
executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, 
executeAt, null), syncPoint.route());
+        Apply.sendMaximal(node, to, executes, txnId, syncPoint.route(), txn, 
executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, 
executeAt, null), syncPoint.route(), callback);
     }
 }
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java 
b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index 66d61cba..5c0aaf9f 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -24,7 +24,7 @@ import javax.annotation.Nullable;
 
 import accord.api.ProtocolModifiers;
 import accord.api.Result;
-import accord.coordinate.ExecuteSyncPoint.ExecuteBlocking;
+import accord.coordinate.ExecuteSyncPoint.ExecuteInclusive;
 import accord.coordinate.tracking.FastPathTracker;
 import accord.coordinate.tracking.PreAcceptTracker;
 import accord.coordinate.tracking.QuorumTracker;
@@ -71,6 +71,11 @@ public interface CoordinationAdapter<R>
         @Override
         public <R> CoordinationAdapter<R> get(TxnId txnId, Step step)
         {
+            switch (txnId.kind())
+            {
+                case ExclusiveSyncPoint: return (CoordinationAdapter<R>) 
Adapters.exclusiveSyncPoint();
+                case SyncPoint: return (CoordinationAdapter<R>) 
Adapters.inclusiveSyncPoint();
+            }
             switch (step)
             {
                 default: throw new AssertionError("Unhandled step: " + step);
@@ -219,18 +224,17 @@ public interface CoordinationAdapter<R>
             @Override
             public void execute(Node node, Topologies any, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
BiConsumer<? super SyncPoint<U>, Throwable> callback)
             {
-                Topologies all = forExecution(node, route, txnId, executeAt, 
deps);
-                persist(node, all, route, txnId, txn, executeAt, deps, 
txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), 
callback);
+                persist(node, null, route, txnId, txn, executeAt, deps, 
txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), 
callback);
             }
 
             @Override
-            public void persist(Node node, Topologies any, FullRoute<?> route, 
Route<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable> 
callback)
+            public void persist(Node node, Topologies ignore, FullRoute<?> 
route, Route<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps 
deps, Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable> 
callback)
             {
                 Topologies all = forExecution(node, route, txnId, executeAt, 
deps);
 
                 invokeSuccess(node, route, txnId, txn, deps, callback);
                 new PersistSyncPoint(node, all, txnId, route, txn, executeAt, 
deps, writes, result)
-                    .start(Apply.FACTORY, Maximal, any, writes, result);
+                .start(Apply.FACTORY, Maximal, all, writes, result);
             }
         }
 
@@ -283,18 +287,6 @@ public interface CoordinationAdapter<R>
             {
                 return node.topology().preciseEpochs(route, txnId.epoch(), 
executeAt.epoch());
             }
-
-            @Override
-            public void execute(Node node, Topologies any, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
BiConsumer<? super SyncPoint<U>, Throwable> callback)
-            {
-                Topologies all = forExecution(node, route, txnId, executeAt, 
deps);
-
-                ExecuteBlocking<U> execute = ExecuteBlocking.atQuorum(node, 
all, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), executeAt);
-                execute.start();
-                addOrExecuteCallback(execute, callback);
-            }
-
-            protected abstract void addOrExecuteCallback(ExecuteBlocking<U> 
execute, BiConsumer<? super SyncPoint<U>, Throwable> callback);
         }
 
         /*
@@ -310,14 +302,6 @@ public interface CoordinationAdapter<R>
             protected AsyncInclusiveSyncPointAdapter() {
                 super();
             }
-
-            @Override
-            protected void addOrExecuteCallback(ExecuteBlocking<U> execute, 
BiConsumer<? super SyncPoint<U>, Throwable> callback)
-            {
-                // If this is the async adapter then we want to invoke the 
callback immediately
-                // and the caller can wait on the txn locally if they want
-                callback.accept(execute.syncPoint, null);
-            }
         }
 
         private static class InclusiveSyncPointBlockingAdapter<U extends 
Unseekable> extends AbstractInclusiveSyncPointAdapter<U>
@@ -329,9 +313,13 @@ public interface CoordinationAdapter<R>
             }
 
             @Override
-            protected void addOrExecuteCallback(ExecuteBlocking<U> execute, 
BiConsumer<? super SyncPoint<U>, Throwable> callback)
+            public void execute(Node node, Topologies any, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
BiConsumer<? super SyncPoint<U>, Throwable> callback)
             {
+                Topologies all = forExecution(node, route, txnId, executeAt, 
deps);
+
+                ExecuteInclusive<U> execute = ExecuteInclusive.atQuorum(node, 
all, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), executeAt);
                 execute.addCallback(callback);
+                execute.start();
             }
 
             @Override
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index dc010351..3a76cc85 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -24,17 +24,22 @@ import java.util.function.Function;
 
 import accord.api.Result;
 import accord.coordinate.CoordinationAdapter.Adapters;
+import accord.coordinate.tracking.QuorumIdTracker;
+import accord.coordinate.tracking.SimpleTracker;
 import accord.coordinate.tracking.QuorumTracker;
 import accord.coordinate.tracking.RequestStatus;
-import accord.coordinate.tracking.SimpleTracker;
 import accord.local.Node;
+import accord.messages.Apply;
 import accord.messages.ApplyThenWaitUntilApplied;
 import accord.messages.Callback;
+import accord.messages.InformDurable;
 import accord.messages.ReadData;
+import accord.messages.ReadData.CommitOrReadNack;
 import accord.messages.ReadData.ReadReply;
 import accord.messages.WaitUntilApplied;
 import accord.primitives.Participants;
 import accord.primitives.Range;
+import accord.primitives.Status;
 import accord.primitives.SyncPoint;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
@@ -45,25 +50,84 @@ import accord.utils.Invariants;
 import accord.utils.SortedArrays.SortedArrayList;
 import accord.utils.async.AsyncResults.SettableResult;
 
+import static accord.messages.Apply.ApplyReply.Insufficient;
+import static accord.messages.ReadData.CommitOrReadNack.Waiting;
+import static accord.primitives.Status.Durability.Majority;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 
 public abstract class ExecuteSyncPoint<U extends Unseekable> extends 
SettableResult<SyncPoint<U>> implements Callback<ReadReply>
 {
     public static class SyncPointErased extends Throwable {}
 
-    public static class ExecuteBlocking<U extends Unseekable> extends 
ExecuteSyncPoint<U>
+    public static class ExecuteInclusive<U extends Unseekable> extends 
ExecuteSyncPoint<U>
     {
         private final Timestamp executeAt;
-        public ExecuteBlocking(Node node, SyncPoint<U> syncPoint, 
SimpleTracker<?> tracker, Timestamp executeAt)
+        private final QuorumIdTracker durableTracker;
+        private Callback<Apply.ApplyReply> insufficientCallback;
+
+        public ExecuteInclusive(Node node, SyncPoint<U> syncPoint, 
SimpleTracker<?> tracker, Timestamp executeAt)
         {
             super(node, syncPoint, tracker);
             Invariants.checkArgument(!syncPoint.syncId.awaitsOnlyDeps());
             this.executeAt = executeAt;
+            this.durableTracker = new QuorumIdTracker(tracker.topologies());
         }
 
-        public static <U extends Unseekable> ExecuteBlocking<U> atQuorum(Node 
node, Topologies topologies, SyncPoint<U> syncPoint, Timestamp executeAt)
+        public static <U extends Unseekable> ExecuteInclusive<U> atQuorum(Node 
node, Topologies topologies, SyncPoint<U> syncPoint, Timestamp executeAt)
         {
-            return new ExecuteBlocking<>(node, syncPoint, new 
QuorumTracker(topologies), executeAt);
+            return new ExecuteInclusive<>(node, syncPoint, new 
QuorumTracker(topologies), executeAt);
+        }
+
+        @Override
+        public void onSuccess(Node.Id from, ReadReply reply)
+        {
+            if (isDurableReply(reply))
+                onDurableSuccess(from);
+
+            super.onSuccess(from, reply);
+        }
+
+        private void onDurableSuccess(Node.Id from)
+        {
+            if (durableTracker.recordSuccess(from) == RequestStatus.Success)
+                InformDurable.informHome(node, tracker.topologies(), 
syncPoint.syncId, syncPoint.route, executeAt, Majority);
+        }
+
+        private static boolean isDurableReply(ReadReply reply)
+        {
+            if (reply.isOk())
+                return true;
+
+            switch ((CommitOrReadNack) reply)
+            {
+                case Waiting:
+                case Invalid:
+                case Redundant:
+                    return true;
+                case Insufficient:
+                case Rejected:
+                    return false;
+            }
+            return false;
+        }
+
+        protected void sendApply(Node.Id to)
+        {
+            if (insufficientCallback == null)
+            {
+                insufficientCallback = new Callback<>()
+                {
+                    @Override
+                    public void onSuccess(Node.Id from, Apply.ApplyReply reply)
+                    {
+                        if (reply != Insufficient)
+                            onDurableSuccess(from);
+                    }
+                    @Override public void onFailure(Node.Id from, Throwable 
failure) {}
+                    @Override public void onCallbackFailure(Node.Id from, 
Throwable failure) {}
+                };
+            }
+            CoordinateSyncPoint.sendApply(node, to, syncPoint, 
insufficientCallback);
         }
 
         @Override
@@ -79,16 +143,16 @@ public abstract class ExecuteSyncPoint<U extends 
Unseekable> extends SettableRes
         }
     }
 
-    public static class ExecuteExclusiveSyncPoint extends 
ExecuteSyncPoint<Range>
+    public static class ExecuteExclusive extends ExecuteSyncPoint<Range>
     {
         private long retryInFutureEpoch;
-        public ExecuteExclusiveSyncPoint(Node node, SyncPoint<Range> 
syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier)
+        public ExecuteExclusive(Node node, SyncPoint<Range> syncPoint, 
Function<Topologies, SimpleTracker<?>> trackerSupplier)
         {
             super(node, syncPoint, 
Adapters.exclusiveSyncPoint().forExecution(node, syncPoint.route(), 
syncPoint.syncId, syncPoint.syncId, syncPoint.waitFor), trackerSupplier);
             Invariants.checkArgument(syncPoint.syncId.kind() == 
ExclusiveSyncPoint);
         }
 
-        public ExecuteExclusiveSyncPoint(Node node, SyncPoint<Range> 
syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier, 
SimpleTracker<?> tracker)
+        public ExecuteExclusive(Node node, SyncPoint<Range> syncPoint, 
Function<Topologies, SimpleTracker<?>> trackerSupplier, SimpleTracker<?> 
tracker)
         {
             super(node, syncPoint, trackerSupplier, tracker);
             Invariants.checkArgument(syncPoint.syncId.kind() == 
ExclusiveSyncPoint);
@@ -116,7 +180,7 @@ public abstract class ExecuteSyncPoint<U extends 
Unseekable> extends SettableRes
         {
             if (retryInFutureEpoch > tracker.topologies().currentEpoch())
             {
-                ExecuteExclusiveSyncPoint continuation = new 
ExecuteExclusiveSyncPoint(node, syncPoint, trackerSupplier, 
trackerSupplier.apply(node.topology().preciseEpochs(syncPoint.route(), 
tracker.topologies().currentEpoch(), retryInFutureEpoch)));
+                ExecuteExclusive continuation = new ExecuteExclusive(node, 
syncPoint, trackerSupplier, 
trackerSupplier.apply(node.topology().preciseEpochs(syncPoint.route(), 
tracker.topologies().currentEpoch(), retryInFutureEpoch)));
                 continuation.addCallback((success, failure) -> {
                     if (failure == null) trySuccess(success);
                     else tryFailure(failure);
@@ -170,12 +234,12 @@ public abstract class ExecuteSyncPoint<U extends 
Unseekable> extends SettableRes
 
         if (!reply.isOk())
         {
-            switch ((ReadData.CommitOrReadNack)reply)
+            switch ((CommitOrReadNack)reply)
             {
                 default: throw new AssertionError("Unhandled: " + reply);
 
                 case Insufficient:
-                    CoordinateSyncPoint.sendApply(node, from, syncPoint, 
tracker.topologies());
+                    sendApply(from);
                     return;
 
                 case Redundant:
@@ -200,6 +264,11 @@ public abstract class ExecuteSyncPoint<U extends 
Unseekable> extends SettableRes
         trySuccess(syncPoint);
     }
 
+    protected void sendApply(Node.Id to)
+    {
+        CoordinateSyncPoint.sendApply(node, to, syncPoint);
+    }
+
     @Override
     public synchronized void onFailure(Node.Id from, Throwable failure)
     {
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java 
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 2c7e1fe3..306e3666 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -30,6 +30,7 @@ import accord.messages.CheckStatus.CheckStatusOk;
 import accord.messages.CheckStatus.IncludeInfo;
 
 import static 
accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
+import static accord.primitives.Status.Durability.Majority;
 
 /**
  * A result of null indicates the transaction is globally persistent
@@ -112,7 +113,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
                     if (hasMadeProgress(full))
                     {
                         if (full.durability.isDurable())
-                            
node.send(topologies.forEpoch(txnId.epoch()).forKey(route.homeKey()).nodes, to 
-> new InformDurable(to, topologies, route, txnId, full.executeAtIfKnown(), 
full.durability));
+                            InformDurable.informHome(node, topologies, txnId, 
route, full.executeAtIfKnown(), full.durability);
                         callback.accept(full.toProgressToken(), null);
                     }
                     else
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java 
b/accord-core/src/main/java/accord/coordinate/Persist.java
index 1082f631..bce9325d 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -18,9 +18,6 @@
 
 package accord.coordinate;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import accord.api.Result;
 import accord.coordinate.tracking.QuorumTracker;
 import accord.local.Node;
@@ -37,7 +34,6 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.topology.Topologies;
-import accord.topology.Topology;
 import accord.utils.SortedArrays;
 
 import static accord.coordinate.tracking.RequestStatus.Success;
@@ -55,8 +51,8 @@ public abstract class Persist implements Callback<ApplyReply>
     protected final Result result;
     protected final FullRoute<?> route;
     protected final Topologies topologies;
+    // TODO (expected): track separate ALL and Quorum, so we can report 
Universal durability to permit faster GC
     protected final QuorumTracker tracker;
-    protected final Set<Id> persistedOn;
     boolean isDone;
 
     protected Persist(Node node, Topologies all, TxnId txnId, Route<?> sendTo, 
Txn txn, Timestamp executeAt, Deps stableDeps, Writes writes, Result result, 
FullRoute<?> route)
@@ -72,7 +68,6 @@ public abstract class Persist implements Callback<ApplyReply>
         this.route = route;
         this.topologies = all;
         this.tracker = new QuorumTracker(all);
-        this.persistedOn = new HashSet<>();
     }
 
     @Override
@@ -83,18 +78,12 @@ public abstract class Persist implements 
Callback<ApplyReply>
             default: throw new IllegalStateException();
             case Redundant:
             case Applied:
-                persistedOn.add(from);
                 if (sendTo == route && tracker.recordSuccess(from) == Success)
                 {
                     if (!isDone)
                     {
                         isDone = true;
-                        Topologies topologies = tracker.topologies();
-                        Topology topology = topologies.forEpoch(txnId.epoch());
-                        int homeShardIndex = 
topology.indexForKey(route.homeKey());
-                        // we can persist only partially if some shards are 
already completed; in this case the home shard may not participate
-                        if (homeShardIndex >= 0)
-                            node.send(topology.get(homeShardIndex).nodes, to 
-> new InformDurable(to, topologies, route, txnId, executeAt, Majority));
+                        InformDurable.informHome(node, topologies, txnId, 
route, executeAt, Majority);
                     }
                 }
                 break;
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
similarity index 53%
copy from 
accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
copy to 
accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
index ee29a118..c8e9be55 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
@@ -18,53 +18,60 @@
 
 package accord.coordinate.tracking;
 
+import java.util.Set;
+
 import accord.local.Node;
 import accord.topology.Shard;
 import accord.topology.Topologies;
+import org.agrona.collections.ObjectHashSet;
 
-import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static 
accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
 
-public class QuorumTracker extends 
SimpleTracker<QuorumTracker.QuorumShardTracker> implements ResponseTracker
+public class QuorumIdTracker extends 
SimpleTracker<QuorumIdTracker.QuorumIdShardTracker> implements ResponseTracker
 {
-    public static class QuorumShardTracker extends ShardTracker
+    public static class QuorumIdShardTracker extends ShardTracker
     {
-        protected int successes;
-        protected int failures;
+        protected final Set<Node.Id> successes = new ObjectHashSet<>();
+        protected Set<Node.Id> failures;
 
-        public QuorumShardTracker(Shard shard)
+        public QuorumIdShardTracker(Shard shard)
         {
             super(shard);
         }
 
-        public ShardOutcomes onSuccess(Object ignore)
+        public ShardOutcomes onSuccess(Node.Id from)
         {
-            return ++successes == shard.slowPathQuorumSize ? Success : 
NoChange;
+            return successes.add(from) && successes.size() == 
shard.slowPathQuorumSize ? Success : NoChange;
         }
 
         // return true iff hasFailed()
-        public ShardOutcomes onFailure(Object ignore)
+        public ShardOutcomes onFailure(Node.Id from)
         {
-            return ++failures > shard.maxFailures ? Fail : NoChange;
+            if (failures == null)
+                failures = new ObjectHashSet<>();
+            return failures.add(from) && failures.size() == 1 + 
shard.maxFailures ? Fail : NoChange;
         }
 
         public boolean hasReachedQuorum()
         {
-            return successes >= shard.slowPathQuorumSize;
+            return successes.size() >= shard.slowPathQuorumSize;
         }
 
         boolean hasInFlight()
         {
-            return successes + failures < shard.rf();
+            return successes.size() + (failures == null ? 0 : failures.size()) 
< shard.rf();
         }
 
         boolean hasFailures()
         {
-            return failures > 0;
+            return failures != null;
         }
 
         boolean hasFailed()
         {
-            return failures > shard.maxFailures;
+            return failures != null && failures.size() > shard.maxFailures;
         }
 
         @Override
@@ -77,39 +84,39 @@ public class QuorumTracker extends 
SimpleTracker<QuorumTracker.QuorumShardTracke
         }
     }
 
-    public QuorumTracker(Topologies topologies)
+    public QuorumIdTracker(Topologies topologies)
     {
-        super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
+        super(topologies, QuorumIdShardTracker[]::new, 
QuorumIdShardTracker::new);
     }
 
     public RequestStatus recordSuccess(Node.Id node)
     {
-        return recordResponse(this, node, QuorumShardTracker::onSuccess, null);
+        return recordResponse(this, node, QuorumIdShardTracker::onSuccess, 
node);
     }
 
     // return true iff hasFailed()
     public RequestStatus recordFailure(Node.Id node)
     {
-        return recordResponse(this, node, QuorumShardTracker::onFailure, null);
+        return recordResponse(this, node, QuorumIdShardTracker::onFailure, 
node);
     }
 
     public boolean hasFailures()
     {
-        return any(QuorumShardTracker::hasFailures);
+        return any(QuorumIdShardTracker::hasFailures);
     }
 
     public boolean hasFailed()
     {
-        return any(QuorumShardTracker::hasFailed);
+        return any(QuorumIdShardTracker::hasFailed);
     }
 
     public boolean hasInFlight()
     {
-        return any(QuorumShardTracker::hasInFlight);
+        return any(QuorumIdShardTracker::hasInFlight);
     }
 
     public boolean hasReachedQuorum()
     {
-        return all(QuorumShardTracker::hasReachedQuorum);
+        return all(QuorumIdShardTracker::hasReachedQuorum);
     }
 }
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
index ee29a118..6f3faf4b 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -44,7 +44,7 @@ public class QuorumTracker extends 
SimpleTracker<QuorumTracker.QuorumShardTracke
         // return true iff hasFailed()
         public ShardOutcomes onFailure(Object ignore)
         {
-            return ++failures > shard.maxFailures ? Fail : NoChange;
+            return failures++ == shard.maxFailures ? Fail : NoChange;
         }
 
         public boolean hasReachedQuorum()
diff --git a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java 
b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
index bd370b93..b79a76ae 100644
--- a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
+++ b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java
@@ -307,6 +307,7 @@ public class DurabilityScheduling implements 
ConfigurationService.Listener
                         synchronized (ShardScheduler.this)
                         {
                             // TODO (expected): try to recover or invalidate 
prior sync point
+                            // TODO (required): do not increase number of 
splits if due to nodes being *DOWN* (or more quickly recover)
                             retry();
                             if (numberOfSplits * 2 <= maxNumberOfSplits)
                             {
diff --git a/accord-core/src/main/java/accord/messages/Apply.java 
b/accord-core/src/main/java/accord/messages/Apply.java
index a8c6e6c5..3d632730 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -113,14 +113,16 @@ public class Apply extends TxnRequest<ApplyReply>
     public static void sendMaximal(Node node, Id to, TxnId txnId, Route<?> 
sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, 
FullRoute<?> fullRoute)
     {
         Topologies executes = executes(node, sendTo, executeAt);
-        sendMaximal(node, to, executes, txnId, sendTo, txn, executeAt, deps, 
writes, result, fullRoute);
+        sendMaximal(node, to, executes, txnId, sendTo, txn, executeAt, deps, 
writes, result, fullRoute, null);
     }
 
-    public static void sendMaximal(Node node, Id to, Topologies executes, 
TxnId txnId, Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result, FullRoute<?> fullRoute)
+    public static void sendMaximal(Node node, Id to, Topologies executes, 
TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result, FullRoute<?> fullRoute, @Nullable Callback<ApplyReply> 
callback)
     {
-        Topologies participates = participates(node, sendTo, txnId, executeAt, 
executes);
-        // TODO (expected): should this FACTORY be user configurable? Perhaps 
via CoordinationAdapter?
-        node.send(to, applyMaximal(FACTORY, to, participates, txnId, sendTo, 
txn, executeAt, deps, writes, result, fullRoute));
+        Topologies participates = participates(node, route, txnId, executeAt, 
executes);
+        // TODO (required): should this FACTORY be overridden by the 
implementation???
+        Apply apply = applyMaximal(FACTORY, to, participates, txnId, route, 
txn, executeAt, deps, writes, result, fullRoute);
+        if (callback == null) node.send(to, apply);
+        else node.send(to, apply, callback);
     }
 
     public static Topologies executes(Node node, Unseekables<?> route, 
Timestamp executeAt)
diff --git 
a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java 
b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
index 34b2dac8..df7f8612 100644
--- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -118,6 +118,20 @@ public class ApplyThenWaitUntilApplied extends 
WaitUntilApplied
         return super.apply(safeStore);
     }
 
+    @Override
+    public void accept(CommitOrReadNack reply, Throwable failure)
+    {
+        super.accept(reply, failure);
+
+        boolean waiting;
+        synchronized (this)
+        {
+            waiting = waitingOnCount >= 0;
+        }
+        if (waiting && reply == null && failure == null)
+            node.reply(replyTo, replyContext, CommitOrReadNack.Waiting, null);
+    }
+
     @Override
     public MessageType type()
     {
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java 
b/accord-core/src/main/java/accord/messages/InformDurable.java
index 061849a9..07424b1c 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -20,6 +20,7 @@ package accord.messages;
 import javax.annotation.Nullable;
 
 import accord.local.Commands;
+import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
@@ -30,11 +31,14 @@ import accord.local.StoreParticipants;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.topology.Shard;
 import accord.topology.Topologies;
+import accord.topology.Topology;
 import accord.utils.async.Cancellable;
 
 import static accord.local.PreLoadContext.contextFor;
 import static accord.messages.SimpleReply.Ok;
+import static accord.primitives.Status.Durability.Majority;
 
 public class InformDurable extends TxnRequest<Reply> implements PreLoadContext
 {
@@ -63,6 +67,22 @@ public class InformDurable extends TxnRequest<Reply> 
implements PreLoadContext
         this.durability = durability;
     }
 
+    public static void informHome(Node node, Topologies any, TxnId txnId, 
Route<?> route, Timestamp executeAt, Durability durability)
+    {
+        long homeEpoch = txnId.epoch();
+        Topology homeEpochTopology = any.forEpoch(homeEpoch);
+        int homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
+        if (homeShardIndex < 0)
+        {
+            homeEpochTopology = node.topology().globalForEpoch(homeEpoch);
+            homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
+        }
+
+        Shard homeShard = homeEpochTopology.get(homeShardIndex);
+        Topologies homeTopology = new Topologies.Single(any, new 
Topology(homeEpoch, homeShard));
+        node.send(homeShard.nodes, to -> new InformDurable(to, homeTopology, 
route.homeKeyOnlyRoute(), txnId, executeAt, durability));
+    }
+
     @Override
     public Cancellable submit()
     {
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java 
b/accord-core/src/main/java/accord/messages/Propagate.java
index 928a97dd..f7f1fa92 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -304,9 +304,9 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
             didNotAchieveTarget = owns;
         }
 
-        FetchResult current = fetchResult;
         while (true)
         {
+            FetchResult current = fetchResult;
             FetchResult next = current == null ? new FetchResult(achieved, 
achievedTarget, didNotAchieveTarget)
                                : new 
FetchResult(achieved.reduce(current.achieved),
                                                  
achievedTarget.with((Unseekables)current.achievedTarget),
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java 
b/accord-core/src/main/java/accord/messages/ReadData.java
index cb5ae063..e21ad511 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -520,10 +520,17 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
          * The commit has been rejected due to stale ballot.
          */
         Rejected("CommitRejected"),
+
         /**
          * Either not committed, or not stable
          */
         Insufficient("CommitInsufficient"),
+
+        /**
+         * PreApplied successfully, but the request is blocking so waiting to 
reply
+         */
+        Waiting("ApplyWaiting"),
+
         Redundant("CommitOrReadRedundant");
 
         final String fullname;
diff --git 
a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java 
b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
index f9916144..ab40251f 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
@@ -102,7 +102,7 @@ class CoordinateSyncPointTest
                                        .flatMap(syncPoint ->
                                                         // the test uses an 
executor that runs everything right away, so this gets called outside the 
CommandStore
                                                         
node.commandStores().forId(0).submit(() -> {
-                                                            
ExecuteSyncPoint.ExecuteExclusiveSyncPoint execute = new 
ExecuteSyncPoint.ExecuteExclusiveSyncPoint(node, syncPoint, AllTracker::new);
+                                                            
ExecuteSyncPoint.ExecuteExclusive execute = new 
ExecuteSyncPoint.ExecuteExclusive(node, syncPoint, AllTracker::new);
                                                             execute.start();
                                                             return execute;
                                                         })
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java 
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 022b74f4..495383b9 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -675,7 +675,7 @@ public class ListStore implements DataStore
         });
     }
 
-    private static class Await extends 
ExecuteSyncPoint.ExecuteExclusiveSyncPoint
+    private static class Await extends ExecuteSyncPoint.ExecuteExclusive
     {
         public Await(Node node, SyncPoint<Range> syncPoint)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to