This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch CASSANDRA-18804
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit a718cf4fd86dfa8ba775e2f35215de372205e0f6
Author: David Capwell <[email protected]>
AuthorDate: Tue Nov 28 14:53:06 2023 -0800

    test stable
---
 .../src/test/java/accord/burn/BurnTest.java        |   3 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   4 +-
 .../src/test/java/accord/local/CommandsTest.java   | 102 ++++++++++++---------
 3 files changed, 65 insertions(+), 44 deletions(-)

diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 684dd77c..722d2657 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -376,7 +376,8 @@ public class BurnTest
                                           queue::checkFailures,
                                           responseSink, random::fork, 
nowSupplier,
                                           topologyFactory, 
initialRequests::poll,
-                                          onSubmitted::set
+                                          onSubmitted::set,
+                                          ignore -> {}
             );
             for (Verifier verifier : validators.values())
                 verifier.close();
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index cf21bc0c..a9dbcf5f 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -262,7 +262,8 @@ public class Cluster implements Scheduler
                                               BiFunction<Id, 
BiConsumer<Timestamp, Ranges>, AgentExecutor> nodeExecutorSupplier,
                                               Runnable checkFailures, 
Consumer<Packet> responseSink,
                                               Supplier<RandomSource> 
randomSupplier, Supplier<LongSupplier> nowSupplierSupplier,
-                                              TopologyFactory topologyFactory, 
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal)
+                                              TopologyFactory topologyFactory, 
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal,
+                                              Consumer<Map<Id, Node>> 
readySignal)
     {
         Topology topology = topologyFactory.toTopology(nodes);
         Map<Id, Node> nodeMap = new LinkedHashMap<>();
@@ -344,6 +345,7 @@ public class Cluster implements Scheduler
                 reconfigure.cancel();
                 
durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
             });
+            readySignal.accept(nodeMap);
 
             Packet next;
             while ((next = in.get()) != null)
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java 
b/accord-core/src/test/java/accord/local/CommandsTest.java
index f91003e8..0e3133c9 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -21,6 +21,8 @@ package accord.local;
 import accord.api.Key;
 import accord.api.TestableConfigurationService;
 import accord.burn.random.FrequentLargeRange;
+import accord.coordinate.Timeout;
+import accord.coordinate.TopologyMismatch;
 import accord.impl.MessageListener;
 import accord.impl.PrefixedIntHashKey;
 import accord.impl.TopologyFactory;
