This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch CASSANDRA-18804
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 1810a84b763e98134d36dc692889735486d84062
Author: David Capwell <[email protected]>
AuthorDate: Mon Nov 27 16:38:59 2023 -0800

    working on backporting tests to show invalidate doesn't break on topology drop
---
 accord-core/src/main/java/accord/utils/Utils.java  |   9 +
 accord-core/src/test/java/accord/Utils.java        |  15 ++
 .../src/test/java/accord/burn/BurnTest.java        |   1 +
 .../src/test/java/accord/impl/basic/Cluster.java   |   2 +-
 .../java/accord/impl/basic/RandomDelayQueue.java   |   2 +-
 .../src/test/java/accord/local/CommandsTest.java   | 206 +++++++++++++++++++++
 .../src/test/java/accord/utils/AccordGens.java     |  15 ++
 7 files changed, 248 insertions(+), 2 deletions(-)

diff --git a/accord-core/src/main/java/accord/utils/Utils.java 
b/accord-core/src/main/java/accord/utils/Utils.java
index 696edb94..1358db22 100644
--- a/accord-core/src/main/java/accord/utils/Utils.java
+++ b/accord-core/src/main/java/accord/utils/Utils.java
@@ -18,6 +18,7 @@
 
 package accord.utils;
 
+import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -117,4 +118,12 @@ public class Utils
     {
         return set instanceof ImmutableBitSet ? (ImmutableBitSet) set : new 
ImmutableBitSet(set);
     }
+
+    public static <T> T[] addAll(T[] first, T[] second)
+    {
+        T[] array = (T[]) 
Array.newInstance(first.getClass().getComponentType(), first.length + 
second.length);
+        System.arraycopy(first, 0, array, 0, first.length);
+        System.arraycopy(second, 0, array, first.length, second.length);
+        return array;
+    }
 }
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index ac36fb16..40d4df08 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -23,9 +23,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import com.google.common.collect.Sets;
 
+import accord.api.Key;
 import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.coordinate.TxnExecute;
@@ -36,6 +38,9 @@ import accord.impl.IntKey;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.TestAgent;
+import accord.impl.list.ListQuery;
+import accord.impl.list.ListRead;
+import accord.impl.list.ListUpdate;
 import accord.impl.mock.MockCluster;
 import accord.impl.mock.MockConfigurationService;
 import accord.impl.mock.MockStore;
@@ -118,6 +123,16 @@ public class Utils
         return new Txn.InMemory(ranges, MockStore.read(ranges), 
MockStore.QUERY, MockStore.update(ranges));
     }
 
+    public static Txn listWriteTxn(Node.Id client, Keys keys)
+    {
+        ListUpdate update = new ListUpdate(Function.identity());
+        for (Key k : keys)
+            update.put(k, 1);
+        ListRead read = new ListRead(Function.identity(), keys, keys);
+        ListQuery query = new ListQuery(client, keys.size());
+        return new Txn.InMemory(keys, read, query, update);
+    }
+
     public static Txn readTxn(Keys keys)
     {
         return new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY);
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index cba50420..684dd77c 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -455,6 +455,7 @@ public class BurnTest
     @Timeout(value = 3, unit = TimeUnit.MINUTES)
     public void testOne()
     {
+        while (true)
         run(ThreadLocalRandom.current().nextLong(), 1000);
     }
 
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 43b8920e..cf21bc0c 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -504,7 +504,7 @@ public class Cluster implements Scheduler
                 case RANDOM_BIDIRECTIONAL:
                 case RANDOM_UNIDIRECTIONAL:
                     boolean bidirectional = kind == RANDOM_BIDIRECTIONAL;
-                    int count = random.nextInt(bidirectional || 
random.nextBoolean() ? nodesList.size() : (nodesList.size() * 
nodesList.size())/2);
+                    int count = random.nextInt(bidirectional || 
random.nextBoolean() ? nodesList.size() : Math.max(1, (nodesList.size() * 
nodesList.size())/2));
                     return randomOverrides(bidirectional, linkOverride, count, 
nodesList, random, defaultLinks);
             }
         };
diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java 
b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
index 85eb2365..92d7d30e 100644
--- a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
+++ b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
@@ -93,7 +93,7 @@ public class RandomDelayQueue implements PendingQueue
     long now;
     int seq;
 
