This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 107966ee CEP-15: (Accord) sequence EpochReady.coordinating to allow syncComplete to be learned from newer epochs (#103)
107966ee is described below

commit 107966eeaf69a968dcd736519b6d6199ee28acb1
Author: dcapwell <[email protected]>
AuthorDate: Fri Sep 27 16:14:41 2024 -0700

    CEP-15: (Accord) sequence EpochReady.coordinating to allow syncComplete to be learned from newer epochs (#103)
    
    patch by David Capwell; reviewed by Alex Petrov, Blake Eggleston for CASSANDRA-19769
---
 .../main/java/accord/coordinate/EpochTimeout.java  |  2 +-
 .../src/main/java/accord/coordinate/Exhausted.java | 18 ++---
 .../src/main/java/accord/coordinate/Timeout.java   |  5 ++
 .../accord/coordinate/tracking/QuorumTracker.java  |  9 +++
 accord-core/src/main/java/accord/local/Node.java   | 19 +++++-
 .../src/main/java/accord/primitives/Range.java     |  4 +-
 .../src/main/java/accord/primitives/Timestamp.java |  5 ++
 .../src/main/java/accord/topology/Shard.java       |  4 +-
 .../main/java/accord/topology/TopologyManager.java | 76 ++++++++++++----------
 .../src/main/java/accord/utils/Invariants.java     |  9 ++-
 .../src/main/java/accord/utils/RandomSource.java   |  7 ++
 .../src/test/java/accord/burn/BurnTest.java        | 26 +-------
 .../java/accord/primitives/TxnIdTest.java}         | 34 ++++------
 .../java/accord/topology/TopologyManagerTest.java  | 43 ++++--------
 .../src/test/java/accord/utils/AccordGens.java     | 17 +++--
 .../src/test/java/accord/utils/Property.java       |  7 ++
 16 files changed, 156 insertions(+), 129 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/EpochTimeout.java 
b/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
index 2a4fbf4a..42aee629 100644
--- a/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
+++ b/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
@@ -26,7 +26,7 @@ public class EpochTimeout extends Timeout
 
     public EpochTimeout(long epoch)
     {
-        super(null, null);
+        super(null, null, "Timeout waiting for epoch " + epoch);
         this.epoch = epoch;
     }
 
diff --git a/accord-core/src/main/java/accord/coordinate/Exhausted.java 
b/accord-core/src/main/java/accord/coordinate/Exhausted.java
index b116748b..81076912 100644
--- a/accord-core/src/main/java/accord/coordinate/Exhausted.java
+++ b/accord-core/src/main/java/accord/coordinate/Exhausted.java
@@ -32,27 +32,27 @@ import static accord.utils.Invariants.checkState;
 public class Exhausted extends CoordinationFailed
 {
     final Ranges unavailable;
-    private String message;
 
     public Exhausted(TxnId txnId, @Nullable RoutingKey homeKey, Ranges 
unavailable)
     {
-        super(txnId, homeKey);
+        super(txnId, homeKey, getMessage(txnId, unavailable));
         this.unavailable = unavailable;
     }
 
-    public String getMessage()
-    {
-        if (message == null)
-            message = unavailable == null ? "No more nodes to try" : "No more 
nodes to try for: " + unavailable;
-        return message;
-    }
-
     Exhausted(TxnId txnId, @Nullable RoutingKey homeKey, Ranges unavailable, 
Exhausted cause)
     {
         super(txnId, homeKey, cause);
         this.unavailable = unavailable;
     }
 
+    private static String getMessage(TxnId txnId, @Nullable Ranges unavailable)
+    {
+        String msg = "No more nodes to try for " + txnId;
+        if (unavailable != null)
+            msg += ": " + unavailable;
+        return msg;
+    }
+
     @Override
     public Exhausted wrap()
     {
diff --git a/accord-core/src/main/java/accord/coordinate/Timeout.java 
b/accord-core/src/main/java/accord/coordinate/Timeout.java
index a4400ece..6544fcaa 100644
--- a/accord-core/src/main/java/accord/coordinate/Timeout.java
+++ b/accord-core/src/main/java/accord/coordinate/Timeout.java
@@ -40,6 +40,11 @@ public class Timeout extends CoordinationFailed
         super(txnId, homeKey, cause);
     }
 
+    protected Timeout(@Nullable TxnId txnId, @Nullable RoutingKey homeKey, 
String message)
+    {
+        super(txnId, homeKey, message);
+    }
+
     @Override
     public Timeout wrap()
     {
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
index 70bc3529..81decf73 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -66,6 +66,15 @@ public class QuorumTracker extends 
AbstractSimpleTracker<QuorumTracker.QuorumSha
         {
             return failures > shard.maxFailures;
         }
+
+        @Override
+        public String toString()
+        {
+            return getClass().getSimpleName() + "{successes: "+successes+"," +
+                   "failures: "+failures+"," +
+                   "quorum?: "+hasReachedQuorum()+"," +
+                   "shard:"+shard+"}";
+        }
     }
 
     public QuorumTracker(Topologies topologies)
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index f008edf1..fd8fb110 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -128,6 +128,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
 
         public boolean equals(Id that)
         {
+            if (that == null) return false;
             return id == that.id;
         }
 
@@ -245,7 +246,23 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
     private synchronized EpochReady onTopologyUpdateInternal(Topology 
topology, boolean startSync)
     {
         Supplier<EpochReady> bootstrap = commandStores.updateTopology(this, 
topology, startSync);
-        return this.topology.onTopologyUpdate(topology, bootstrap);
+        Supplier<EpochReady> ordering = () -> {
+            if (this.topology.isEmpty()) return bootstrap.get();
+            return order(this.topology.epochReady(topology.epoch() - 1), 
bootstrap.get());
+        };
+        return this.topology.onTopologyUpdate(topology, ordering);
+    }
+
+    private static EpochReady order(EpochReady previous, EpochReady next)
+    {
+        if (previous.epoch + 1 != next.epoch)
+            throw new IllegalArgumentException("Attempted to order epochs but 
they are not next to each other... previous=" + previous.epoch + ", next=" + 
next.epoch);
+        if (previous.coordination.isDone()) return next;
+        return new EpochReady(next.epoch,
+                              next.metadata,
+                              previous.coordination.flatMap(ignore -> 
next.coordination).beginAsResult(),
+                              next.data,
+                              next.reads);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/primitives/Range.java 
b/accord-core/src/main/java/accord/primitives/Range.java
index f6521861..5b1c2ae5 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -216,12 +216,12 @@ public abstract class Range implements 
Comparable<RoutableKey>, Unseekable, Seek
         this.end = end;
     }
 
-    public final RoutingKey start()
+    public RoutingKey start()
     {
         return start;
     }
 
-    public final RoutingKey end()
+    public RoutingKey end()
     {
         return end;
     }
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java 
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 41c83265..af13a3f3 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -365,6 +365,11 @@ public class Timestamp implements Comparable<Timestamp>, 
EpochSupplier
 
     @Override
     public String toString()
+    {
+        return toStandardString();
+    }
+
+    public String toStandardString()
     {
         return "[" + epoch() + ',' + hlc() + ',' + 
Integer.toBinaryString(flags()) + ',' + node + ']';
     }
diff --git a/accord-core/src/main/java/accord/topology/Shard.java 
b/accord-core/src/main/java/accord/topology/Shard.java
index af1db18f..684515ee 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -52,7 +52,7 @@ public class Shard
         this.maxFailures = maxToleratedFailures(nodes.size());
         this.fastPathElectorate = ImmutableSet.copyOf(fastPathElectorate);
         this.joining = checkArgument(ImmutableSet.copyOf(joining), 
Iterables.all(joining, nodes::contains),
-                "joining nodes must also be present in nodes");
+                "joining nodes must also be present in nodes; joining=%s, 
nodes=%s", joining, nodes);
         int e = fastPathElectorate.size();
         this.recoveryFastPathSize = (maxFailures+1)/2;
         this.slowPathQuorumSize = slowPathQuorumSize(nodes.size());
@@ -116,6 +116,8 @@ public class Shard
                     sb.append('f');
             }
             sb.append(')');
+            if (!joining.isEmpty())
+                sb.append(":joining=").append(joining);
             s = sb.toString();
         }
         return s;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index c6974e00..fc741fa7 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -56,7 +56,6 @@ import accord.utils.async.AsyncResults;
 
 import static accord.coordinate.tracking.RequestStatus.Success;
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
-import static accord.primitives.Routables.Slice.Minimal;
 import static accord.utils.Invariants.checkArgument;
 import static accord.utils.Invariants.checkState;
 import static accord.utils.Invariants.illegalState;