@@ -47,55 +49,57 @@ import accord.utils.AccordGens;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static accord.Utils.listWriteTxn;
 import static accord.utils.Property.qt;
 import static accord.utils.Utils.addAll;
 
 class CommandsTest
 {
-    private static final Node.Id N1 = new Node.Id(1);
+    private static final Logger logger = 
LoggerFactory.getLogger(CommandsTest.class);
 
     @Test
-    void addAndRemoveRangesValidate()
+    void removeRangesValidate()
     {
-        Gen<List<Node.Id>> nodeGen = 
Gens.lists(AccordGens.nodes()).ofSizeBetween(1, 100);
+        Gen<List<Node.Id>> nodeGen = 
Gens.lists(AccordGens.nodes()).ofSizeBetween(1, 10);
         qt().check(rs -> {
             List<Node.Id> nodes = nodeGen.next(rs);
-            if (!nodes.contains(N1))
-                nodes.add(N1);
             nodes.sort(Comparator.naturalOrder());
+            logger.info("Running with {} nodes", nodes.size());
             int rf = Math.min(3, nodes.size());
             Range[] prefix0 = PrefixedIntHashKey.ranges(0, nodes.size());
             Range[] prefix1 = PrefixedIntHashKey.ranges(1, nodes.size());
             Range[] allRanges = addAll(prefix0, prefix1);
-            boolean add = rs.nextBoolean();
-            Topology initialTopology = TopologyUtils.topology(1, nodes, 
Ranges.of(add ? prefix0 : allRanges), rf);
-            Topology updatedTopology = TopologyUtils.topology(2, nodes, 
Ranges.of(add ? allRanges : prefix0), rf);
 
-            LinkedList<Request> requests = new LinkedList<>();
-            requests.add(new Request()
+            Topology initialTopology = TopologyUtils.topology(1, nodes, 
Ranges.of(allRanges), rf);
+            Topology updatedTopology = TopologyUtils.topology(2, nodes, 
Ranges.of(prefix0), rf); // drop prefix1
+
+            cluster(rs::fork, nodes, initialTopology, nodeMap -> new Request()
             {
                 @Override
                 public void process(Node node, Node.Id from, ReplyContext 
replyContext)
                 {
-                    Ranges localRange = 
node.topology().localRangesForEpoch(initialTopology.epoch());
-                    if (!add)
-                        localRange = Ranges.ofSortedAndDeoverlapped(prefix1); 
// make sure to use the range removed
+                    Ranges localRange = 
Ranges.ofSortedAndDeoverlapped(prefix1); // make sure to use the range removed
 
                     Gen<Key> keyGen = 
AccordGens.prefixedIntHashKeyInsideRanges(localRange);
                     Keys keys = 
Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs));
@@ -103,9 +107,27 @@ class CommandsTest
 
                     TxnId txnId = node.nextTxnId(Txn.Kind.Write, 
Routable.Domain.Key);
 
-                    ((TestableConfigurationService) 
node.configService()).reportTopology(updatedTopology);
+                    for (Node n : nodeMap.values())
+                        ((TestableConfigurationService) 
n.configService()).reportTopology(updatedTopology);
 
-                    node.coordinate(txnId, txn);
+                    node.coordinate(txnId, txn).addCallback((success, failure) 
-> {
+                        if (failure == null)
+                        {
+                            node.agent().onUncaughtException(new 
AssertionError("Expected TopologyMismatch exception, but txn was success"));
+                        }
+                        else if (!(failure instanceof TopologyMismatch))
+                        {
+                            if (failure instanceof Timeout)
+                            {
+                                // TODO (now): we don't know the result...
+                                logger.warn("Timeout seen...");
+                            }
+                            else
+                            {
+                                node.agent().onUncaughtException(new 
AssertionError("Expected TopologyMismatch exception, but failed with different 
exception", failure));
+                            }
+                        }
+                    });
                 }
 
                 @Override
@@ -114,14 +136,10 @@ class CommandsTest
                     return null;
                 }
             });
-            cluster(rs::fork, nodes, initialTopology, () -> {
-                if (requests.isEmpty()) return null;
-                return requests.pop();
-            });
         });
     }
 
-    static void cluster(Supplier<RandomSource> randomSupplier, List<Node.Id> 
nodes, Topology initialTopology, Supplier<Request> requests)
+    static void cluster(Supplier<RandomSource> randomSupplier, List<Node.Id> 
nodes, Topology initialTopology, Function<Map<Node.Id, Node>, Request> init)
     {
         List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
         PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, 
new RandomDelayQueue(randomSupplier.get()));
@@ -146,43 +164,43 @@ class CommandsTest
         SimulatedDelayedExecutorService globalExecutor = new 
SimulatedDelayedExecutorService(queue, new ListAgent(1000L, failures::add, 
retryBootstrap, (i1, i2) -> {
             throw new IllegalAccessError("Global executor should enver get a 
stale event");
         }));
-        TopologyFactory topologyFactory = new 
TopologyFactory(initialTopology.maxRf(), 
initialTopology.ranges().stream().toArray(Range[]::new)) {
+        TopologyFactory topologyFactory = new 
TopologyFactory(initialTopology.maxRf(), 
initialTopology.ranges().stream().toArray(Range[]::new))
+        {
             @Override
             public Topology toTopology(Node.Id[] cluster)
             {
                 return initialTopology;
             }
         };
-        /*
-            Id[] nodes,
-            MessageListener messageListener,
-            Supplier<PendingQueue> queueSupplier,
-            BiFunction<Id, BiConsumer<Timestamp, Ranges>, AgentExecutor> 
nodeExecutorSupplier,
-            Runnable checkFailures,
-            Consumer<Packet> responseSink,
-            Supplier<RandomSource> randomSupplier,
-            Supplier<LongSupplier> nowSupplierSupplier,
-            TopologyFactory topologyFactory,
-            Supplier<Packet> in,
-            Consumer<Runnable> noMoreWorkSignal
-             */
         AtomicInteger counter = new AtomicInteger();
+        AtomicReference<Map<Node.Id, Node>> nodeMap = new AtomicReference<>();
         Cluster.run(nodes.toArray(Node.Id[]::new),
                     MessageListener.Noop.INSTANCE,
                     () -> queue,
                     (id, onStale) -> 
globalExecutor.withAgent(agentSupplier.apply(onStale)),
                     queue::checkFailures,
-                    ignore -> {},
+                    ignore -> {
+                    },
                     randomSupplier,
                     nowSupplier,
                     topologyFactory,
-                    () -> {
-                        Request request = requests.get();
-                        if (request == null)
-                            return null;
-                        return new Packet(N1, N1, counter.incrementAndGet(), 
request);
+                    new Supplier<>()
+                    {
+                        private Iterator<Request> requestIterator = null;
+                        private final RandomSource rs = randomSupplier.get();
+                        @Override
+                        public Packet get()
+                        {
+                            if (requestIterator == null)
+                                requestIterator = 
Collections.singleton(init.apply(nodeMap.get())).iterator();
+                            if (!requestIterator.hasNext())
+                                return null;
+                            Node.Id id = rs.pick(nodes);
+                            return new Packet(id, id, 
counter.incrementAndGet(), requestIterator.next());
+                        }
                     },
-                    Runnable::run);
+                    Runnable::run,
+                    nodeMap::set);
         if (!failures.isEmpty())
         {
             AssertionError error = new AssertionError("Unexpected errors 
detected");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to