This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch CASSANDRA-18804 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit a718cf4fd86dfa8ba775e2f35215de372205e0f6 Author: David Capwell <[email protected]> AuthorDate: Tue Nov 28 14:53:06 2023 -0800 test stable --- .../src/test/java/accord/burn/BurnTest.java | 3 +- .../src/test/java/accord/impl/basic/Cluster.java | 4 +- .../src/test/java/accord/local/CommandsTest.java | 102 ++++++++++++--------- 3 files changed, 65 insertions(+), 44 deletions(-) diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index 684dd77c..722d2657 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -376,7 +376,8 @@ public class BurnTest queue::checkFailures, responseSink, random::fork, nowSupplier, topologyFactory, initialRequests::poll, - onSubmitted::set + onSubmitted::set, + ignore -> {} ); for (Verifier verifier : validators.values()) verifier.close(); diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index cf21bc0c..a9dbcf5f 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -262,7 +262,8 @@ public class Cluster implements Scheduler BiFunction<Id, BiConsumer<Timestamp, Ranges>, AgentExecutor> nodeExecutorSupplier, Runnable checkFailures, Consumer<Packet> responseSink, Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, - TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal) + TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal, + Consumer<Map<Id, Node>> readySignal) { Topology topology = topologyFactory.toTopology(nodes); Map<Id, Node> nodeMap = new LinkedHashMap<>(); @@ -344,6 +345,7 @@ public class Cluster implements Scheduler reconfigure.cancel(); durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop); }); + readySignal.accept(nodeMap); Packet next; while ((next = in.get()) != null) diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java b/accord-core/src/test/java/accord/local/CommandsTest.java index f91003e8..0e3133c9 100644 --- a/accord-core/src/test/java/accord/local/CommandsTest.java +++ b/accord-core/src/test/java/accord/local/CommandsTest.java @@ -21,6 +21,8 @@ package accord.local; import accord.api.Key; import accord.api.TestableConfigurationService; import accord.burn.random.FrequentLargeRange; +import accord.coordinate.Timeout; +import accord.coordinate.TopologyMismatch; import accord.impl.MessageListener; import accord.impl.PrefixedIntHashKey; import accord.impl.TopologyFactory; @@ -47,55 +49,57 @@ import accord.utils.AccordGens; import accord.utils.Gen; import accord.utils.Gens; import accord.utils.RandomSource; + import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static accord.Utils.listWriteTxn; import static accord.utils.Property.qt; import static accord.utils.Utils.addAll; class CommandsTest { - private static final Node.Id N1 = new Node.Id(1); + private static final Logger logger = LoggerFactory.getLogger(CommandsTest.class); @Test - void addAndRemoveRangesValidate() + void removeRangesValidate() { - Gen<List<Node.Id>> nodeGen = Gens.lists(AccordGens.nodes()).ofSizeBetween(1, 100); + Gen<List<Node.Id>> nodeGen = Gens.lists(AccordGens.nodes()).ofSizeBetween(1, 10); qt().check(rs -> { List<Node.Id> nodes = nodeGen.next(rs); - if (!nodes.contains(N1)) - nodes.add(N1); nodes.sort(Comparator.naturalOrder()); + logger.info("Running with {} nodes", nodes.size()); int rf = Math.min(3, nodes.size()); Range[] prefix0 = PrefixedIntHashKey.ranges(0, nodes.size()); Range[] prefix1 = PrefixedIntHashKey.ranges(1, nodes.size()); Range[] allRanges = addAll(prefix0, prefix1); - boolean add = rs.nextBoolean(); - Topology initialTopology = TopologyUtils.topology(1, nodes, Ranges.of(add ? prefix0 : allRanges), rf); - Topology updatedTopology = TopologyUtils.topology(2, nodes, Ranges.of(add ? allRanges : prefix0), rf); - LinkedList<Request> requests = new LinkedList<>(); - requests.add(new Request() + Topology initialTopology = TopologyUtils.topology(1, nodes, Ranges.of(allRanges), rf); + Topology updatedTopology = TopologyUtils.topology(2, nodes, Ranges.of(prefix0), rf); // drop prefix1 + + cluster(rs::fork, nodes, initialTopology, nodeMap -> new Request() { @Override public void process(Node node, Node.Id from, ReplyContext replyContext) { - Ranges localRange = node.topology().localRangesForEpoch(initialTopology.epoch()); - if (!add) - localRange = Ranges.ofSortedAndDeoverlapped(prefix1); // make sure to use the range removed + Ranges localRange = Ranges.ofSortedAndDeoverlapped(prefix1); // make sure to use the range removed Gen<Key> keyGen = AccordGens.prefixedIntHashKeyInsideRanges(localRange); Keys keys = Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs)); @@ -103,9 +107,27 @@ class CommandsTest TxnId txnId = node.nextTxnId(Txn.Kind.Write, Routable.Domain.Key); - ((TestableConfigurationService) node.configService()).reportTopology(updatedTopology); + for (Node n : nodeMap.values()) + ((TestableConfigurationService) n.configService()).reportTopology(updatedTopology); - node.coordinate(txnId, txn); + node.coordinate(txnId, txn).addCallback((success, failure) -> { + if (failure == null) + { + node.agent().onUncaughtException(new AssertionError("Expected TopologyMismatch exception, but txn was success")); + } + else if (!(failure instanceof TopologyMismatch)) + { + if (failure instanceof Timeout) + { + // TODO (now): we don't know the result... + logger.warn("Timeout seen..."); + } + else + { + node.agent().onUncaughtException(new AssertionError("Expected TopologyMismatch exception, but failed with different exception", failure)); + } + } + }); } @Override @@ -114,14 +136,10 @@ class CommandsTest return null; } }); - cluster(rs::fork, nodes, initialTopology, () -> { - if (requests.isEmpty()) return null; - return requests.pop(); - }); }); } - static void cluster(Supplier<RandomSource> randomSupplier, List<Node.Id> nodes, Topology initialTopology, Supplier<Request> requests) + static void cluster(Supplier<RandomSource> randomSupplier, List<Node.Id> nodes, Topology initialTopology, Function<Map<Node.Id, Node>, Request> init) { List<Throwable> failures = Collections.synchronizedList(new ArrayList<>()); PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, new RandomDelayQueue(randomSupplier.get())); @@ -146,43 +164,43 @@ class CommandsTest SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, new ListAgent(1000L, failures::add, retryBootstrap, (i1, i2) -> { throw new IllegalAccessError("Global executor should enver get a stale event"); })); - TopologyFactory topologyFactory = new TopologyFactory(initialTopology.maxRf(), initialTopology.ranges().stream().toArray(Range[]::new)) { + TopologyFactory topologyFactory = new TopologyFactory(initialTopology.maxRf(), initialTopology.ranges().stream().toArray(Range[]::new)) + { @Override public Topology toTopology(Node.Id[] cluster) { return initialTopology; } }; - /* - Id[] nodes, - MessageListener messageListener, - Supplier<PendingQueue> queueSupplier, - BiFunction<Id, BiConsumer<Timestamp, Ranges>, AgentExecutor> nodeExecutorSupplier, - Runnable checkFailures, - Consumer<Packet> responseSink, - Supplier<RandomSource> randomSupplier, - Supplier<LongSupplier> nowSupplierSupplier, - TopologyFactory topologyFactory, - Supplier<Packet> in, - Consumer<Runnable> noMoreWorkSignal - */ AtomicInteger counter = new AtomicInteger(); + AtomicReference<Map<Node.Id, Node>> nodeMap = new AtomicReference<>(); Cluster.run(nodes.toArray(Node.Id[]::new), MessageListener.Noop.INSTANCE, () -> queue, (id, onStale) -> globalExecutor.withAgent(agentSupplier.apply(onStale)), queue::checkFailures, - ignore -> {}, + ignore -> { + }, randomSupplier, nowSupplier, topologyFactory, - () -> { - Request request = requests.get(); - if (request == null) - return null; - return new Packet(N1, N1, counter.incrementAndGet(), request); + new Supplier<>() + { + private Iterator<Request> requestIterator = null; + private final RandomSource rs = randomSupplier.get(); + @Override + public Packet get() + { + if (requestIterator == null) + requestIterator = Collections.singleton(init.apply(nodeMap.get())).iterator(); + if (!requestIterator.hasNext()) + return null; + Node.Id id = rs.pick(nodes); + return new Packet(id, id, counter.incrementAndGet(), requestIterator.next()); + } }, - Runnable::run); + Runnable::run, + nodeMap::set); if (!failures.isEmpty()) { AssertionError error = new AssertionError("Unexpected errors detected"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