@@ -94,10 +93,10 @@ public class TopologyManager
         private final BitSet curShardSyncComplete;
         private final Ranges addedRanges, removedRanges;
         private EpochReady ready;
-        private Ranges curSyncComplete, prevSyncComplete, syncComplete;
+        private Ranges syncComplete;
         Ranges closed = Ranges.EMPTY, complete = Ranges.EMPTY;
 
-        EpochState(Id node, Topology global, TopologySorter sorter, Ranges 
prevRanges, Ranges prevSyncComplete)
+        EpochState(Id node, Topology global, TopologySorter sorter, Ranges 
prevRanges)
         {
             this.self = node;
             this.global = checkArgument(global, !global.isSubset());
@@ -110,37 +109,35 @@ public class TopologyManager
                 this.syncTracker = null;
 
             this.addedRanges = 
global.ranges.without(prevRanges).mergeTouching();
+
             this.removedRanges = 
prevRanges.mergeTouching().without(global.ranges);
-            this.prevSyncComplete = addedRanges.union(MERGE_ADJACENT, 
prevSyncComplete.without(removedRanges));
-            this.curSyncComplete = this.syncComplete = addedRanges;
+            this.syncComplete = addedRanges;
         }
 
-        boolean markPrevSynced(Ranges newPrevSyncComplete)
+        public boolean hasReachedQuorum()
         {
-            newPrevSyncComplete = newPrevSyncComplete.union(MERGE_ADJACENT, 
addedRanges).without(removedRanges);
-            if (prevSyncComplete.containsAll(newPrevSyncComplete))
-                return false;
-            checkState(newPrevSyncComplete.containsAll(prevSyncComplete), 
"Expected %s to contain all ranges in %s; but did not", newPrevSyncComplete, 
prevSyncComplete);
-            prevSyncComplete = newPrevSyncComplete;
-            syncComplete = curSyncComplete.slice(newPrevSyncComplete, 
Minimal).union(MERGE_ADJACENT, addedRanges);
-            return true;
+            return syncTracker == null || syncTracker.hasReachedQuorum();
         }
 
