This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 107966ee CEP-15: (Accord) sequence EpochReady.coordinating to allow
syncComplete to be learned from newer epochs (#103)
107966ee is described below
commit 107966eeaf69a968dcd736519b6d6199ee28acb1
Author: dcapwell <[email protected]>
AuthorDate: Fri Sep 27 16:14:41 2024 -0700
CEP-15: (Accord) sequence EpochReady.coordinating to allow syncComplete to
be learned from newer epochs (#103)
patch by David Capwell; reviewed by Alex Petrov, Blake Eggleston for
CASSANDRA-19769
---
.../main/java/accord/coordinate/EpochTimeout.java | 2 +-
.../src/main/java/accord/coordinate/Exhausted.java | 18 ++---
.../src/main/java/accord/coordinate/Timeout.java | 5 ++
.../accord/coordinate/tracking/QuorumTracker.java | 9 +++
accord-core/src/main/java/accord/local/Node.java | 19 +++++-
.../src/main/java/accord/primitives/Range.java | 4 +-
.../src/main/java/accord/primitives/Timestamp.java | 5 ++
.../src/main/java/accord/topology/Shard.java | 4 +-
.../main/java/accord/topology/TopologyManager.java | 76 ++++++++++++----------
.../src/main/java/accord/utils/Invariants.java | 9 ++-
.../src/main/java/accord/utils/RandomSource.java | 7 ++
.../src/test/java/accord/burn/BurnTest.java | 26 +-------
.../java/accord/primitives/TxnIdTest.java} | 34 ++++------
.../java/accord/topology/TopologyManagerTest.java | 43 ++++--------
.../src/test/java/accord/utils/AccordGens.java | 17 +++--
.../src/test/java/accord/utils/Property.java | 7 ++
16 files changed, 156 insertions(+), 129 deletions(-)
diff --git a/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
b/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
index 2a4fbf4a..42aee629 100644
--- a/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
+++ b/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
@@ -26,7 +26,7 @@ public class EpochTimeout extends Timeout
public EpochTimeout(long epoch)
{
- super(null, null);
+ super(null, null, "Timeout waiting for epoch " + epoch);
this.epoch = epoch;
}
diff --git a/accord-core/src/main/java/accord/coordinate/Exhausted.java
b/accord-core/src/main/java/accord/coordinate/Exhausted.java
index b116748b..81076912 100644
--- a/accord-core/src/main/java/accord/coordinate/Exhausted.java
+++ b/accord-core/src/main/java/accord/coordinate/Exhausted.java
@@ -32,27 +32,27 @@ import static accord.utils.Invariants.checkState;
public class Exhausted extends CoordinationFailed
{
final Ranges unavailable;
- private String message;
public Exhausted(TxnId txnId, @Nullable RoutingKey homeKey, Ranges
unavailable)
{
- super(txnId, homeKey);
+ super(txnId, homeKey, getMessage(txnId, unavailable));
this.unavailable = unavailable;
}
- public String getMessage()
- {
- if (message == null)
- message = unavailable == null ? "No more nodes to try" : "No more
nodes to try for: " + unavailable;
- return message;
- }
-
Exhausted(TxnId txnId, @Nullable RoutingKey homeKey, Ranges unavailable,
Exhausted cause)
{
super(txnId, homeKey, cause);
this.unavailable = unavailable;
}
+ private static String getMessage(TxnId txnId, @Nullable Ranges unavailable)
+ {
+ String msg = "No more nodes to try for " + txnId;
+ if (unavailable != null)
+ msg += ": " + unavailable;
+ return msg;
+ }
+
@Override
public Exhausted wrap()
{
diff --git a/accord-core/src/main/java/accord/coordinate/Timeout.java
b/accord-core/src/main/java/accord/coordinate/Timeout.java
index a4400ece..6544fcaa 100644
--- a/accord-core/src/main/java/accord/coordinate/Timeout.java
+++ b/accord-core/src/main/java/accord/coordinate/Timeout.java
@@ -40,6 +40,11 @@ public class Timeout extends CoordinationFailed
super(txnId, homeKey, cause);
}
+ protected Timeout(@Nullable TxnId txnId, @Nullable RoutingKey homeKey,
String message)
+ {
+ super(txnId, homeKey, message);
+ }
+
@Override
public Timeout wrap()
{
diff --git
a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
index 70bc3529..81decf73 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -66,6 +66,15 @@ public class QuorumTracker extends
AbstractSimpleTracker<QuorumTracker.QuorumSha
{
return failures > shard.maxFailures;
}
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{successes: "+successes+"," +
+ "failures: "+failures+"," +
+ "quorum?: "+hasReachedQuorum()+"," +
+ "shard:"+shard+"}";
+ }
}
public QuorumTracker(Topologies topologies)
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index f008edf1..fd8fb110 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -128,6 +128,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
public boolean equals(Id that)
{
+ if (that == null) return false;
return id == that.id;
}
@@ -245,7 +246,23 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
private synchronized EpochReady onTopologyUpdateInternal(Topology
topology, boolean startSync)
{
Supplier<EpochReady> bootstrap = commandStores.updateTopology(this,
topology, startSync);
- return this.topology.onTopologyUpdate(topology, bootstrap);
+ Supplier<EpochReady> ordering = () -> {
+ if (this.topology.isEmpty()) return bootstrap.get();
+ return order(this.topology.epochReady(topology.epoch() - 1),
bootstrap.get());
+ };
+ return this.topology.onTopologyUpdate(topology, ordering);
+ }
+
+ private static EpochReady order(EpochReady previous, EpochReady next)
+ {
+ if (previous.epoch + 1 != next.epoch)
+ throw new IllegalArgumentException("Attempted to order epochs but
they are not next to each other... previous=" + previous.epoch + ", next=" +
next.epoch);
+ if (previous.coordination.isDone()) return next;
+ return new EpochReady(next.epoch,
+ next.metadata,
+ previous.coordination.flatMap(ignore ->
next.coordination).beginAsResult(),
+ next.data,
+ next.reads);
}
@Override
diff --git a/accord-core/src/main/java/accord/primitives/Range.java
b/accord-core/src/main/java/accord/primitives/Range.java
index f6521861..5b1c2ae5 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -216,12 +216,12 @@ public abstract class Range implements
Comparable<RoutableKey>, Unseekable, Seek
this.end = end;
}
- public final RoutingKey start()
+ public RoutingKey start()
{
return start;
}
- public final RoutingKey end()
+ public RoutingKey end()
{
return end;
}
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 41c83265..af13a3f3 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -365,6 +365,11 @@ public class Timestamp implements Comparable<Timestamp>,
EpochSupplier
@Override
public String toString()
+ {
+ return toStandardString();
+ }
+
+ public String toStandardString()
{
return "[" + epoch() + ',' + hlc() + ',' +
Integer.toBinaryString(flags()) + ',' + node + ']';
}
diff --git a/accord-core/src/main/java/accord/topology/Shard.java
b/accord-core/src/main/java/accord/topology/Shard.java
index af1db18f..684515ee 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -52,7 +52,7 @@ public class Shard
this.maxFailures = maxToleratedFailures(nodes.size());
this.fastPathElectorate = ImmutableSet.copyOf(fastPathElectorate);
this.joining = checkArgument(ImmutableSet.copyOf(joining),
Iterables.all(joining, nodes::contains),
- "joining nodes must also be present in nodes");
+ "joining nodes must also be present in nodes; joining=%s,
nodes=%s", joining, nodes);
int e = fastPathElectorate.size();
this.recoveryFastPathSize = (maxFailures+1)/2;
this.slowPathQuorumSize = slowPathQuorumSize(nodes.size());
@@ -116,6 +116,8 @@ public class Shard
sb.append('f');
}
sb.append(')');
+ if (!joining.isEmpty())
+ sb.append(":joining=").append(joining);
s = sb.toString();
}
return s;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index c6974e00..fc741fa7 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -56,7 +56,6 @@ import accord.utils.async.AsyncResults;
import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
-import static accord.primitives.Routables.Slice.Minimal;
import static accord.utils.Invariants.checkArgument;
import static accord.utils.Invariants.checkState;
import static accord.utils.Invariants.illegalState;
@@ -94,10 +93,10 @@ public class TopologyManager
private final BitSet curShardSyncComplete;
private final Ranges addedRanges, removedRanges;
private EpochReady ready;
- private Ranges curSyncComplete, prevSyncComplete, syncComplete;
+ private Ranges syncComplete;
Ranges closed = Ranges.EMPTY, complete = Ranges.EMPTY;
- EpochState(Id node, Topology global, TopologySorter sorter, Ranges
prevRanges, Ranges prevSyncComplete)
+ EpochState(Id node, Topology global, TopologySorter sorter, Ranges
prevRanges)
{
this.self = node;
this.global = checkArgument(global, !global.isSubset());
@@ -110,37 +109,35 @@ public class TopologyManager
this.syncTracker = null;
this.addedRanges =
global.ranges.without(prevRanges).mergeTouching();
+
this.removedRanges =
prevRanges.mergeTouching().without(global.ranges);
- this.prevSyncComplete = addedRanges.union(MERGE_ADJACENT,
prevSyncComplete.without(removedRanges));
- this.curSyncComplete = this.syncComplete = addedRanges;
+ this.syncComplete = addedRanges;
}
- boolean markPrevSynced(Ranges newPrevSyncComplete)
+ public boolean hasReachedQuorum()
{
- newPrevSyncComplete = newPrevSyncComplete.union(MERGE_ADJACENT,
addedRanges).without(removedRanges);
- if (prevSyncComplete.containsAll(newPrevSyncComplete))
- return false;
- checkState(newPrevSyncComplete.containsAll(prevSyncComplete),
"Expected %s to contain all ranges in %s; but did not", newPrevSyncComplete,
prevSyncComplete);
- prevSyncComplete = newPrevSyncComplete;
- syncComplete = curSyncComplete.slice(newPrevSyncComplete,
Minimal).union(MERGE_ADJACENT, addedRanges);
- return true;
+ return syncTracker == null || syncTracker.hasReachedQuorum();
}
- public boolean hasReachedQuorum()
+ private boolean recordSyncCompleteFromFuture()
{
- return syncTracker == null || syncTracker.hasReachedQuorum();
+ if (syncTracker == null || syncComplete())
+ return false;
+ syncComplete = global.ranges.mergeTouching();
+ return true;
}
- public boolean recordSyncComplete(Id node)
+ enum NodeSyncStatus { Untracked, Complete, ShardUpdate, NoUpdate }
+
+ NodeSyncStatus recordSyncComplete(Id node)
{
if (syncTracker == null)
- return false;
+ return NodeSyncStatus.Untracked;
if (syncTracker.recordSuccess(node) == Success)
{
- curSyncComplete = global.ranges.mergeTouching();
- syncComplete = prevSyncComplete;
- return true;
+ syncComplete = global.ranges.mergeTouching();
+ return NodeSyncStatus.Complete;
}
else
{
@@ -150,13 +147,12 @@ public class TopologyManager
{
if (syncTracker.get(i).hasReachedQuorum() &&
!curShardSyncComplete.get(i))
{
- curSyncComplete =
curSyncComplete.union(MERGE_ADJACENT, Ranges.of(global.shards[i].range));
- syncComplete = curSyncComplete.slice(prevSyncComplete,
Minimal);
+ syncComplete = syncComplete.union(MERGE_ADJACENT,
Ranges.of(global.shards[i].range));
curShardSyncComplete.set(i);
updated = true;
}
}
- return updated;
+ return updated ? NodeSyncStatus.ShardUpdate :
NodeSyncStatus.NoUpdate;
}
}
@@ -312,10 +308,23 @@ public class TopologyManager
else
{
int i = indexOf(epoch);
- if (i < 0 || !epochs[i].recordSyncComplete(node))
+ if (i < 0)
return;
-
- while (--i >= 0 && epochs[i].markPrevSynced(epochs[i +
1].syncComplete)) {}
+ EpochState.NodeSyncStatus status =
epochs[i].recordSyncComplete(node);
+ switch (status)
+ {
+ case Complete:
+ i++;
+ for (; i < epochs.length &&
epochs[i].recordSyncCompleteFromFuture(); i++) {}
+ break;
+ case Untracked:
+ // don't have access to TopologyManager.this.node to
check if the nodes match... this state should not happen unless it is the same
node
+ case NoUpdate:
+ case ShardUpdate:
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown
status " + status);
+ }
}
}
@@ -489,14 +498,8 @@ public class TopologyManager
System.arraycopy(current.epochs, 0, nextEpochs, 1,
current.epochs.length);
- Ranges prevSynced, prevAll;
- if (current.epochs.length == 0) prevSynced = prevAll = Ranges.EMPTY;
- else
- {
- prevSynced = current.epochs[0].syncComplete;
- prevAll = current.epochs[0].global.ranges;
- }
- nextEpochs[0] = new EpochState(node, topology, sorter.get(topology),
prevAll, prevSynced);
+ Ranges prevAll = current.epochs.length == 0 ? Ranges.EMPTY :
current.epochs[0].global.ranges;
+ nextEpochs[0] = new EpochState(node, topology, sorter.get(topology),
prevAll);
notifications.syncComplete.forEach(nextEpochs[0]::recordSyncComplete);
nextEpochs[0].recordClosed(notifications.closed);
nextEpochs[0].recordComplete(notifications.complete);
@@ -604,6 +607,11 @@ public class TopologyManager
return epochs.currentLocal();
}
+ public boolean isEmpty()
+ {
+ return epochs == Epochs.EMPTY;
+ }
+
public long epoch()
{
return current().epoch;
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java
b/accord-core/src/main/java/accord/utils/Invariants.java
index f01d53dc..b9aa1ce8 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -38,9 +38,12 @@ public class Invariants
LOW, HIGH
}
- private static final int PARANOIA_COMPUTE =
Paranoia.valueOf(System.getProperty("accord.paranoia.cpu",
"NONE").toUpperCase()).ordinal();
- private static final int PARANOIA_MEMORY =
Paranoia.valueOf(System.getProperty("accord.paranoia.memory",
"NONE").toUpperCase()).ordinal();
- private static final int PARANOIA_FACTOR =
ParanoiaCostFactor.valueOf(System.getProperty("accord.paranoia.costfactor",
"LOW").toUpperCase()).ordinal();
+ public static final String KEY_PARANOIA_CPU = "accord.paranoia.cpu";
+ public static final String KEY_PARANOIA_MEMORY = "accord.paranoia.memory";
+ public static final String KEY_PARANOIA_COSTFACTOR =
"accord.paranoia.costfactor";
+ private static final int PARANOIA_COMPUTE =
Paranoia.valueOf(System.getProperty(KEY_PARANOIA_CPU,
"NONE").toUpperCase()).ordinal();
+ private static final int PARANOIA_MEMORY =
Paranoia.valueOf(System.getProperty(KEY_PARANOIA_MEMORY,
"NONE").toUpperCase()).ordinal();
+ private static final int PARANOIA_FACTOR =
ParanoiaCostFactor.valueOf(System.getProperty(KEY_PARANOIA_COSTFACTOR,
"LOW").toUpperCase()).ordinal();
private static boolean IS_PARANOID = PARANOIA_COMPUTE > 0 ||
PARANOIA_MEMORY > 0;
private static final boolean DEBUG = System.getProperty("accord.debug",
"false").equals("true");
diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java
b/accord-core/src/main/java/accord/utils/RandomSource.java
index c99308a9..4be82265 100644
--- a/accord-core/src/main/java/accord/utils/RandomSource.java
+++ b/accord-core/src/main/java/accord/utils/RandomSource.java
@@ -20,6 +20,7 @@ package accord.utils;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -294,6 +295,12 @@ public interface RandomSource
return Iterables.get(set, offset);
}
+ default <T extends Enum<T>> T pickOrderedSet(EnumSet<T> set)
+ {
+ int offset = nextInt(0, set.size());
+ return Iterables.get(set, offset);
+ }
+
default <T extends Comparable<? super T>> T pickUnorderedSet(Set<T> set)
{
if (set instanceof SortedSet)
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 6d6b1cfa..a95a0750 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -87,10 +87,8 @@ import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.RandomSource;
import accord.utils.Utils;
-import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncExecutor;
-import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
+import accord.utils.async.TimeoutUtils;
import accord.verify.CompositeVerifier;
import accord.verify.ElleVerifier;
import accord.verify.StrictSerializabilityVerifier;
@@ -581,40 +579,22 @@ public class BurnTest
private static void run(long seed)
{
Duration timeout = Duration.ofMinutes(3);
- Runnable fn = () -> run(seed, 1000);
- AsyncResult.Settable<?> promise = AsyncResults.settable();
- Thread t = new Thread(() -> {
- try
- {
- fn.run();
- promise.setSuccess(null);
- }
- catch (Throwable e)
- {
- promise.setFailure(e);
- }
- });
- t.setName("BurnTest with timeout");
- t.setDaemon(true);
try
{
- t.start();
- AsyncChains.getBlocking(promise, timeout.toNanos(),
TimeUnit.NANOSECONDS);
+ TimeoutUtils.runBlocking(timeout, "BurnTest with timeout", () ->
run(seed, 1000));
}
catch (Throwable thrown)
{
Throwable cause = thrown;
if (cause instanceof ExecutionException)
cause = cause.getCause();
- if (cause instanceof InterruptedException || cause instanceof
TimeoutException)
- t.interrupt();
if (cause instanceof TimeoutException)
{
TimeoutException override = new TimeoutException("test did not
complete within " + timeout);
override.setStackTrace(new StackTraceElement[0]);
cause = override;
}
- logger.error("Exception running burn test for seed {}:", seed, t);
+ logger.error("Exception running burn test for seed {}:", seed,
cause);
throw SimulationException.wrap(seed, cause);
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
b/accord-core/src/test/java/accord/primitives/TxnIdTest.java
similarity index 60%
copy from accord-core/src/main/java/accord/coordinate/EpochTimeout.java
copy to accord-core/src/test/java/accord/primitives/TxnIdTest.java
index 2a4fbf4a..5fb21f1a 100644
--- a/accord-core/src/main/java/accord/coordinate/EpochTimeout.java
+++ b/accord-core/src/test/java/accord/primitives/TxnIdTest.java
@@ -16,30 +16,24 @@
* limitations under the License.
*/
-package accord.coordinate;
+package accord.primitives;
-import static accord.utils.Invariants.checkState;
+import org.junit.jupiter.api.Test;
-public class EpochTimeout extends Timeout
-{
- public final long epoch;
+import accord.utils.AccordGens;
+import org.assertj.core.api.Assertions;
- public EpochTimeout(long epoch)
- {
- super(null, null);
- this.epoch = epoch;
- }
+import static accord.utils.Property.qt;
- private EpochTimeout(long epoch, EpochTimeout cause)
+class TxnIdTest
+{
+ @Test
+ void stringSerde()
{
- super(null, null, cause);
- this.epoch = epoch;
- }
+ qt().forAll(AccordGens.txnIds()).check(id -> {
+ Assertions.assertThat(TxnId.parse(id.toString())).isEqualTo(id);
- @Override
- public EpochTimeout wrap()
- {
- checkState(this.getClass() == EpochTimeout.class);
- return new EpochTimeout(epoch, this);
+
Assertions.assertThat(Timestamp.fromString(id.toStandardString())).isEqualTo(new
Timestamp(id));
+ });
}
-}
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index b839e868..aaa313e7 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -146,6 +146,8 @@ public class TopologyManagerTest
TopologyManager service = tracker();
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
+ // shards to nodes: [[1, 2, 3], [4, 5, 6]]
+ // by syncing node 1/2 shard 1 has reached quorum, but not shard 2
service.onEpochSyncComplete(id(1), 2);
service.onEpochSyncComplete(id(2), 2);
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
@@ -153,41 +155,24 @@ public class TopologyManagerTest
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncCompleteFor(keys(250).toParticipants()));
}
- /**
- * Epochs should only report being synced if every preceding epoch is also
reporting synced
- */
@Test
- void existingEpochPendingSync()
+ void syncCompletePastEpochs()
{
- Range range = range(100, 200);
- Topology topology1 = topology(1, shard(range, idList(1, 2, 3),
idSet(1, 2)));
- Topology topology2 = topology(2, shard(range, idList(1, 2, 3),
idSet(2, 3)));
- Topology topology3 = topology(3, shard(range, idList(1, 2, 3),
idSet(1, 2)));
-
TopologyManager service = testTopologyManager(SUPPLIER, ID);
- service.onTopologyUpdate(topology1, () -> null);
- service.onTopologyUpdate(topology2, () -> null);
- service.onTopologyUpdate(topology3, () -> null);
+ Shard[] shards = { shard(range(0, 100), idList(1, 2, 3), idSet(1, 2,
3)),
+ shard(range(100, 200), idList(3, 4, 5), idSet(3, 4,
5)) };
- Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
- Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
- Assertions.assertFalse(service.getEpochStateUnsafe(3).syncComplete());
+ service.onTopologyUpdate(topology(1, shards), () -> null);
+ service.onTopologyUpdate(topology(2, shards), () -> null);
+ service.onTopologyUpdate(topology(3, shards), () -> null);
- // sync epoch 3
- service.onEpochSyncComplete(id(1), 3);
- service.onEpochSyncComplete(id(2), 3);
-
- Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
- Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
- Assertions.assertFalse(service.getEpochStateUnsafe(3).syncComplete());
-
- // sync epoch 2
- service.onEpochSyncComplete(id(2), 2);
- service.onEpochSyncComplete(id(3), 2);
+ for (int i = 1; i <= 5; i++)
+ service.onEpochSyncComplete(id(i), service.epoch());
- Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
- Assertions.assertTrue(service.getEpochStateUnsafe(2).syncComplete());
- Assertions.assertTrue(service.getEpochStateUnsafe(3).syncComplete());
+ Ranges expected = service.current().ranges().mergeTouching();
+
org.assertj.core.api.Assertions.assertThat(service.syncComplete(3)).describedAs("Unexpected
sync complte for node 3").isEqualTo(expected);
+
org.assertj.core.api.Assertions.assertThat(service.syncComplete(2)).describedAs("Unexpected
sync complte for node 2").isEqualTo(expected);
+
org.assertj.core.api.Assertions.assertThat(service.syncComplete(1)).describedAs("Unexpected
sync complte for node 1").isEqualTo(expected);
}
/**
diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java
b/accord-core/src/test/java/accord/utils/AccordGens.java
index 69e48ddd..4af15d4b 100644
--- a/accord-core/src/test/java/accord/utils/AccordGens.java
+++ b/accord-core/src/test/java/accord/utils/AccordGens.java
@@ -73,7 +73,12 @@ public class AccordGens
public static Gen<Node.Id> nodes()
{
- return nodes(RandomSource::nextInt);
+ return nodes(nodeIdValues());
+ }
+
+ public static Gen.IntGen nodeIdValues()
+ {
+ return rs -> rs.nextInt(-1, Integer.MAX_VALUE) + 1;
}
public static Gen<Node.Id> nodes(Gen.IntGen nodes)
@@ -98,7 +103,7 @@ public class AccordGens
public static Gen<Timestamp> timestamps()
{
- return timestamps(epochs()::nextLong, hlcs(), flags(),
RandomSource::nextInt);
+ return timestamps(epochs()::nextLong, hlcs(), flags(), nodeIdValues());
}
public static Gen<Timestamp> timestamps(Gen.LongGen epochs, Gen.LongGen
hlcs, Gen.IntGen flags, Gen.IntGen nodes)
@@ -113,12 +118,12 @@ public class AccordGens
public static Gen<TxnId> txnIds(Gen<Txn.Kind> kinds)
{
- return txnIds(epochs(), hlcs(), RandomSource::nextInt, kinds);
+ return txnIds(epochs(), hlcs(), nodeIdValues(), kinds);
}
public static Gen<TxnId> txnIds(Gen<Txn.Kind> kinds, Gen<Routable.Domain>
domains)
{
- return txnIds(epochs(), hlcs(), RandomSource::nextInt, kinds, domains);
+ return txnIds(epochs(), hlcs(), nodeIdValues(), kinds, domains);
}
public static Gen<TxnId> txnIds(Gen.LongGen epochs, Gen.LongGen hlcs,
Gen.IntGen nodes)
@@ -138,7 +143,7 @@ public class AccordGens
public static Gen<Ballot> ballot()
{
- return ballot(epochs()::nextLong, hlcs(), flags(),
RandomSource::nextInt);
+ return ballot(epochs()::nextLong, hlcs(), flags(), nodeIdValues());
}
public static Gen<Ballot> ballot(Gen.LongGen epochs, Gen.LongGen hlcs,
Gen.IntGen flags, Gen.IntGen nodes)
@@ -571,7 +576,7 @@ public class AccordGens
Gen<? extends Key> keyGen =
Gens.pick(Iterators.toArray(((Keys) txn.keys()).iterator(), Key.class));
keyDepsGen = AccordGens.keyDeps(keyGen,
AccordGens.txnIds(Gens.longs().between(0, txnId.epoch()),
Gens.longs().between(0, txnId.hlc()),
-
RandomSource::nextInt,
+
nodeIdValues(),
Gens.pick(Txn.Kind.Write, Txn.Kind.Read),
ignore -> Routable.Domain.Key));
rangeDepsGen = i -> RangeDeps.NONE;
diff --git a/accord-core/src/test/java/accord/utils/Property.java
b/accord-core/src/test/java/accord/utils/Property.java
index e45642c1..724a9383 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -816,6 +816,12 @@ public class Property
CommandsBuilder.this.addIf(predicate, cmd);
return this;
}
+
+ @Override
+ public IfBuilder<State, SystemUnderTest>
addIf(Predicate<State> nextPredicate, Setup<State, SystemUnderTest> cmd) {
+ CommandsBuilder.this.addIf(predicate.and(nextPredicate),
cmd);
+ return this;
+ }
});
return this;
}
@@ -823,6 +829,7 @@ public class Property
public interface IfBuilder<State, SystemUnderTest>
{
IfBuilder<State, SystemUnderTest> add(Setup<State,
SystemUnderTest> cmd);
+ IfBuilder<State, SystemUnderTest> addIf(Predicate<State>
predicate, Setup<State, SystemUnderTest> cmd);
}
public CommandsBuilder<State, SystemUnderTest>
unknownWeight(Gen.IntGen unknownWeightGen)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]