-    RandomDelayQueue(RandomSource random)
+    public RandomDelayQueue(RandomSource random)
     {
         this.jitterMillis = FrequentLargeRange.builder(random)
                                               .small(0, 50, 
TimeUnit.MICROSECONDS)
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java 
b/accord-core/src/test/java/accord/local/CommandsTest.java
new file mode 100644
index 00000000..967ceecc
--- /dev/null
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.api.Key;
+import accord.api.TestableConfigurationService;
+import accord.burn.random.FrequentLargeRange;
+import accord.impl.MessageListener;
+import accord.impl.PrefixedIntHashKey;
+import accord.impl.TopologyFactory;
+import accord.impl.basic.Cluster;
+import accord.impl.basic.Packet;
+import accord.impl.basic.PendingRunnable;
+import accord.impl.basic.PropagatingPendingQueue;
+import accord.impl.basic.RandomDelayQueue;
+import accord.impl.basic.SimulatedDelayedExecutorService;
+import accord.impl.list.ListAgent;
+import accord.messages.MessageType;
+import accord.messages.PreAccept;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.primitives.Timestamp;
+import accord.topology.TopologyUtils;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topology;
+import accord.utils.AccordGens;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.RandomSource;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import static accord.Utils.listWriteTxn;
+import static accord.Utils.writeTxn;
+import static accord.utils.Property.qt;
+import static accord.utils.Utils.addAll;
+
+class CommandsTest
+{
+    private static final Node.Id N1 = new Node.Id(1);
+
+    @Test
+    void addAndRemoveRangesValidate()
+    {
+        Gen<List<Node.Id>> nodeGen = 
Gens.lists(AccordGens.nodes()).ofSizeBetween(1, 100);
+        qt().withSeed(-8991099031722289106L).check(rs -> {
+            List<Node.Id> nodes = nodeGen.next(rs);
+            if (!nodes.contains(N1))
+                nodes.add(N1);
+            nodes.sort(Comparator.naturalOrder());
+            int rf = Math.min(3, nodes.size());
+            Range[] prefix0 = PrefixedIntHashKey.ranges(0, nodes.size());
+            Range[] prefix1 = PrefixedIntHashKey.ranges(1, nodes.size());
+            Range[] allRanges = addAll(prefix0, prefix1);
+//            boolean add = rs.nextBoolean();
+            boolean add = false;
+            Topology initialTopology = TopologyUtils.topology(1, nodes, 
Ranges.of(add ? prefix0 : allRanges), rf);
+            Topology updatedTopology = TopologyUtils.topology(2, nodes, 
Ranges.of(add ? allRanges : prefix0), rf);
+
+            LinkedList<Request> requests = new LinkedList<>();
+            requests.add(new Request()
+            {
+                @Override
+                public void process(Node node, Node.Id from, ReplyContext 
replyContext)
+                {
+                    Ranges localRange = 
node.topology().localRangesForEpoch(initialTopology.epoch());
+                    int prefix;
+                    if (!add)
+                    {
+                        // use the removed range
+                        localRange = Ranges.ofSortedAndDeoverlapped(prefix1);
+                        prefix = 1;
+                    }
+                    else
+                    {
+                        prefix = rs.pickInt(0, 1);
+                    }
+
+                    Gen<Key> keyGen = AccordGens.prefixedIntHashKey(ignore -> 
prefix).filter(localRange::contains);
+                    Keys keys = 
Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs));
+                    Txn txn = listWriteTxn(from, keys);
+
+                    TxnId txnId = node.nextTxnId(Txn.Kind.Write, 
Routable.Domain.Key);
+
+                    ((TestableConfigurationService) 
node.configService()).reportTopology(updatedTopology);
+
+                    node.coordinate(txnId, txn);
+                }
+
+                @Override
+                public MessageType type()
+                {
+                    return null;
+                }
+            });
+            cluster(rs::fork, nodes, initialTopology, () -> {
+                if (requests.isEmpty()) return null;
+                return requests.pop();
+            });
+        });
+    }
+
+    static void cluster(Supplier<RandomSource> randomSupplier, List<Node.Id> 
nodes, Topology initialTopology, Supplier<Request> requests)
+    {
+        List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
+        PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, 
new RandomDelayQueue(randomSupplier.get()));
+        RandomSource retryRandom = randomSupplier.get();
+        Consumer<Runnable> retryBootstrap = retry -> {
+            long delay = retryRandom.nextInt(1, 15);
+            queue.add((PendingRunnable) retry::run, delay, TimeUnit.SECONDS);
+        };
+        Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier = 
onStale -> new ListAgent(1000L, failures::add, retryBootstrap, onStale);
+        RandomSource nowRandom = randomSupplier.get();
+        Supplier<LongSupplier> nowSupplier = () -> {
+            RandomSource forked = nowRandom.fork();
+            // TODO (now): meta-randomise scale of clock drift
+            return FrequentLargeRange.builder(forked)
+                                     .ratio(1, 5)
+                                     .small(50, 5000, TimeUnit.MICROSECONDS)
+                                     .large(1, 10, TimeUnit.MILLISECONDS)
+                                     .build()
+                                     .mapAsLong(j -> Math.max(0, 
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
+                                     .asLongSupplier(forked);
+        };
+        SimulatedDelayedExecutorService globalExecutor = new 
SimulatedDelayedExecutorService(queue, new ListAgent(1000L, failures::add, 
retryBootstrap, (i1, i2) -> {
+            throw new IllegalAccessError("Global executor should enver get a 
stale event");
+        }));
+        TopologyFactory topologyFactory = new 
TopologyFactory(initialTopology.maxRf(), 
initialTopology.ranges().stream().toArray(Range[]::new)) {
+            @Override
+            public Topology toTopology(Node.Id[] cluster)
+            {
+                return initialTopology;
+            }
+        };
+        /*
+            Id[] nodes,
+            MessageListener messageListener,
+            Supplier<PendingQueue> queueSupplier,
+            BiFunction<Id, BiConsumer<Timestamp, Ranges>, AgentExecutor> 
nodeExecutorSupplier,
+            Runnable checkFailures,
+            Consumer<Packet> responseSink,
+            Supplier<RandomSource> randomSupplier,
+            Supplier<LongSupplier> nowSupplierSupplier,
+            TopologyFactory topologyFactory,
+            Supplier<Packet> in,
+            Consumer<Runnable> noMoreWorkSignal
+             */
+        AtomicInteger counter = new AtomicInteger();
+        Cluster.run(nodes.toArray(Node.Id[]::new),
+                    MessageListener.Noop.INSTANCE,
+                    () -> queue,
+                    (id, onStale) -> 
globalExecutor.withAgent(agentSupplier.apply(onStale)),
+                    queue::checkFailures,
+                    ignore -> {},
+                    randomSupplier,
+                    nowSupplier,
+                    topologyFactory,
+                    () -> {
+                        Request request = requests.get();
+                        if (request == null)
+                            return null;
+                        return new Packet(N1, N1, counter.incrementAndGet(), 
request);
+                    },
+                    Runnable::run);
+        if (!failures.isEmpty())
+        {
+            AssertionError error = new AssertionError("Unexpected errors 
detected");
+            failures.forEach(error::addSuppressed);
+            throw error;
+        }
+    }
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java 
b/accord-core/src/test/java/accord/utils/AccordGens.java
index 59748995..7dc7c798 100644
--- a/accord-core/src/test/java/accord/utils/AccordGens.java
+++ b/accord-core/src/test/java/accord/utils/AccordGens.java
@@ -92,6 +92,21 @@ public class AccordGens
         return rs -> IntHashKey.key(rs.nextInt());
     }
 
+    public static Gen<Key> prefixedIntHashKey()
+    {
+        return prefixedIntHashKey(RandomSource::nextInt, 
RandomSource::nextInt);
+    }
+
+    public static Gen<Key> prefixedIntHashKey(Gen.IntGen prefixGen)
+    {
+        return prefixedIntHashKey(prefixGen, RandomSource::nextInt);
+    }
+
+    public static Gen<Key> prefixedIntHashKey(Gen.IntGen prefixGen, Gen.IntGen 
valueGen)
+    {
+        return rs -> PrefixedIntHashKey.key(prefixGen.nextInt(rs), 
valueGen.nextInt(rs));
+    }
+
     public static Gen<KeyDeps> keyDeps(Gen<? extends Key> keyGen)
     {
         return keyDeps(keyGen, txnIds());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to