-        public boolean hasReachedQuorum()
+        private boolean recordSyncCompleteFromFuture()
         {
-            return syncTracker == null || syncTracker.hasReachedQuorum();
+            if (syncTracker == null || syncComplete())
+                return false;
+            syncComplete = global.ranges.mergeTouching();
+            return true;
         }
 
-        public boolean recordSyncComplete(Id node)
+        enum NodeSyncStatus { Untracked, Complete, ShardUpdate, NoUpdate }
+
+        NodeSyncStatus recordSyncComplete(Id node)
         {
             if (syncTracker == null)
-                return false;
+                return NodeSyncStatus.Untracked;
 
             if (syncTracker.recordSuccess(node) == Success)
             {
-                curSyncComplete = global.ranges.mergeTouching();
-                syncComplete = prevSyncComplete;
-                return true;
+                syncComplete = global.ranges.mergeTouching();
+                return NodeSyncStatus.Complete;
             }
             else
             {
@@ -150,13 +147,12 @@ public class TopologyManager
                 {
                     if (syncTracker.get(i).hasReachedQuorum() && 
!curShardSyncComplete.get(i))
                     {
-                        curSyncComplete = 
curSyncComplete.union(MERGE_ADJACENT, Ranges.of(global.shards[i].range));
-                        syncComplete = curSyncComplete.slice(prevSyncComplete, 
Minimal);
+                        syncComplete = syncComplete.union(MERGE_ADJACENT, 
Ranges.of(global.shards[i].range));
                         curShardSyncComplete.set(i);
                         updated = true;
                     }
                 }
-                return updated;
+                return updated ? NodeSyncStatus.ShardUpdate : 
NodeSyncStatus.NoUpdate;
             }
         }
 
