This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc3e1af1 CEP-15: (C*) Improve the chaos generation for Burn Tests:
slow/flakey connections and dropped messages (#57)
fc3e1af1 is described below
commit fc3e1af12554e3befe6e44f4664278d91b4c0415
Author: dcapwell <[email protected]>
AuthorDate: Wed Sep 13 15:52:34 2023 -0700
CEP-15: (C*) Improve the chaos generation for Burn Tests: slow/flakey
connections and dropped messages (#57)
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-18451
---
.../accord/impl/AbstractConfigurationService.java | 7 +
accord-core/src/main/java/accord/local/Node.java | 14 +-
.../src/main/java/accord/local/PreLoadContext.java | 25 +++
.../src/main/java/accord/messages/Accept.java | 2 +-
.../main/java/accord/messages/SafeCallback.java | 5 +
accord-core/src/test/java/accord/Utils.java | 2 +-
.../src/test/java/accord/burn/BurnTest.java | 66 ++++---
.../src/test/java/accord/burn/TopologyUpdates.java | 3 +
.../accord/burn/random/FrequentLargeRange.java | 134 +++++++++----
.../src/test/java/accord/burn/random/IntRange.java | 5 +-
.../test/java/accord/burn/random/RandomInt.java | 32 ---
.../java/accord/burn/random/RandomRangeTest.java | 6 +-
.../java/accord/burn/random/RandomWalkRange.java | 13 +-
.../burn/random/SegmentedRandomRangeTest.java | 8 +-
.../coordinate/tracking/TrackerReconciler.java | 2 +-
.../src/test/java/accord/impl/MessageListener.java | 219 +++++++++++++++++++++
.../src/test/java/accord/impl/basic/Cluster.java | 57 +++---
.../accord/impl/basic/DelayedCommandStores.java | 2 +-
.../src/test/java/accord/impl/basic/NodeSink.java | 135 +++++++++++--
.../test/java/accord/impl/basic/PendingQueue.java | 1 +
.../accord/impl/basic/PropagatingPendingQueue.java | 14 +-
.../java/accord/impl/basic/RandomDelayQueue.java | 27 ++-
.../basic/SimulatedDelayedExecutorService.java | 29 +--
.../basic/SimulatedFault.java} | 11 +-
.../src/test/java/accord/impl/list/ListAgent.java | 2 +-
.../test/java/accord/impl/list/ListRequest.java | 141 +++++++++----
.../src/test/java/accord/impl/list/ListResult.java | 57 ++++++
.../test/java/accord/impl/mock/MockCluster.java | 2 +-
.../java/accord/local/ImmutableCommandTest.java | 2 +-
accord-core/src/test/java/accord/utils/Gen.java | 68 ++++++-
.../src/test/java/accord/utils/GenTest.java | 76 +++++++
accord-core/src/test/java/accord/utils/Gens.java | 2 +-
.../src/main/java/accord/maelstrom/Cluster.java | 2 +-
.../src/main/java/accord/maelstrom/Main.java | 2 +-
.../src/main/groovy/accord.java-conventions.gradle | 4 +
35 files changed, 931 insertions(+), 246 deletions(-)
diff --git
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index d8ebab8f..d33c1231 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -269,6 +269,13 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
}
long lastAcked = epochs.lastAcknowledged;
+ // TODO (now, review): lastAcked == 0, lastReceived = 2
+ // if we wait for epoch=1.acknowledge the test seems to wait
forever... looks like burn test doesn't ack epoch=1
+ if (lastAcked == 0 && lastReceived > 0)
+ {
+ epochs.acknowledgeFuture(epochs.minEpoch()).addCallback(() ->
reportTopology(topology, startSync));
+ return;
+ }
if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
{
epochs.acknowledgeFuture(lastAcked + 1).addCallback(() ->
reportTopology(topology, startSync));
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 1916f0bb..276f631e 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -169,10 +169,18 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
configService.registerListener(this);
}
- // TODO (cleanup, testing): remove, only used by Maelstrom
- public AsyncResult<Void> start()
+ /**
+ * This starts the node for tests and makes sure that the provided
topology is acknowledged correctly. This method is not
+ * safe for production systems as it doesn't handle restarts and partially
acknowledged histories
+ * @return {@link EpochReady#metadata}
+ */
+ @VisibleForTesting
+ public AsyncResult<Void> unsafeStart()
{
- return onTopologyUpdateInternal(configService.currentTopology(),
false).metadata;
+ EpochReady ready =
onTopologyUpdateInternal(configService.currentTopology(), false);
+ ready.coordination.addCallback(() ->
this.topology.onEpochSyncComplete(id, topology.epoch()));
+ configService.acknowledgeEpoch(ready, false);
+ return ready.metadata;
}
public CommandStores commandStores()
diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java
b/accord-core/src/main/java/accord/local/PreLoadContext.java
index cd569ace..0ef8f553 100644
--- a/accord-core/src/main/java/accord/local/PreLoadContext.java
+++ b/accord-core/src/main/java/accord/local/PreLoadContext.java
@@ -23,12 +23,15 @@ import accord.impl.CommandsForKey;
import accord.primitives.Keys;
import accord.primitives.Seekables;
import accord.primitives.TxnId;
+import com.google.common.collect.Iterators;
import net.nicoulaj.compilecommand.annotations.Inline;
import com.google.common.collect.Sets;
+import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.Nullable;
@@ -52,6 +55,28 @@ public interface PreLoadContext
*/
default Collection<TxnId> additionalTxnIds() { return
Collections.emptyList(); }
+ default Collection<TxnId> txnIds()
+ {
+ TxnId primaryTxnId = primaryTxnId();
+ Collection<TxnId> additional = additionalTxnIds();
+ if (primaryTxnId == null) return additional;
+ if (additional.isEmpty()) return Collections.singleton(primaryTxnId);
+ return new AbstractCollection<TxnId>()
+ {
+ @Override
+ public Iterator<TxnId> iterator()
+ {
+ return
Iterators.concat(Iterators.singletonIterator(primaryTxnId),
additional.iterator());
+ }
+
+ @Override
+ public int size()
+ {
+ return 1 + additional.size();
+ }
+ };
+ }
+
@Inline
default void forEachId(Consumer<TxnId> consumer)
{
diff --git a/accord-core/src/main/java/accord/messages/Accept.java
b/accord-core/src/main/java/accord/messages/Accept.java
index 1554e5dd..44abe4fd 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -79,7 +79,7 @@ public class Accept extends
TxnRequest.WithUnsynced<Accept.AcceptReply>
}
@Override
- public synchronized AcceptReply apply(SafeCommandStore safeStore)
+ public AcceptReply apply(SafeCommandStore safeStore)
{
// TODO (now): we previously checked isAffectedByBootstrap(txnId) here
and took this branch also, try to remember why
if (minUnsyncedEpoch < txnId.epoch())
diff --git a/accord-core/src/main/java/accord/messages/SafeCallback.java
b/accord-core/src/main/java/accord/messages/SafeCallback.java
index a8a1416f..0f3cd3a0 100644
--- a/accord-core/src/main/java/accord/messages/SafeCallback.java
+++ b/accord-core/src/main/java/accord/messages/SafeCallback.java
@@ -55,6 +55,11 @@ public class SafeCallback<T extends Reply>
failure(to, new Timeout(null, null));
}
+ public void onCallbackFailure(Node.Id to, Throwable t)
+ {
+ safeCall(to, t, Callback::onCallbackFailure);
+ }
+
private interface SafeCall<T, P>
{
void accept(Callback<T> callback, Node.Id id, P param) throws
Throwable;
diff --git a/accord-core/src/test/java/accord/Utils.java
b/accord-core/src/test/java/accord/Utils.java
index 256e5f84..ec1e81e0 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -152,7 +152,7 @@ public class Utils
SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
InMemoryCommandStores.Synchronized::new);
- awaitUninterruptibly(node.start());
+ awaitUninterruptibly(node.unsafeStart());
return node;
}
}
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java
b/accord-core/src/test/java/accord/burn/BurnTest.java
index ca63eb0a..d8bb0ae0 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -40,7 +40,10 @@ import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import accord.burn.random.FrequentLargeRange;
+import accord.impl.MessageListener;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +56,6 @@ import accord.impl.basic.RandomDelayQueue;
import accord.impl.basic.RandomDelayQueue.Factory;
import accord.impl.TopologyFactory;
import accord.impl.basic.Packet;
-import accord.impl.basic.PendingQueue;
import accord.impl.basic.SimulatedDelayedExecutorService;
import accord.impl.list.ListAgent;
import accord.impl.list.ListQuery;
@@ -80,7 +82,7 @@ public class BurnTest
{
private static final Logger logger =
LoggerFactory.getLogger(BurnTest.class);
- static List<Packet> generate(RandomSource random, Function<? super
CommandStore, AsyncExecutor> executor, List<Id> clients, List<Id> nodes, int
keyCount, int operations)
+ static List<Packet> generate(RandomSource random, MessageListener
listener, Function<? super CommandStore, AsyncExecutor> executor, List<Id>
clients, List<Id> nodes, int keyCount, int operations)
{
List<Key> keys = new ArrayList<>();
for (int i = 0 ; i < keyCount ; ++i)
@@ -108,7 +110,7 @@ public class BurnTest
Ranges ranges = Ranges.of(requestRanges.toArray(new Range[0]));
ListRead read = new ListRead(random.decide(readInCommandStore)
? Function.identity() : executor, ranges, ranges);
ListQuery query = new ListQuery(client, count);
- ListRequest request = new ListRequest(new Txn.InMemory(ranges,
read, query, null));
+ ListRequest request = new ListRequest(new Txn.InMemory(ranges,
read, query, null), listener);
packets.add(new Packet(client, node, count, request));
@@ -135,7 +137,7 @@ public class BurnTest
requestKeys.addAll(update.keySet());
ListRead read = new ListRead(random.decide(readInCommandStore)
? Function.identity() : executor, readKeys, new Keys(requestKeys));
ListQuery query = new ListQuery(client, count);
- ListRequest request = new ListRequest(new Txn.InMemory(new
Keys(requestKeys), read, query, update));
+ ListRequest request = new ListRequest(new Txn.InMemory(new
Keys(requestKeys), read, query, update), listener);
packets.add(new Packet(client, node, count, request));
}
}
@@ -192,7 +194,7 @@ public class BurnTest
{
List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
RandomDelayQueue delayQueue = new Factory(random).get();
- PendingQueue queue = new PropagatingPendingQueue(failures, delayQueue);
+ PropagatingPendingQueue queue = new PropagatingPendingQueue(failures,
delayQueue);
RandomSource retryRandom = random.fork();
ListAgent agent = new ListAgent(1000L, failures::add, retry -> {
long delay = retryRandom.nextInt(1, 15);
@@ -200,16 +202,23 @@ public class BurnTest
});
Supplier<LongSupplier> nowSupplier = () -> {
- RandomSource jitter = random.fork();
- // TODO (expected): jitter should be random walk with intermittent
long periods
- return () -> Math.max(0, delayQueue.nowInMillis() +
(jitter.nextInt(10) - 5));
+ RandomSource forked = random.fork();
+ return FrequentLargeRange.builder(forked)
+ .ratio(1, 5)
+ .small(50, 5000,
TimeUnit.MICROSECONDS)
+ .large(1, 10,
TimeUnit.MILLISECONDS)
+ .build()
+ .mapAsLong(j -> Math.max(0, queue.nowInMillis() + j))
+ .asLongSupplier(forked);
};
StrictSerializabilityVerifier strictSerializable = new
StrictSerializabilityVerifier(keyCount);
- SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, agent, random.fork());
+ SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, agent);
Function<CommandStore, AsyncExecutor> executor = ignore ->
globalExecutor;
- Packet[] requests = toArray(generate(random, executor, clients, nodes,
keyCount, operations), Packet[]::new);
+ MessageListener listener = MessageListener.get();
+
+ Packet[] requests = toArray(generate(random, listener, executor,
clients, nodes, keyCount, operations), Packet[]::new);
int[] starts = new int[requests.length];
Packet[] replies = new Packet[requests.length];
@@ -239,14 +248,14 @@ public class BurnTest
{
int i = requestIndex.getAndIncrement();
starts[i] = clock.incrementAndGet();
- queue.add(requests[i]);
+ queue.addNoDelay(requests[i]);
if (i == requests.length - 1)
onSubmitted.get().run();
}
try
{
- if (reply.responseKeys == null && reply.read != null &&
reply.read.length == 0)
+ if (!reply.isSuccess() && reply.fault() ==
ListResult.Fault.HeartBeat)
return; // interrupted; will fetch our actual reply once
rest of simulation is finished (but wanted to send another request to keep
correct number in flight)
int start = starts[(int)packet.replyId];
@@ -254,13 +263,19 @@ public class BurnTest
logger.debug("{} at [{}, {}]", reply, start, end);
replies[(int)packet.replyId] = packet;
- if (reply.responseKeys == null)
+ if (!reply.isSuccess())
{
- if (reply.read == null) nacks.incrementAndGet();
- else if (reply.read.length == 1) lost.incrementAndGet();
- else if (reply.read.length == 2)
truncated.incrementAndGet();
- else if (reply.read.length == 3)
failedToCheck.incrementAndGet();
- else throw new AssertionError();
+
+ switch (reply.fault())
+ {
+ case Lost: lost.incrementAndGet();
break;
+ case Invalidated: nacks.incrementAndGet();
break;
+ case Failure: failedToCheck.incrementAndGet();
break;
+ case Truncated: truncated.incrementAndGet();
break;
+ // txn was applied?, but client saw a timeout, so
response isn't known
+ case Other:
break;
+ default: throw new
AssertionError("Unexpected fault: " + reply.fault());
+ }
return;
}
@@ -292,7 +307,7 @@ public class BurnTest
EnumMap<MessageType, Cluster.Stats> messageStatsMap;
try
{
- messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), () ->
queue,
+ messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), listener,
() -> queue, queue::checkFailures,
responseSink, globalExecutor,
random::fork, nowSupplier,
topologyFactory,
initialRequests::poll,
@@ -313,12 +328,18 @@ public class BurnTest
logger.info("Message counts: {}", messageStatsMap.entrySet());
if (clock.get() != operations * 2)
{
+ StringBuilder sb = new StringBuilder();
for (int i = 0 ; i < requests.length ; ++i)
{
- logger.info("{}", requests[i]);
- logger.info("\t\t" + replies[i]);
+ // since this only happens when operations are lost, only log
the ones without a reply to lower the amount of noise
+ if (replies[i] == null)
+ {
+ sb.setLength(0);
+ sb.append(requests[i]).append("\n\t\t").append(replies[i]);
+ logger.info(sb.toString());
+ }
}
- throw new AssertionError("Incomplete set of responses");
+ throw new AssertionError("Incomplete set of responses; clock=" +
clock.get() + ", expected operations=" + (operations * 2));
}
}
@@ -355,6 +376,7 @@ public class BurnTest
}
@Test
+ @Timeout(value = 3, unit = TimeUnit.MINUTES)
public void testOne()
{
run(ThreadLocalRandom.current().nextLong(), 1000);
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index b196023f..ae3d3f0a 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -61,6 +61,9 @@ public class TopologyUpdates
public synchronized void syncComplete(Node originator, Collection<Node.Id>
cluster, long epoch)
{
+ // topology is init topology
+ if (pendingTopologies.isEmpty())
+ return;
Map<Node.Id, Ranges> pending = pendingTopologies.get(epoch);
if (pending == null || null == pending.remove(originator.id()))
throw new AssertionError();
diff --git
a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
index 42417980..41cae465 100644
--- a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
+++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
@@ -18,59 +18,121 @@
package accord.burn.random;
-import accord.utils.Invariants;
+import accord.utils.Gen;
+import accord.utils.Gen.LongGen;
+import accord.utils.Gens;
import accord.utils.RandomSource;
-public class FrequentLargeRange implements RandomLong
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+public class FrequentLargeRange implements LongGen
{
- private final RandomLong small, large;
- private final double ratio;
- private final int steps;
- private final double lower, upper;
- private int run = -1;
- private long smallCount = 0, largeCount = 0;
-
- public FrequentLargeRange(RandomLong small, RandomLong large, double ratio)
+ private final LongGen small, large;
+ private final Gen<Boolean> runs;
+
+ public FrequentLargeRange(LongGen small, LongGen large, double ratio)
{
- Invariants.checkArgument(ratio > 0 && ratio <= 1);
this.small = small;
this.large = large;
- this.ratio = ratio;
- this.steps = (int) (1 / ratio);
- this.lower = ratio * .8;
- this.upper = ratio * 1.2;
+ this.runs = Gens.bools().biasedRepeatingRuns(ratio);
}
@Override
- public long getLong(RandomSource randomSource)
+ public long nextLong(RandomSource randomSource)
+ {
+ if (runs.next(randomSource)) return large.nextLong(randomSource);
+ else return small.nextLong(randomSource);
+ }
+
+ public static Builder builder(RandomSource randomSource)
{
- if (run != -1)
+ return new Builder(randomSource);
+ }
+
+ public static class Builder
+ {
+ private final RandomSource random;
+ private Double ratio;
+ private LongGen small, large;
+
+ public Builder(RandomSource random)
+ {
+ this.random = random;
+ }
+
+ public Builder ratio(double ratio)
+ {
+ this.ratio = ratio;
+ return this;
+ }
+
+ public Builder ratio(int min, int max)
{
- run--;
- largeCount++;
- return large.getLong(randomSource);
+ this.ratio = random.nextInt(min, max) / 100.0D;
+ return this;
}
- double currentRatio = largeCount / (double) (smallCount + largeCount);
- if (currentRatio < lower)
+
+ public Builder small(Duration min, Duration max)
{
- // not enough large
- largeCount++;
- return large.getLong(randomSource);
+ small = create(min, max);
+ return this;
}
- if (currentRatio > upper)
+
+ public Builder small(long min, long max, TimeUnit unit)
+ {
+ small = create(min, max, unit);
+ return this;
+ }
+
+ public Builder small(long min, TimeUnit minUnit, long max, TimeUnit
maxUnit)
{
- // not enough small
- smallCount++;
- return small.getLong(randomSource);
+ small = create(min, minUnit, max, maxUnit);
+ return this;
}
- if (randomSource.nextDouble() < ratio)
+
+ public Builder large(Duration min, Duration max)
+ {
+ large = create(min, max);
+ return this;
+ }
+
+ public Builder large(long min, long max, TimeUnit unit)
+ {
+ large = create(min, max, unit);
+ return this;
+ }
+
+ public Builder large(long min, TimeUnit minUnit, long max, TimeUnit
maxUnit)
+ {
+ large = create(min, minUnit, max, maxUnit);
+ return this;
+ }
+
+ private RandomWalkRange create(Duration min, Duration max)
+ {
+ return new RandomWalkRange(random, min.toNanos(), max.toNanos());
+ }
+
+ private RandomWalkRange create(long min, long max, TimeUnit unit)
+ {
+ return create(min, unit, max, unit);
+ }
+
+ private RandomWalkRange create(long min, TimeUnit minUnit, long max,
TimeUnit maxUnit)
+ {
+ return new RandomWalkRange(random, minUnit.toNanos(min),
maxUnit.toNanos(max));
+ }
+
+ public FrequentLargeRange build()
{
- run = randomSource.nextInt(steps);
- run--;
- largeCount++;
- return large.getLong(randomSource);
+ if (small == null)
+ throw new IllegalStateException("Small range undefined");
+ if (large == null)
+ throw new IllegalStateException("Large range undefined");
+ if (ratio == null)
+ ratio(1, 11);
+ return new FrequentLargeRange(small, large, ratio);
}
- smallCount++;
- return small.getLong(randomSource);
}
}
diff --git a/accord-core/src/test/java/accord/burn/random/IntRange.java
b/accord-core/src/test/java/accord/burn/random/IntRange.java
index 5ebc2cd2..b9aec72a 100644
--- a/accord-core/src/test/java/accord/burn/random/IntRange.java
+++ b/accord-core/src/test/java/accord/burn/random/IntRange.java
@@ -18,9 +18,10 @@
package accord.burn.random;
+import accord.utils.Gen;
import accord.utils.RandomSource;
-public class IntRange implements RandomInt
+public class IntRange implements Gen.LongGen
{
public final int min, max;
private final int maxDelta;
@@ -34,7 +35,7 @@ public class IntRange implements RandomInt
}
@Override
- public int getInt(RandomSource randomSource)
+ public long nextLong(RandomSource randomSource)
{
return min + randomSource.nextInt(maxDelta);
}
diff --git a/accord-core/src/test/java/accord/burn/random/RandomInt.java
b/accord-core/src/test/java/accord/burn/random/RandomInt.java
deleted file mode 100644
index a8fbbd12..00000000
--- a/accord-core/src/test/java/accord/burn/random/RandomInt.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.burn.random;
-
-import accord.utils.RandomSource;
-
-public interface RandomInt extends RandomLong
-{
- int getInt(RandomSource randomSource);
-
- @Override
- default long getLong(RandomSource randomSource)
- {
- return getInt(randomSource);
- }
-}
diff --git a/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
b/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
index 68c134a8..d25be7c9 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
+++ b/accord-core/src/test/java/accord/burn/random/RandomRangeTest.java
@@ -46,9 +46,9 @@ class RandomRangeTest
{
int samples = 1000;
qt().forAll(Gens.random(), ranges()).check((rs, range) -> {
- RandomLong randRange = factory.create(rs, range.min, range.max);
+ Gen.LongGen randRange = factory.create(rs, range.min, range.max);
for (int i = 0; i < samples; i++)
- Assertions.assertThat(randRange.getLong(rs)).isBetween((long)
range.min, (long) range.max);
+ Assertions.assertThat(randRange.nextLong(rs)).isBetween((long)
range.min, (long) range.max);
});
}
@@ -78,6 +78,6 @@ class RandomRangeTest
private interface Factory
{
- RandomLong create(RandomSource random, int min, int max);
+ Gen.LongGen create(RandomSource random, int min, int max);
}
}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
b/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
index c618cf7e..ae3c98e9 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
+++ b/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java
@@ -18,15 +18,16 @@
package accord.burn.random;
+import accord.utils.Gen.LongGen;
import accord.utils.RandomSource;
-public class RandomWalkRange implements RandomLong
+public class RandomWalkRange implements LongGen
{
- public final int min, max;
- private final int maxStepSize;
+ public final long min, max;
+ private final long maxStepSize;
long cur;
- public RandomWalkRange(RandomSource random, int min, int max)
+ public RandomWalkRange(RandomSource random, long min, long max)
{
this.min = min;
this.max = max;
@@ -35,7 +36,7 @@ public class RandomWalkRange implements RandomLong
}
@Override
- public long getLong(RandomSource randomSource)
+ public long nextLong(RandomSource randomSource)
{
long step = randomSource.nextLong(-maxStepSize, maxStepSize + 1);
long cur = this.cur;
@@ -44,7 +45,7 @@ public class RandomWalkRange implements RandomLong
return cur;
}
- private static int maxStepSize(RandomSource random, int min, int max)
+ private static long maxStepSize(RandomSource random, long min, long max)
{
switch (random.nextInt(3))
{
diff --git
a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
b/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
index caddc241..b16cdecd 100644
--- a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
+++ b/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
@@ -53,17 +53,17 @@ class SegmentedRandomRangeTest
this.type = type;
}
- RandomLong min(RandomSource random)
+ Gen.LongGen min(RandomSource random)
{
return create(random, minSmall, maxSmall);
}
- RandomLong max(RandomSource random)
+ Gen.LongGen max(RandomSource random)
{
return create(random, minLarge, maxLarge);
}
- private RandomLong create(RandomSource random, int min, int max)
+ private Gen.LongGen create(RandomSource random, int min, int max)
{
switch (type)
{
@@ -127,7 +127,7 @@ class SegmentedRandomRangeTest
int resamples = 0;
for (int i = 0; i < numSamples; i++)
{
- long size = period.getLong(rs);
+ long size = period.nextLong(rs);
if (size > tc.maxSmall)
{
largeCount++;
diff --git
a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
index bfd83fa1..0c9bdea8 100644
---
a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
+++
b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
@@ -101,7 +101,7 @@ public abstract class TrackerReconciler<ST extends
ShardTracker, T extends Abstr
{
System.out.println("seed: " + seed);
RandomSource random = new DefaultRandom(seed);
- SimulatedDelayedExecutorService executor = new
SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new
TestAgent(), random);
+ SimulatedDelayedExecutorService executor = new
SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new
TestAgent());
return topologies(random, executor).map(topologies ->
constructor.apply(random, topologies))
.collect(Collectors.toList());
}
diff --git a/accord-core/src/test/java/accord/impl/MessageListener.java
b/accord-core/src/test/java/accord/impl/MessageListener.java
new file mode 100644
index 00000000..faddb09f
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/MessageListener.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.impl.basic.NodeSink;
+import accord.local.Node;
+import accord.local.PreLoadContext;
+import accord.messages.Message;
+import accord.messages.Request;
+import accord.messages.TxnRequest;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+public interface MessageListener
+{
+ enum ClientAction {SUBMIT, SUCCESS, FAILURE, UNKNOWN}
+
+ void onMessage(NodeSink.Action action, Node.Id src, Node.Id to, long id,
Message message);
+
+ void onClientAction(ClientAction action, Node.Id from, TxnId id, Object
message);
+
+ static MessageListener get()
+ {
+ if (!DebugListener.DEBUG) return Noop.INSTANCE;
+ return new DebugListener();
+ }
+
+ enum Noop implements MessageListener
+ {
+ INSTANCE;
+
+ @Override
+ public void onMessage(NodeSink.Action action, Node.Id src, Node.Id to,
long id, Message message)
+ {
+
+ }
+
+ @Override
+ public void onClientAction(ClientAction action, Node.Id from, TxnId
id, Object message)
+ {
+
+ }
+ }
+
+ class DebugListener implements MessageListener
+ {
+ private static final Logger logger =
LoggerFactory.getLogger(DebugListener.class);
+ /**
+ * When running tests, if this is enabled log events will happen for
each matching message type
+ */
+ private static final boolean DEBUG = false;
+ /**
+ * When this set is empty all txn events will be logged, but if only
specific txn are desired then this filter will limit the logging to just those
events
+ */
+ private static final Set<TxnId> txnIdFilter = ImmutableSet.of();
+ private static final Set<TxnReplyId> txnReplies = new HashSet<>();
+
+ private static int ACTION_SIZE =
Stream.of(NodeSink.Action.values()).map(Enum::name).mapToInt(String::length).max().getAsInt();
+ private static int CLIENT_ACTION_SIZE =
Stream.of(ClientAction.values()).map(Enum::name).mapToInt(String::length).max().getAsInt();
+ private static int ALL_ACTION_SIZE = Math.max(ACTION_SIZE,
CLIENT_ACTION_SIZE);
+
+ @Override
+ public void onMessage(NodeSink.Action action, Node.Id from, Node.Id
to, long id, Message message)
+ {
+ if (txnIdFilter.isEmpty() || containsTxnId(from, to, id, message))
+ logger.debug("Message {}: From {}, To {}, id {}, Message {}",
normalize(action), normalize(from), normalize(to), normalizeMessageId(id),
message);
+ }
+
+ @Override
+ public void onClientAction(ClientAction action, Node.Id from, TxnId
id, Object message)
+ {
+ if (txnIdFilter.isEmpty() || txnIdFilter.contains(id))
+ {
+ String log = message instanceof Throwable ?
+ "Client {}: From {}, To {}, id {}" :
+ "Client {}: From {}, To {}, id {}, Message {}";
+ logger.debug(log, normalize(action), normalize(from),
normalize(from), normalize(id), normalizeClientMessage(message));
+ }
+ }
+
+ private static Object normalizeClientMessage(Object o)
+ {
+ if (o instanceof Throwable)
+ trimStackTrace((Throwable) o);
+ return o;
+ }
+
+ private static void trimStackTrace(Throwable input)
+ {
+ for (Throwable current = input; current != null; current =
current.getCause())
+ {
+ StackTraceElement[] stack = current.getStackTrace();
+ // remove junit as its super dense and not helpful
+ OptionalInt first = IntStream.range(0, stack.length).filter(i
-> stack[i].getClassName().startsWith("org.junit")).findFirst();
+ if (first.isPresent())
+ current.setStackTrace(Arrays.copyOfRange(stack, 0,
first.getAsInt()));
+ for (Throwable sup : current.getSuppressed())
+ trimStackTrace(sup);
+ }
+ }
+
+ private static String normalize(NodeSink.Action action)
+ {
+ return Strings.padStart(action.name(), ALL_ACTION_SIZE, ' ');
+ }
+
+ private static String normalize(ClientAction action)
+ {
+ return Strings.padStart(action.name(), ALL_ACTION_SIZE, ' ');
+ }
+
+ private static String normalize(Node.Id id)
+ {
+ return Strings.padStart(id.toString(), 4, ' ');
+ }
+
+ private static String normalizeMessageId(long id)
+ {
+ return Strings.padStart(Long.toString(id), 14, ' ');
+ }
+
+ private static String normalize(Timestamp ts)
+ {
+ return Strings.padStart(ts.toString(), 14, ' ');
+ }
+
+ public static boolean containsTxnId(Node.Id from, Node.Id to, long id,
Message message)
+ {
+ if (message instanceof Request)
+ {
+ if (containsAny((Request) message))
+ {
+ txnReplies.add(new TxnReplyId(from, to, id));
+ return true;
+ }
+ return false;
+ }
+ else
+ return txnReplies.contains(new TxnReplyId(to, from, id));
+ }
+
+ private static boolean containsAny(Request message)
+ {
+ if (message instanceof TxnRequest<?>)
+ return txnIdFilter.contains(((TxnRequest<?>) message).txnId);
+ // this includes txn that depend on the txn, should this limit for
the first txnId?
+ if (message instanceof PreLoadContext)
+ return ((PreLoadContext)
message).txnIds().stream().anyMatch(txnIdFilter::contains);
+ return false;
+ }
+
+ private static class TxnReplyId
+ {
+ final Node.Id from;
+ final Node.Id to;
+ final long id;
+
+ private TxnReplyId(Node.Id from, Node.Id to, long id)
+ {
+ this.from = from;
+ this.to = to;
+ this.id = id;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TxnReplyId that = (TxnReplyId) o;
+ return id == that.id && Objects.equals(from, that.from) &&
Objects.equals(to, that.to);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(from, to, id);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TxnReplyId{" +
+ "from=" + from +
+ ", to=" + to +
+ ", id=" + id +
+ '}';
+ }
+ }
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 5dad1dee..853dfe71 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -35,6 +35,7 @@ import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import accord.impl.MessageListener;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ import accord.local.Node.Id;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
import accord.messages.MessageType;
+import accord.messages.Message;
import accord.messages.Reply;
import accord.messages.Request;
import accord.messages.SafeCallback;
@@ -81,18 +83,24 @@ public class Cluster implements Scheduler
EnumMap<MessageType, Stats> statsMap = new EnumMap<>(MessageType.class);
+ final RandomSource randomSource;
final Function<Id, Node> lookup;
final PendingQueue pending;
+ final Runnable checkFailures;
final List<Runnable> onDone = new ArrayList<>();
final Consumer<Packet> responseSink;
final Map<Id, NodeSink> sinks = new HashMap<>();
+ final MessageListener messageListener;
int clock;
int recurring;
Set<Id> partitionSet;
- public Cluster(Supplier<PendingQueue> queueSupplier, Function<Id, Node>
lookup, Consumer<Packet> responseSink)
+ public Cluster(RandomSource randomSource, MessageListener messageListener,
Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id,
Node> lookup, Consumer<Packet> responseSink)
{
+ this.randomSource = randomSource;
+ this.messageListener = messageListener;
this.pending = queueSupplier.get();
+ this.checkFailures = checkFailures;
this.lookup = lookup;
this.responseSink = responseSink;
this.partitionSet = new HashSet<>();
@@ -105,26 +113,16 @@ public class Cluster implements Scheduler
return sink;
}
- private void add(Packet packet)
+ void add(Packet packet, long delay, TimeUnit unit)
{
MessageType type = packet.message.type();
if (type != null)
statsMap.computeIfAbsent(type, ignore -> new Stats()).count++;
- boolean isReply = packet.message instanceof Reply;
if (trace.isTraceEnabled())
- trace.trace("{} {} {}", clock++, isReply ? "RPLY" : "SEND",
packet);
+ trace.trace("{} {} {}", clock++, packet.message instanceof Reply ?
"RPLY" : "SEND", packet);
if (lookup.apply(packet.dst) == null) responseSink.accept(packet);
- else pending.add(packet);
- }
-
- void add(Id from, Id to, long messageId, Request send)
- {
- add(new Packet(from, to, messageId, send));
- }
+ else pending.add(packet, delay, unit);
- void add(Id from, Id to, long replyId, Reply send)
- {
- add(new Packet(from, to, replyId, send));
}
public void processAll()
@@ -139,6 +137,7 @@ public class Cluster implements Scheduler
public boolean processPending()
{
+ checkFailures.run();
if (pending.size() == recurring)
return false;
@@ -147,6 +146,7 @@ public class Cluster implements Scheduler
return false;
processNext(next);
+ checkFailures.run();
return true;
}
@@ -157,18 +157,6 @@ public class Cluster implements Scheduler
Packet deliver = (Packet) next;
Node on = lookup.apply(deliver.dst);
- // TODO (required, testing): random drop chance independent of
partition; also port flaky connections etc. from simulator
- // Drop the message if it goes across the partition
- boolean drop = ((Packet) next).src.id >= 0 &&
- !(partitionSet.contains(deliver.src) &&
partitionSet.contains(deliver.dst)
- || !partitionSet.contains(deliver.src) &&
!partitionSet.contains(deliver.dst));
- if (drop)
- {
- if (trace.isTraceEnabled())
- trace.trace("{} DROP[{}] {}", clock++, on.epoch(),
deliver);
- return;
- }
-
if (trace.isTraceEnabled())
trace.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver);
@@ -190,6 +178,12 @@ public class Cluster implements Scheduler
}
}
+ public void notifyDropped(Node.Id from, Node.Id to, long id, Message
message)
+ {
+ if (trace.isTraceEnabled())
+ trace.trace("{} DROP[{}] (from:{}, to:{}, {}:{}, body:{})",
clock++, lookup.apply(to).epoch(), from, to, message instanceof Reply ?
"replyTo" : "id", id, message);
+ }
+
@Override
public Scheduled recurring(Runnable run, long delay, TimeUnit units)
{
@@ -219,13 +213,13 @@ public class Cluster implements Scheduler
run.run();
}
- public static EnumMap<MessageType, Stats> run(Id[] nodes,
Supplier<PendingQueue> queueSupplier, Consumer<Packet> responseSink,
AgentExecutor executor, Supplier<RandomSource> randomSupplier,
Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory topologyFactory,
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
+ public static EnumMap<MessageType, Stats> run(Id[] nodes, MessageListener
messageListener, Supplier<PendingQueue> queueSupplier, Runnable checkFailures,
Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource>
randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory
topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
{
Topology topology = topologyFactory.toTopology(nodes);
Map<Id, Node> lookup = new LinkedHashMap<>();
try
{
- Cluster sinks = new Cluster(queueSupplier, lookup::get,
responseSink);
+ Cluster sinks = new Cluster(randomSupplier.get(), messageListener,
queueSupplier, checkFailures, lookup::get, responseSink);
TopologyUpdates topologyUpdates = new TopologyUpdates(executor);
TopologyRandomizer configRandomizer = new
TopologyRandomizer(randomSupplier, topology, topologyUpdates, lookup::get);
List<CoordinateDurabilityScheduling> durabilityScheduling = new
ArrayList<>();
@@ -248,7 +242,7 @@ public class Cluster implements Scheduler
}
// startup
- AsyncResult<?> startup =
AsyncChains.reduce(lookup.values().stream().map(Node::start).collect(toList()),
(a, b) -> null).beginAsResult();
+ AsyncResult<?> startup =
AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()),
(a, b) -> null).beginAsResult();
while (sinks.processPending());
Assertions.assertTrue(startup.isDone());
@@ -270,7 +264,7 @@ public class Cluster implements Scheduler
Packet next;
while ((next = in.get()) != null)
- sinks.add(next);
+ sinks.add(next, 0, TimeUnit.NANOSECONDS);
while (sinks.processPending());
@@ -289,8 +283,9 @@ public class Cluster implements Scheduler
while (!sinks.onDone.isEmpty())
{
- sinks.onDone.forEach(Runnable::run);
+ List<Runnable> onDone = new ArrayList<>(sinks.onDone);
sinks.onDone.clear();
+ onDone.forEach(Runnable::run);
while (sinks.processPending());
}
diff --git
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index cc026c8e..78d76a71 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -49,7 +49,7 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
public static CommandStores.Factory factory(PendingQueue pending)
{
return (time, agent, store, random, shardDistributor,
progressLogFactory) ->
- new DelayedCommandStores(time, agent, store, random,
shardDistributor, progressLogFactory, new
SimulatedDelayedExecutorService(pending, agent, random));
+ new DelayedCommandStores(time, agent, store, random,
shardDistributor, progressLogFactory, new
SimulatedDelayedExecutorService(pending, agent));
}
public static class DelayedCommandStore extends InMemoryCommandStore
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index e14fb523..a19d6964 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -18,13 +18,22 @@
package accord.impl.basic;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
import accord.api.MessageSink;
import accord.local.AgentExecutor;
+import accord.burn.random.FrequentLargeRange;
+import accord.messages.SafeCallback;
+import accord.messages.Message;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.RandomSource;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.Callback;
@@ -32,13 +41,16 @@ import accord.messages.Reply;
import accord.messages.Reply.FailureReply;
import accord.messages.ReplyContext;
import accord.messages.Request;
-import accord.messages.SafeCallback;
-import accord.utils.RandomSource;
import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
+import static java.util.concurrent.TimeUnit.SECONDS;
public class NodeSink implements MessageSink
{
+ public enum Action {DELIVER, DROP, DROP_PARTITIONED, DELIVER_WITH_FAILURE,
FAILURE}
+
+ private final Map<Id, Supplier<Action>> nodeActions = new HashMap<>();
+ private final Map<Id, LongSupplier> networkJitter = new HashMap<>();
final Id self;
final Function<Id, Node> lookup;
final Cluster parent;
@@ -56,9 +68,9 @@ public class NodeSink implements MessageSink
}
@Override
- public synchronized void send(Id to, Request send)
+ public void send(Id to, Request send)
{
- parent.add(self, to, SENTINEL_MESSAGE_ID, send);
+ maybeEnqueue(to, SENTINEL_MESSAGE_ID, send, null);
}
@Override
@@ -67,21 +79,116 @@ public class NodeSink implements MessageSink
long messageId = nextMessageId++;
SafeCallback sc = new SafeCallback(executor, callback);
callbacks.put(messageId, sc);
- parent.add(self, to, messageId, send);
- parent.pending.add((PendingRunnable) () -> {
- if (sc == callbacks.get(messageId))
- sc.slowResponse(to);
- }, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
- parent.pending.add((PendingRunnable) () -> {
- if (sc == callbacks.remove(messageId))
- sc.timeout(to);
- }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+ if (maybeEnqueue(to, messageId, send, sc))
+ {
+ parent.pending.add((PendingRunnable) () -> {
+ if (sc == callbacks.get(messageId))
+ sc.slowResponse(to);
+ }, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
+ parent.pending.add((PendingRunnable) () -> {
+ if (sc == callbacks.remove(messageId))
+ sc.timeout(to);
+ }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+ }
}
@Override
public void reply(Id replyToNode, ReplyContext replyContext, Reply reply)
{
- parent.add(self, replyToNode, Packet.getMessageId(replyContext),
reply);
+ maybeEnqueue(replyToNode, Packet.getMessageId(replyContext), reply,
null);
+ }
+
+ private boolean maybeEnqueue(Node.Id to, long id, Message message,
SafeCallback callback)
+ {
+ Runnable task = () -> {
+ Packet packet;
+ if (message instanceof Reply) packet = new Packet(self, to, id,
(Reply) message);
+ else packet = new Packet(self, to, id,
(Request) message);
+ parent.add(packet, networkJitterNanos(to), TimeUnit.NANOSECONDS);
+ };
+ if (to.equals(self) || lookup.apply(to) == null /* client */)
+ {
+ parent.messageListener.onMessage(Action.DELIVER, self, to, id,
message);
+ task.run();
+ return true;
+ }
+
+ Action action = partitioned(to) ? Action.DROP_PARTITIONED
+ // call actions() per node so each one
has different "runs" state
+ : nodeActions.computeIfAbsent(to,
ignore -> actions()).get();
+ parent.messageListener.onMessage(action, self, to, id, message);
+ switch (action)
+ {
+ case DELIVER:
+ task.run();
+ return true;
+ case DELIVER_WITH_FAILURE:
+ task.run();
+ case FAILURE:
+
+ if (action == Action.FAILURE)
+ parent.notifyDropped(self, to, id, message);
+ if (callback != null)
+ {
+ parent.pending.add((PendingRunnable) () -> {
+ if (callback == callbacks.remove(id))
+ {
+ try
+ {
+ callback.failure(to, new
SimulatedFault("Simulation Failure; src=" + self + ", to=" + to + ", id=" + id
+ ", message=" + message));
+ }
+ catch (Throwable t)
+ {
+ callback.onCallbackFailure(to, t);
+
lookup.apply(self).agent().onUncaughtException(t);
+ }
+ }
+ }, 1000 + random.nextInt(1000), TimeUnit.MILLISECONDS);
+ }
+ return false;
+ case DROP_PARTITIONED:
+ case DROP:
+ // TODO (consistency): parent.notifyDropped is a trace logger
that is very similar in spirit to MessageListener; can we unify?
+ parent.notifyDropped(self, to, id, message);
+ return true;
+ default:
+ throw new AssertionError("Unexpected action: " + action);
+ }
+ }
+
+ private long networkJitterNanos(Node.Id dst)
+ {
+ return networkJitter.computeIfAbsent(dst, ignore -> defaultJitter())
+ .getAsLong();
+ }
+
+ private LongSupplier defaultJitter()
+ {
+ return FrequentLargeRange.builder(random)
+ .ratio(1, 5)
+ .small(500, TimeUnit.MICROSECONDS, 5,
TimeUnit.MILLISECONDS)
+ .large(50, TimeUnit.MILLISECONDS, 5, SECONDS)
+ .build()
+ .asLongSupplier(random);
+ }
+
+ private boolean partitioned(Id to)
+ {
+ return parent.partitionSet.contains(self) !=
parent.partitionSet.contains(to);
+ }
+
+ private Supplier<Action> actions()
+ {
+ Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01);
+ Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01);
+ Gen<Action> actionGen = rs -> {
+ if (drops.next(rs))
+ return Action.DROP;
+ return failures.next(rs) ?
+ rs.nextBoolean() ? Action.FAILURE :
Action.DELIVER_WITH_FAILURE
+ : Action.DELIVER;
+ };
+ return actionGen.asSupplier(random);
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
b/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
index e070dd81..fd4932d4 100644
--- a/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
public interface PendingQueue
{
void add(Pending item);
+ void addNoDelay(Pending item);
void add(Pending item, long delay, TimeUnit units);
boolean remove(Pending item);
Pending poll();
diff --git
a/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
b/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
index a45b53e8..bd25ac8f 100644
--- a/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/PropagatingPendingQueue.java
@@ -38,6 +38,12 @@ public class PropagatingPendingQueue implements PendingQueue
wrapped.add(item);
}
+ @Override
+ public void addNoDelay(Pending item)
+ {
+ wrapped.addNoDelay(item);
+ }
+
@Override
public void add(Pending item, long delay, TimeUnit units)
{
@@ -52,6 +58,12 @@ public class PropagatingPendingQueue implements PendingQueue
@Override
public Pending poll()
+ {
+ checkFailures();
+ return wrapped.poll();
+ }
+
+ public void checkFailures()
{
if (!failures.isEmpty())
{
@@ -74,8 +86,6 @@ public class PropagatingPendingQueue implements PendingQueue
}
throw assertion;
}
-
- return wrapped.poll();
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
index b0cec4d0..85eb2365 100644
--- a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
@@ -18,12 +18,14 @@
package accord.impl.basic;
+import accord.burn.random.FrequentLargeRange;
+import accord.utils.RandomSource;
+
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
-import accord.utils.RandomSource;
-
public class RandomDelayQueue implements PendingQueue
{
public static class Factory implements Supplier<RandomDelayQueue>
@@ -87,25 +89,38 @@ public class RandomDelayQueue implements PendingQueue
}
final PriorityQueue<Item> queue = new PriorityQueue<>();
- final RandomSource random;
+ private final LongSupplier jitterMillis;
long now;
int seq;
RandomDelayQueue(RandomSource random)
{
- this.random = random;
+ this.jitterMillis = FrequentLargeRange.builder(random)
+ .small(0, 50,
TimeUnit.MICROSECONDS)
+ .large(50,
TimeUnit.MICROSECONDS, 5, TimeUnit.MILLISECONDS)
+ .build()
+
.mapAsLong(TimeUnit.NANOSECONDS::toMillis)
+ .asLongSupplier(random);
}
@Override
public void add(Pending item)
{
- add(item, random.nextInt(500), TimeUnit.MILLISECONDS);
+ add(item, 0, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public void addNoDelay(Pending item)
+ {
+ queue.add(new Item(now, seq++, item));
}
@Override
public void add(Pending item, long delay, TimeUnit units)
{
- queue.add(new Item(now + units.toMillis(delay), seq++, item));
+ if (delay < 0)
+ throw new IllegalArgumentException("Delay must be positive or 0,
but given " + delay);
+ queue.add(new Item(now + units.toMillis(delay) +
jitterMillis.getAsLong(), seq++, item));
}
@Override
diff --git
a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
index 2f3bd1b1..d1971b51 100644
---
a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
+++
b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -26,10 +26,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import accord.api.Agent;
-import accord.burn.random.FrequentLargeRange;
-import accord.burn.random.RandomLong;
-import accord.burn.random.RandomWalkRange;
-import accord.utils.RandomSource;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -134,42 +130,23 @@ public class SimulatedDelayedExecutorService extends
TaskExecutorService impleme
private final PendingQueue pending;
private final Agent agent;
- private final RandomSource random;
- private final RandomLong jitterInNano;
private long sequenceNumber;
- public SimulatedDelayedExecutorService(PendingQueue pending, Agent agent,
RandomSource random)
+ public SimulatedDelayedExecutorService(PendingQueue pending, Agent agent)
{
this.pending = pending;
this.agent = agent;
- this.random = random;
- // this is different from Apache Cassandra Simulator as this is
computed differently for each executor
- // rather than being a global config
- double ratio = random.nextInt(1, 11) / 100.0D;
- this.jitterInNano = new FrequentLargeRange(new RandomWalkRange(random,
microToNanos(0), microToNanos(50)),
- new RandomWalkRange(random,
microToNanos(50), msToNanos(5)),
- ratio);
- }
-
- private static int msToNanos(int value)
- {
- return Math.toIntExact(TimeUnit.MILLISECONDS.toNanos(value));
- }
-
- private static int microToNanos(int value)
- {
- return Math.toIntExact(TimeUnit.MICROSECONDS.toNanos(value));
}
@Override
public void execute(Task<?> task)
{
- pending.add(task, jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
+ pending.add(task);
}
private void schedule(Task<?> task, long delay, TimeUnit unit)
{
- pending.add(task, unit.toNanos(delay) + jitterInNano.getLong(random),
TimeUnit.NANOSECONDS);
+ pending.add(task, delay, unit);
}
@Override
diff --git a/accord-core/src/test/java/accord/burn/random/RandomLong.java
b/accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
similarity index 82%
rename from accord-core/src/test/java/accord/burn/random/RandomLong.java
rename to accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
index 1bc8c482..3af4ead8 100644
--- a/accord-core/src/test/java/accord/burn/random/RandomLong.java
+++ b/accord-core/src/test/java/accord/impl/basic/SimulatedFault.java
@@ -16,11 +16,12 @@
* limitations under the License.
*/
-package accord.burn.random;
+package accord.impl.basic;
-import accord.utils.RandomSource;
-
-public interface RandomLong
+public class SimulatedFault extends AssertionError
{
- long getLong(RandomSource randomSource);
+ public SimulatedFault(Object detailMessage)
+ {
+ super(detailMessage);
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 5fb3924c..1a9611c4 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -68,7 +68,7 @@ public class ListAgent implements Agent
@Override
public void onInconsistentTimestamp(Command command, Timestamp prev,
Timestamp next)
{
- throw new AssertionError("Inconsistent execution timestamp detected
for txnId " + command.txnId() + ": " + prev + " != " + next);
+ throw new AssertionError("Inconsistent execution timestamp detected
for command " + command + ": " + prev + " != " + next);
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index c6127a30..2f2084a6 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -18,18 +18,20 @@
package accord.impl.list;
-import java.util.function.BiConsumer;
-
import accord.api.Result;
import accord.api.RoutingKey;
import accord.coordinate.CheckShards;
import accord.coordinate.CoordinationFailed;
import accord.coordinate.Invalidated;
import accord.coordinate.Truncated;
+import accord.coordinate.Timeout;
+import accord.impl.MessageListener;
import accord.impl.basic.Cluster;
import accord.impl.basic.Packet;
+import accord.impl.basic.SimulatedFault;
import accord.local.Node;
import accord.local.Node.Id;
+import accord.local.SaveStatus;
import accord.local.Status;
import accord.messages.CheckStatus.CheckStatusOk;
import accord.messages.CheckStatus.IncludeInfo;
@@ -40,7 +42,11 @@ import accord.primitives.RoutingKeys;
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import javax.annotation.Nullable;
+
import static accord.local.Status.Phase.Cleanup;
+import java.util.function.BiConsumer;
+
import static accord.local.Status.PreApplied;
import static accord.local.Status.PreCommitted;
@@ -68,6 +74,10 @@ public class ListRequest implements Request
protected Action checkSufficient(Id from, CheckStatusOk ok)
{
++count;
+ // this method is called for each reply, so if we see a reply
where the status is not known, it may be known on others;
+ // once all status are merged, then onDone will apply aditional
logic to make sure things are safe.
+ if (ok.saveStatus == SaveStatus.Uninitialised)
+ return Action.ApproveIfQuorum;
return ok.saveStatus.hasBeen(PreApplied) ? Action.Approve :
Action.Reject;
}
@@ -94,74 +104,125 @@ public class ListRequest implements Request
final Node node;
final Id client;
final ReplyContext replyContext;
+ final MessageListener listener;
+ final TxnId id;
final Txn txn;
- ResultCallback(Node node, Id client, ReplyContext replyContext, Txn
txn)
+ ResultCallback(Node node, Id client, ReplyContext replyContext,
MessageListener listener, TxnId id, Txn txn)
{
this.node = node;
this.client = client;
this.replyContext = replyContext;
+ this.listener = listener;
+ this.id = id;
this.txn = txn;
}
@Override
public void accept(Result success, Throwable fail)
{
- if (fail instanceof CoordinationFailed)
+ if (fail != null)
{
- RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
- TxnId txnId = ((CoordinationFailed) fail).txnId();
- if (fail instanceof Invalidated)
+ listener.onClientAction(MessageListener.ClientAction.FAILURE,
node.id(), id, fail);
+ if (fail instanceof CoordinationFailed)
{
- node.reply(client, replyContext, new ListResult(client,
((Packet) replyContext).requestId, txnId, null, null, null, null), null);
- return;
+ RoutingKey homeKey = ((CoordinationFailed) fail).homeKey();
+ TxnId txnId = ((CoordinationFailed) fail).txnId();
+ if (fail instanceof Invalidated)
+ {
+ node.reply(client, replyContext,
ListResult.invalidated(client, ((Packet)replyContext).requestId, txnId), null);
+ return;
+ }
+
+ node.reply(client, replyContext,
ListResult.heartBeat(client, ((Packet)replyContext).requestId, txnId), null);
+ ((Cluster) node.scheduler()).onDone(() ->
checkOnResult(homeKey, txnId, 0, null));
+ }
+ else if (fail instanceof SimulatedFault)
+ {
+ node.reply(client, replyContext,
ListResult.heartBeat(client, ((Packet)replyContext).requestId, id), null);
+ ((Cluster) node.scheduler()).onDone(() ->
checkOnResult(null, id, 0, null));
+ }
+ else
+ {
+ node.agent().onUncaughtException(fail);
}
-
- node.reply(client, replyContext, new ListResult(client,
((Packet)replyContext).requestId, txnId, null, null, new int[0][], null), null);
- ((Cluster)node.scheduler()).onDone(() -> {
- node.commandStores()
- .select(homeKey)
- .execute(() -> CheckOnResult.checkOnResult(node,
txnId, homeKey, (s, f) -> {
- if (f != null)
- {
- node.reply(client, replyContext, new
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, f
instanceof Truncated ? new int[2][] : new int[3][], null), null);
- return;
- }
- switch (s)
- {
- case Truncated:
- node.reply(client, replyContext, new
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new
int[2][], null), null);
- break;
- case Invalidated:
- node.reply(client, replyContext, new
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null,
null), null);
- break;
- case Lost:
- node.reply(client, replyContext, new
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new
int[1][], null), null);
- break;
- case Other:
- // currently caught elsewhere in response
tracking, but might help to throw an exception here
- }
- }));
- });
+ }
+ else if (success != null)
+ {
+ listener.onClientAction(MessageListener.ClientAction.SUCCESS,
node.id(), id, success);
+ node.reply(client, replyContext, (ListResult) success, null);
}
else
{
- node.reply(client, replyContext, (ListResult) success, fail);
+ listener.onClientAction(MessageListener.ClientAction.UNKNOWN,
node.id(), id, null);
+ node.agent().onUncaughtException(new
NullPointerException("Success and Failure were both null"));
+ }
+ }
+
+ private void checkOnResult(@Nullable RoutingKey homeKey, TxnId txnId,
int attempt, Throwable t) {
+ if (attempt == 3)
+ {
+ node.agent().onUncaughtException(t);
+ return;
}
+ if (homeKey == null)
+ homeKey = node.selectRandomHomeKey(txnId);
+ RoutingKey finalHomeKey = homeKey;
+ node.commandStores().select(homeKey).execute(() ->
CheckOnResult.checkOnResult(node, txnId, finalHomeKey, (s, f) -> {
+ if (f != null)
+ {
+ if (f instanceof Truncated)
+ {
+ node.reply(client, replyContext,
ListResult.truncated(client, ((Packet)replyContext).requestId, txnId), null);
+ return;
+ }
+ if (f instanceof Timeout || f instanceof SimulatedFault)
checkOnResult(finalHomeKey, txnId, attempt + 1, f);
+ else
+ {
+ node.reply(client, replyContext,
ListResult.failure(client, ((Packet)replyContext).requestId, txnId), null);
+ node.agent().onUncaughtException(f);
+ }
+ return;
+ }
+ switch (s)
+ {
+ case Truncated:
+ node.reply(client, replyContext,
ListResult.truncated(client, ((Packet)replyContext).requestId, txnId), null);
+ break;
+ case Invalidated:
+ node.reply(client, replyContext,
ListResult.invalidated(client, ((Packet)replyContext).requestId, txnId), null);
+ break;
+ case Lost:
+ node.reply(client, replyContext,
ListResult.lost(client, ((Packet)replyContext).requestId, txnId), null);
+ break;
+ case Other:
+ node.reply(client, replyContext,
ListResult.other(client, ((Packet)replyContext).requestId, txnId), null);
+ break;
+ default:
+ node.agent().onUncaughtException(new
AssertionError("Unknown outcome: " + s));
+ }
+ }));
}
}
public final Txn txn;
+ private final MessageListener listener;
+ private TxnId id;
- public ListRequest(Txn txn)
+ public ListRequest(Txn txn, MessageListener listener)
{
this.txn = txn;
+ this.listener = listener;
}
@Override
public void process(Node node, Id client, ReplyContext replyContext)
{
- node.coordinate(txn).addCallback(new ResultCallback(node, client,
replyContext, txn));
+ if (id != null)
+ throw new IllegalStateException("Called process multiple times");
+ id = node.nextTxnId(txn.kind(), txn.keys().domain());
+ listener.onClientAction(MessageListener.ClientAction.SUBMIT,
node.id(), id, txn);
+ node.coordinate(id, txn).addCallback(new ResultCallback(node, client,
replyContext, listener, id, txn));
}
@Override
@@ -173,7 +234,7 @@ public class ListRequest implements Request
@Override
public String toString()
{
- return txn.toString();
+ return id == null ? txn.toString() : id + " -> " + txn;
}
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java
b/accord-core/src/test/java/accord/impl/list/ListResult.java
index dc3311bd..88ffeb2f 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -32,6 +32,7 @@ import accord.primitives.TxnId;
public class ListResult implements Result, Reply
{
+ public enum Fault { HeartBeat, Invalidated, Lost, Other, Truncated,
Failure }
public final Id client;
public final long requestId;
public final TxnId txnId;
@@ -39,6 +40,7 @@ public class ListResult implements Result, Reply
public final Keys responseKeys;
public final int[][] read; // equal in size to keys.size()
public final ListUpdate update;
+ private final Fault fault;
public ListResult(Id client, long requestId, TxnId txnId, Seekables<?, ?>
readKeys, Keys responseKeys, int[][] read, ListUpdate update)
{
@@ -49,6 +51,49 @@ public class ListResult implements Result, Reply
this.responseKeys = responseKeys;
this.read = read;
this.update = update;
+ this.fault = null;
+ }
+
+ private ListResult(Id client, long requestId, TxnId txnId, Fault fault)
+ {
+ this.client = client;
+ this.requestId = requestId;
+ this.txnId = txnId;
+ this.readKeys = null;
+ this.responseKeys = null;
+ this.read = null;
+ this.update = null;
+ this.fault = fault;
+ }
+
+ public static ListResult heartBeat(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.HeartBeat);
+ }
+
+ public static ListResult invalidated(Id client, long requestId, TxnId
txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Invalidated);
+ }
+
+ public static ListResult lost(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Lost);
+ }
+
+ public static ListResult other(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Other);
+ }
+
+ public static ListResult truncated(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Truncated);
+ }
+
+ public static ListResult failure(Id client, long requestId, TxnId txnId)
+ {
+ return new ListResult(client, requestId, txnId, Fault.Failure);
}
@Override
@@ -57,6 +102,18 @@ public class ListResult implements Result, Reply
return null;
}
+ public boolean isSuccess()
+ {
+ return fault == null;
+ }
+
+ public Fault fault()
+ {
+ if (isSuccess())
+ throw new IllegalStateException("Unable to find fault with
successful results");
+ return fault;
+ }
+
@Override
public String toString()
{
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 8c1996a9..4d04d512 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -131,7 +131,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new);
- awaitUninterruptibly(node.start());
+ awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(topology, true);
return node;
}
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 879dec72..365ceec9 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -113,7 +113,7 @@ public class ImmutableCommandTest
clock,
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
() -> storeSupport.data, new
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new
TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
- awaitUninterruptibly(node.start());
+ awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(storeSupport.local.get(), true);
return node;
}
diff --git a/accord-core/src/test/java/accord/utils/Gen.java
b/accord-core/src/test/java/accord/utils/Gen.java
index 62da8c5e..af86340b 100644
--- a/accord-core/src/test/java/accord/utils/Gen.java
+++ b/accord-core/src/test/java/accord/utils/Gen.java
@@ -21,8 +21,18 @@ package accord.utils;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntPredicate;
+import java.util.function.IntSupplier;
+import java.util.function.IntUnaryOperator;
import java.util.function.LongPredicate;
+import java.util.function.LongSupplier;
+import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
public interface Gen<A> {
/**
@@ -46,6 +56,16 @@ public interface Gen<A> {
return r -> fn.apply(r, this.next(r));
}
+ default IntGen mapToInt(ToIntFunction<A> fn)
+ {
+ return r -> fn.applyAsInt(next(r));
+ }
+
+ default LongGen mapToLong(ToLongFunction<A> fn)
+ {
+ return r -> fn.applyAsLong(next(r));
+ }
+
default <B> Gen<B> flatMap(Function<? super A, Gen<? extends B>> mapper)
{
return rs -> mapper.apply(this.next(rs)).next(rs);
@@ -69,6 +89,16 @@ public interface Gen<A> {
};
}
+ default Supplier<A> asSupplier(RandomSource rs)
+ {
+ return () -> next(rs);
+ }
+
+ default Stream<A> asStream(RandomSource rs)
+ {
+ return Stream.generate(() -> next(rs));
+ }
+
interface IntGen extends Gen<Integer>
{
int nextInt(RandomSource random);
@@ -79,7 +109,12 @@ public interface Gen<A> {
return nextInt(random);
}
- default Gen.IntGen filterInt(IntPredicate fn)
+ default IntGen mapAsInt(IntUnaryOperator fn)
+ {
+ return r -> fn.applyAsInt(nextInt(r));
+ }
+
+ default Gen.IntGen filterAsInt(IntPredicate fn)
{
return rs -> {
int value;
@@ -95,7 +130,17 @@ public interface Gen<A> {
@Override
default Gen.IntGen filter(Predicate<Integer> fn)
{
- return filterInt(i -> fn.test(i));
+ return filterAsInt(i -> fn.test(i));
+ }
+
+ default IntSupplier asIntSupplier(RandomSource rs)
+ {
+ return () -> nextInt(rs);
+ }
+
+ default IntStream asIntStream(RandomSource rs)
+ {
+ return IntStream.generate(() -> nextInt(rs));
}
}
@@ -109,7 +154,12 @@ public interface Gen<A> {
return nextLong(random);
}
- default Gen.LongGen filterLong(LongPredicate fn)
+ default LongGen mapAsLong(LongUnaryOperator fn)
+ {
+ return r -> fn.applyAsLong(nextLong(r));
+ }
+
+ default Gen.LongGen filterAsLong(LongPredicate fn)
{
return rs -> {
long value;
@@ -125,7 +175,17 @@ public interface Gen<A> {
@Override
default Gen.LongGen filter(Predicate<Long> fn)
{
- return filterLong(i -> fn.test(i));
+ return filterAsLong(i -> fn.test(i));
+ }
+
+ default LongSupplier asLongSupplier(RandomSource rs)
+ {
+ return () -> nextLong(rs);
+ }
+
+ default LongStream asLongStream(RandomSource rs)
+ {
+ return LongStream.generate(() -> nextLong(rs));
}
}
}
diff --git a/accord-core/src/test/java/accord/utils/GenTest.java
b/accord-core/src/test/java/accord/utils/GenTest.java
index c722335f..7a046694 100644
--- a/accord-core/src/test/java/accord/utils/GenTest.java
+++ b/accord-core/src/test/java/accord/utils/GenTest.java
@@ -18,19 +18,25 @@
package accord.utils;
+import org.agrona.collections.IntArrayList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.List;
+import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import static accord.utils.Property.qt;
+import static org.assertj.core.api.Assertions.assertThat;
public class GenTest {
@Test
@@ -97,4 +103,74 @@ public class GenTest {
});
});
}
+
+ enum PickWeight {A, B, C}
+
+ @Test
+ public void pickWeight()
+ {
+ int samples = 1000;
+ Gen<PickWeight> enums = Gens.enums().allWithWeights(PickWeight.class,
81, 10, 1);
+ Gen<Map<PickWeight, Integer>> gen = rs -> {
+ Map<PickWeight, Integer> counts = new EnumMap<>(PickWeight.class);
+ for (int i = 0; i < samples; i++)
+ counts.compute(enums.next(rs), (ignore, accum) -> accum ==
null ? 1 : accum + 1);
+ return counts;
+ };
+ qt().forAll(gen).check(counts -> {
+ // expected 810
+
assertThat(counts.get(PickWeight.A)).isGreaterThan(counts.get(PickWeight.B));
+ // expected 100
+ assertThat(counts.get(PickWeight.B))
+ .isBetween(50, 200);
+
+ if (counts.containsKey(PickWeight.C))
+ {
+ assertThat(counts.get(PickWeight.B))
+ .isGreaterThan(counts.get(PickWeight.C));
+
+ // expected 10
+ assertThat(counts.get(PickWeight.C))
+ .isBetween(1, 60);
+ }
+ });
+ }
+
+ @Test
+ public void runs()
+ {
+ double ratio = 0.0625;
+ int samples = 1000;
+ Gen<Runs> gen =
Gens.lists(Gens.bools().biasedRepeatingRuns(ratio)).ofSize(samples).map(Runs::new);
+ qt().forAll(gen).check(runs -> {
+ assertThat(IntStream.of(runs.runs).filter(i -> i >
5).toArray()).isNotEmpty();
+ assertThat(runs.counts.get(true) / 1000.0).isBetween(ratio * .5,
0.1);
+ });
+ }
+
+ private static class Runs
+ {
+ private final Map<Boolean, Long> counts;
+ private final int[] runs;
+
+ Runs(List<Boolean> samples)
+ {
+ this.counts =
samples.stream().collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+ IntArrayList runs = new IntArrayList();
+ int run = -1;
+ for (boolean b : samples)
+ {
+ if (b)
+ {
+ run = run == -1 ? 1 : run + 1;
+ }
+ else if (run != -1)
+ {
+ runs.add(run);
+ run = -1;
+ }
+ }
+ this.runs = runs.toIntArray();
+ }
+ }
}
diff --git a/accord-core/src/test/java/accord/utils/Gens.java
b/accord-core/src/test/java/accord/utils/Gens.java
index 4ccda465..244cd645 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -170,7 +170,7 @@ public class Gens {
return RandomSource::nextBoolean;
}
- public Gen<Boolean> runs(double ratio)
+ public Gen<Boolean> biasedRepeatingRuns(double ratio)
{
Invariants.checkArgument(ratio > 0 && ratio <= 1, "Expected %d to
be larger than 0 and <= 1", ratio);
int steps = (int) (1 / ratio);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index cd7741be..f988703e 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -321,7 +321,7 @@ public class Cluster implements Scheduler
SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new));
}
- AsyncResult<?> startup =
AsyncChains.reduce(lookup.values().stream().map(Node::start).collect(toList()),
(a, b) -> null).beginAsResult();
+ AsyncResult<?> startup =
AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()),
(a, b) -> null).beginAsResult();
while (sinks.processPending());
if (!startup.isDone()) throw new AssertionError();
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 1600e03a..a1919045 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -177,7 +177,7 @@ public class Main
MaelstromStore::new, new
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(),
scheduler, SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new);
- awaitUninterruptibly(on.start());
+ awaitUninterruptibly(on.unsafeStart());
err.println("Initialized node " + init.self);
err.flush();
sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID,
init.msg_id));
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle
b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index f77a0c07..109dd722 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -49,6 +49,10 @@ test {
jvmArgs += ['-XX:+HeapDumpOnOutOfMemoryError',
"-XX:HeapDumpPath=${buildDir}"]
}
+tasks.withType(Test) {
+ jvmArgs '-XX:+HeapDumpOnOutOfMemoryError'
+}
+
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.7.0'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]