@@ -312,10 +308,23 @@ public class TopologyManager
             else
             {
                 int i = indexOf(epoch);
-                if (i < 0 || !epochs[i].recordSyncComplete(node))
+                if (i < 0)
                     return;
-
-                while (--i >= 0 && epochs[i].markPrevSynced(epochs[i + 
1].syncComplete)) {}
+                EpochState.NodeSyncStatus status = 
epochs[i].recordSyncComplete(node);
+                switch (status)
+                {
+                    case Complete:
+                        i++;
+                        for (; i < epochs.length && 
epochs[i].recordSyncCompleteFromFuture(); i++) {}
+                        break;
+                    case Untracked:
+                        // don't have access to TopologyManager.this.node to check if the nodes match... this state should not happen unless it is the same node
+                    case NoUpdate:
+                    case ShardUpdate:
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Unknown 
status " + status);
+                }
             }
         }
 
@@ -489,14 +498,8 @@ public class TopologyManager
 
         System.arraycopy(current.epochs, 0, nextEpochs, 1, 
current.epochs.length);
 
-        Ranges prevSynced, prevAll;
-        if (current.epochs.length == 0) prevSynced = prevAll = Ranges.EMPTY;
-        else
-        {
-            prevSynced = current.epochs[0].syncComplete;
-            prevAll = current.epochs[0].global.ranges;
-        }
-        nextEpochs[0] = new EpochState(node, topology, sorter.get(topology), 
prevAll, prevSynced);
+        Ranges prevAll = current.epochs.length == 0 ? Ranges.EMPTY : 
current.epochs[0].global.ranges;
+        nextEpochs[0] = new EpochState(node, topology, sorter.get(topology), 
prevAll);
         notifications.syncComplete.forEach(nextEpochs[0]::recordSyncComplete);
         nextEpochs[0].recordClosed(notifications.closed);
         nextEpochs[0].recordComplete(notifications.complete);
@@ -604,6 +607,11 @@ public class TopologyManager
         return epochs.currentLocal();
     }
 
+    public boolean isEmpty()
+    {
+        return epochs == Epochs.EMPTY;
+    }
+
     public long epoch()
     {
         return current().epoch;
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java 
b/accord-core/src/main/java/accord/utils/Invariants.java
index f01d53dc..b9aa1ce8 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -38,9 +38,12 @@ public class Invariants
         LOW, HIGH
     }
 
-    private static final int PARANOIA_COMPUTE = 
Paranoia.valueOf(System.getProperty("accord.paranoia.cpu", 
"NONE").toUpperCase()).ordinal();
-    private static final int PARANOIA_MEMORY = 
Paranoia.valueOf(System.getProperty("accord.paranoia.memory", 
"NONE").toUpperCase()).ordinal();
-    private static final int PARANOIA_FACTOR = 
ParanoiaCostFactor.valueOf(System.getProperty("accord.paranoia.costfactor", 
"LOW").toUpperCase()).ordinal();
+    public static final String KEY_PARANOIA_CPU = "accord.paranoia.cpu";
+    public static final String KEY_PARANOIA_MEMORY = "accord.paranoia.memory";
+    public static final String KEY_PARANOIA_COSTFACTOR = 
"accord.paranoia.costfactor";
+    private static final int PARANOIA_COMPUTE = 
Paranoia.valueOf(System.getProperty(KEY_PARANOIA_CPU, 
"NONE").toUpperCase()).ordinal();
+    private static final int PARANOIA_MEMORY = 
Paranoia.valueOf(System.getProperty(KEY_PARANOIA_MEMORY, 
"NONE").toUpperCase()).ordinal();
+    private static final int PARANOIA_FACTOR = 
ParanoiaCostFactor.valueOf(System.getProperty(KEY_PARANOIA_COSTFACTOR, 
"LOW").toUpperCase()).ordinal();
     private static boolean IS_PARANOID = PARANOIA_COMPUTE > 0 || 
PARANOIA_MEMORY > 0;
     private static final boolean DEBUG = System.getProperty("accord.debug", 
"false").equals("true");
 
diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java 
b/accord-core/src/main/java/accord/utils/RandomSource.java
index c99308a9..4be82265 100644
--- a/accord-core/src/main/java/accord/utils/RandomSource.java
+++ b/accord-core/src/main/java/accord/utils/RandomSource.java
@@ -20,6 +20,7 @@ package accord.utils;
 
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -294,6 +295,12 @@ public interface RandomSource
         return Iterables.get(set, offset);
     }
 
+    default <T extends Enum<T>> T pickOrderedSet(EnumSet<T> set)
+    {
+        int offset = nextInt(0, set.size());
+        return Iterables.get(set, offset);
+    }
+
     default <T extends Comparable<? super T>> T pickUnorderedSet(Set<T> set)
     {
         if (set instanceof SortedSet)
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 6d6b1cfa..a95a0750 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -87,10 +87,8 @@ import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
 import accord.utils.Utils;
-import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncExecutor;
-import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
+import accord.utils.async.TimeoutUtils;
 import accord.verify.CompositeVerifier;
 import accord.verify.ElleVerifier;
 import accord.verify.StrictSerializabilityVerifier;
@@ -581,40 +579,22 @@ public class BurnTest
     private static void run(long seed)
     {
         Duration timeout = Duration.ofMinutes(3);
-        Runnable fn = () -> run(seed, 1000);
-        AsyncResult.Settable<?> promise = AsyncResults.settable();
-        Thread t = new Thread(() -> {
-            try
-            {
-                fn.run();
-                promise.setSuccess(null);
-            }
-            catch (Throwable e)
-            {
-                promise.setFailure(e);
-            }
-        });
-        t.setName("BurnTest with timeout");
-        t.setDaemon(true);
         try
         {
-            t.start();
-            AsyncChains.getBlocking(promise, timeout.toNanos(), 
TimeUnit.NANOSECONDS);
+            TimeoutUtils.runBlocking(timeout, "BurnTest with timeout", () -> 
run(seed, 1000));
         }
         catch (Throwable thrown)
         {
             Throwable cause = thrown;
             if (cause instanceof ExecutionException)
                 cause = cause.getCause();
-            if (cause instanceof InterruptedException || cause instanceof 
TimeoutException)
-                t.interrupt();
             if (cause instanceof TimeoutException)
             {
                 TimeoutException override = new TimeoutException("test did not 
complete within " + timeout);
                 override.setStackTrace(new StackTraceElement[0]);
                 cause = override;
             }
-            logger.error("Exception running burn test for seed {}:", seed, t);
+            logger.error("Exception running burn test for seed {}:", seed, 
cause);
             throw SimulationException.wrap(seed, cause);
         }
     }
diff --git a/accord-core/src/main/java/accord/coordinate/EpochTimeout.java 
b/accord-core/src/test/java/accord/primitives/TxnIdTest.java
similarity index 60%
copy from accord-core/src/main/java/accord/coordinate/EpochTimeout.java
copy to accord-core/src/test/java/accord/primitives/TxnIdTest.java
index 2a4fbf4a..5fb21f1a 100644
--- a/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
+++ b/accord-core/src/test/java/accord/primitives/TxnIdTest.java
@@ -16,30 +16,24 @@
  * limitations under the License.
  */
 
-package accord.coordinate;
+package accord.primitives;
 
-import static accord.utils.Invariants.checkState;
+import org.junit.jupiter.api.Test;
 
-public class EpochTimeout extends Timeout
-{
-    public final long epoch;
+import accord.utils.AccordGens;
+import org.assertj.core.api.Assertions;
 
-    public EpochTimeout(long epoch)
-    {
-        super(null, null);
-        this.epoch = epoch;
-    }
+import static accord.utils.Property.qt;
 
-    private EpochTimeout(long epoch, EpochTimeout cause)
+class TxnIdTest
+{
+    @Test
+    void stringSerde()
     {
-        super(null, null, cause);
-        this.epoch = epoch;
-    }
+        qt().forAll(AccordGens.txnIds()).check(id -> {
+            Assertions.assertThat(TxnId.parse(id.toString())).isEqualTo(id);
 
-    @Override
-    public EpochTimeout wrap()
-    {
-        checkState(this.getClass() == EpochTimeout.class);
-        return new EpochTimeout(epoch, this);
+            
Assertions.assertThat(Timestamp.fromString(id.toStandardString())).isEqualTo(new
 Timestamp(id));
+        });
     }
-}
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java 
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index b839e868..aaa313e7 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -146,6 +146,8 @@ public class TopologyManagerTest
         TopologyManager service = tracker();
 
         Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
+        // shards to nodes: [[1, 2, 3], [4, 5, 6]]
+        // by syncing node 1/2 shard 1 has reached quorum, but not shard 2
         service.onEpochSyncComplete(id(1), 2);
         service.onEpochSyncComplete(id(2), 2);
         Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
@@ -153,41 +155,24 @@ public class TopologyManagerTest
         
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncCompleteFor(keys(250).toParticipants()));
     }
 
-    /**
-     * Epochs should only report being synced if every preceding epoch is also reporting synced
-     */
     @Test
-    void existingEpochPendingSync()
+    void syncCompletePastEpochs()
     {
-        Range range = range(100, 200);
-        Topology topology1 = topology(1, shard(range, idList(1, 2, 3), 
idSet(1, 2)));
-        Topology topology2 = topology(2, shard(range, idList(1, 2, 3), 
idSet(2, 3)));
-        Topology topology3 = topology(3, shard(range, idList(1, 2, 3), 
idSet(1, 2)));
-
         TopologyManager service = testTopologyManager(SUPPLIER, ID);
-        service.onTopologyUpdate(topology1, () -> null);
-        service.onTopologyUpdate(topology2, () -> null);
-        service.onTopologyUpdate(topology3, () -> null);
+        Shard[] shards = { shard(range(0, 100), idList(1, 2, 3), idSet(1, 2, 
3)),
+                           shard(range(100, 200), idList(3, 4, 5), idSet(3, 4, 
5)) };
 
-        Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
-        Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
-        Assertions.assertFalse(service.getEpochStateUnsafe(3).syncComplete());
+        service.onTopologyUpdate(topology(1, shards), () -> null);
+        service.onTopologyUpdate(topology(2, shards), () -> null);
+        service.onTopologyUpdate(topology(3, shards), () -> null);
 
-        // sync epoch 3
-        service.onEpochSyncComplete(id(1), 3);
-        service.onEpochSyncComplete(id(2), 3);
-
-        Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
-        Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
-        Assertions.assertFalse(service.getEpochStateUnsafe(3).syncComplete());
-
-        // sync epoch 2
-        service.onEpochSyncComplete(id(2), 2);
-        service.onEpochSyncComplete(id(3), 2);
+        for (int i = 1; i <= 5; i++)
+            service.onEpochSyncComplete(id(i), service.epoch());
 
-        Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
-        Assertions.assertTrue(service.getEpochStateUnsafe(2).syncComplete());
-        Assertions.assertTrue(service.getEpochStateUnsafe(3).syncComplete());
+        Ranges expected = service.current().ranges().mergeTouching();
+        
org.assertj.core.api.Assertions.assertThat(service.syncComplete(3)).describedAs("Unexpected
 sync complte for node 3").isEqualTo(expected);
+        
org.assertj.core.api.Assertions.assertThat(service.syncComplete(2)).describedAs("Unexpected
 sync complte for node 2").isEqualTo(expected);
+        
org.assertj.core.api.Assertions.assertThat(service.syncComplete(1)).describedAs("Unexpected
 sync complte for node 1").isEqualTo(expected);
     }
 
     /**
diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java 
b/accord-core/src/test/java/accord/utils/AccordGens.java
index 69e48ddd..4af15d4b 100644
--- a/accord-core/src/test/java/accord/utils/AccordGens.java
+++ b/accord-core/src/test/java/accord/utils/AccordGens.java
@@ -73,7 +73,12 @@ public class AccordGens
 
     public static Gen<Node.Id> nodes()
     {
-        return nodes(RandomSource::nextInt);
+        return nodes(nodeIdValues());
+    }
+
+    public static Gen.IntGen nodeIdValues()
+    {
+        return rs -> rs.nextInt(-1, Integer.MAX_VALUE) + 1;
     }
 
     public static Gen<Node.Id> nodes(Gen.IntGen nodes)
@@ -98,7 +103,7 @@ public class AccordGens
 
     public static Gen<Timestamp> timestamps()
     {
-        return timestamps(epochs()::nextLong, hlcs(), flags(), 
RandomSource::nextInt);
+        return timestamps(epochs()::nextLong, hlcs(), flags(), nodeIdValues());
     }
 
     public static Gen<Timestamp> timestamps(Gen.LongGen epochs, Gen.LongGen 
hlcs, Gen.IntGen flags, Gen.IntGen nodes)
@@ -113,12 +118,12 @@ public class AccordGens
 
     public static Gen<TxnId> txnIds(Gen<Txn.Kind> kinds)
     {
-        return txnIds(epochs(), hlcs(), RandomSource::nextInt, kinds);
+        return txnIds(epochs(), hlcs(), nodeIdValues(), kinds);
     }
 
     public static Gen<TxnId> txnIds(Gen<Txn.Kind> kinds, Gen<Routable.Domain> 
domains)
     {
-        return txnIds(epochs(), hlcs(), RandomSource::nextInt, kinds, domains);
+        return txnIds(epochs(), hlcs(), nodeIdValues(), kinds, domains);
     }
 
     public static Gen<TxnId> txnIds(Gen.LongGen epochs, Gen.LongGen hlcs, 
Gen.IntGen nodes)
@@ -138,7 +143,7 @@ public class AccordGens
 
     public static Gen<Ballot> ballot()
     {
-        return ballot(epochs()::nextLong, hlcs(), flags(), 
RandomSource::nextInt);
+        return ballot(epochs()::nextLong, hlcs(), flags(), nodeIdValues());
     }
 
     public static Gen<Ballot> ballot(Gen.LongGen epochs, Gen.LongGen hlcs, 
Gen.IntGen flags, Gen.IntGen nodes)
@@ -571,7 +576,7 @@ public class AccordGens
                 Gen<? extends Key> keyGen = 
Gens.pick(Iterators.toArray(((Keys) txn.keys()).iterator(), Key.class));
                 keyDepsGen = AccordGens.keyDeps(keyGen, 
AccordGens.txnIds(Gens.longs().between(0, txnId.epoch()),
                                                                           
Gens.longs().between(0, txnId.hlc()),
-                                                                          
RandomSource::nextInt,
+                                                                          
nodeIdValues(),
                                                                           
Gens.pick(Txn.Kind.Write, Txn.Kind.Read),
                                                                           
ignore -> Routable.Domain.Key));
                 rangeDepsGen = i -> RangeDeps.NONE;
diff --git a/accord-core/src/test/java/accord/utils/Property.java 
b/accord-core/src/test/java/accord/utils/Property.java
index e45642c1..724a9383 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -816,6 +816,12 @@ public class Property
                     CommandsBuilder.this.addIf(predicate, cmd);
                     return this;
                 }
+
+                @Override
+                public IfBuilder<State, SystemUnderTest> 
addIf(Predicate<State> nextPredicate, Setup<State, SystemUnderTest> cmd) {
+                    CommandsBuilder.this.addIf(predicate.and(nextPredicate), 
cmd);
+                    return this;
+                }
             });
             return this;
         }
@@ -823,6 +829,7 @@ public class Property
         public interface IfBuilder<State, SystemUnderTest>
         {
             IfBuilder<State, SystemUnderTest> add(Setup<State, 
SystemUnderTest> cmd);
+            IfBuilder<State, SystemUnderTest> addIf(Predicate<State> 
predicate, Setup<State, SystemUnderTest> cmd);
         }
 
         public CommandsBuilder<State, SystemUnderTest> 
unknownWeight(Gen.IntGen unknownWeightGen)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to