This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch CASSANDRA-18804 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 1810a84b763e98134d36dc692889735486d84062 Author: David Capwell <[email protected]> AuthorDate: Mon Nov 27 16:38:59 2023 -0800 working on backporting tests to show invalidate doesnt break on topology drop --- accord-core/src/main/java/accord/utils/Utils.java | 9 + accord-core/src/test/java/accord/Utils.java | 15 ++ .../src/test/java/accord/burn/BurnTest.java | 1 + .../src/test/java/accord/impl/basic/Cluster.java | 2 +- .../java/accord/impl/basic/RandomDelayQueue.java | 2 +- .../src/test/java/accord/local/CommandsTest.java | 206 +++++++++++++++++++++ .../src/test/java/accord/utils/AccordGens.java | 15 ++ 7 files changed, 248 insertions(+), 2 deletions(-) diff --git a/accord-core/src/main/java/accord/utils/Utils.java b/accord-core/src/main/java/accord/utils/Utils.java index 696edb94..1358db22 100644 --- a/accord-core/src/main/java/accord/utils/Utils.java +++ b/accord-core/src/main/java/accord/utils/Utils.java @@ -18,6 +18,7 @@ package accord.utils; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -117,4 +118,12 @@ public class Utils { return set instanceof ImmutableBitSet ? (ImmutableBitSet) set : new ImmutableBitSet(set); } + + public static <T> T[] addAll(T[] first, T[] second) + { + T[] array = (T[]) Array.newInstance(first.getClass().getComponentType(), first.length + second.length); + System.arraycopy(first, 0, array, 0, first.length); + System.arraycopy(second, 0, array, first.length, second.length); + return array; + } } diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java index ac36fb16..40d4df08 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/test/java/accord/Utils.java @@ -23,9 +23,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import com.google.common.collect.Sets; +import accord.api.Key; import accord.api.MessageSink; import accord.api.Scheduler; import accord.coordinate.TxnExecute; @@ -36,6 +38,9 @@ import accord.impl.IntKey; import accord.impl.SimpleProgressLog; import accord.impl.SizeOfIntersectionSorter; import accord.impl.TestAgent; +import accord.impl.list.ListQuery; +import accord.impl.list.ListRead; +import accord.impl.list.ListUpdate; import accord.impl.mock.MockCluster; import accord.impl.mock.MockConfigurationService; import accord.impl.mock.MockStore; @@ -118,6 +123,16 @@ public class Utils return new Txn.InMemory(ranges, MockStore.read(ranges), MockStore.QUERY, MockStore.update(ranges)); } + public static Txn listWriteTxn(Node.Id client, Keys keys) + { + ListUpdate update = new ListUpdate(Function.identity()); + for (Key k : keys) + update.put(k, 1); + ListRead read = new ListRead(Function.identity(), keys, keys); + ListQuery query = new ListQuery(client, keys.size()); + return new Txn.InMemory(keys, read, query, update); + } + public static Txn readTxn(Keys keys) { return new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY); diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index cba50420..684dd77c 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -455,6 +455,7 @@ public class BurnTest @Timeout(value = 3, unit = TimeUnit.MINUTES) public void testOne() { + while (true) run(ThreadLocalRandom.current().nextLong(), 1000); } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 43b8920e..cf21bc0c 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -504,7 +504,7 @@ public class Cluster implements Scheduler case RANDOM_BIDIRECTIONAL: case RANDOM_UNIDIRECTIONAL: boolean bidirectional = kind == RANDOM_BIDIRECTIONAL; - int count = random.nextInt(bidirectional || random.nextBoolean() ? nodesList.size() : (nodesList.size() * nodesList.size())/2); + int count = random.nextInt(bidirectional || random.nextBoolean() ? nodesList.size() : Math.max(1, (nodesList.size() * nodesList.size())/2)); return randomOverrides(bidirectional, linkOverride, count, nodesList, random, defaultLinks); } }; diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java index 85eb2365..92d7d30e 100644 --- a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java +++ b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java @@ -93,7 +93,7 @@ public class RandomDelayQueue implements PendingQueue long now; int seq; - RandomDelayQueue(RandomSource random) + public RandomDelayQueue(RandomSource random) { this.jitterMillis = FrequentLargeRange.builder(random) .small(0, 50, TimeUnit.MICROSECONDS) diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java b/accord-core/src/test/java/accord/local/CommandsTest.java new file mode 100644 index 00000000..967ceecc --- /dev/null +++ b/accord-core/src/test/java/accord/local/CommandsTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.local; + +import accord.api.Key; +import accord.api.TestableConfigurationService; +import accord.burn.random.FrequentLargeRange; +import accord.impl.MessageListener; +import accord.impl.PrefixedIntHashKey; +import accord.impl.TopologyFactory; +import accord.impl.basic.Cluster; +import accord.impl.basic.Packet; +import accord.impl.basic.PendingRunnable; +import accord.impl.basic.PropagatingPendingQueue; +import accord.impl.basic.RandomDelayQueue; +import accord.impl.basic.SimulatedDelayedExecutorService; +import accord.impl.list.ListAgent; +import accord.messages.MessageType; +import accord.messages.PreAccept; +import accord.messages.ReplyContext; +import accord.messages.Request; +import accord.primitives.Timestamp; +import accord.topology.TopologyUtils; +import accord.primitives.FullRoute; +import accord.primitives.Keys; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.Routable; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.topology.Topology; +import accord.utils.AccordGens; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.RandomSource; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +import static accord.Utils.listWriteTxn; +import static accord.Utils.writeTxn; +import static accord.utils.Property.qt; +import static accord.utils.Utils.addAll; + +class CommandsTest +{ + private static final Node.Id N1 = new Node.Id(1); + + @Test + void addAndRemoveRangesValidate() + { + Gen<List<Node.Id>> nodeGen = Gens.lists(AccordGens.nodes()).ofSizeBetween(1, 100); + qt().withSeed(-8991099031722289106L).check(rs -> { + List<Node.Id> nodes = nodeGen.next(rs); + if (!nodes.contains(N1)) + nodes.add(N1); + nodes.sort(Comparator.naturalOrder()); + int rf = Math.min(3, nodes.size()); + Range[] prefix0 = PrefixedIntHashKey.ranges(0, nodes.size()); + Range[] prefix1 = PrefixedIntHashKey.ranges(1, nodes.size()); + Range[] allRanges = addAll(prefix0, prefix1); +// boolean add = rs.nextBoolean(); + boolean add = false; + Topology initialTopology = TopologyUtils.topology(1, nodes, Ranges.of(add ? prefix0 : allRanges), rf); + Topology updatedTopology = TopologyUtils.topology(2, nodes, Ranges.of(add ? allRanges : prefix0), rf); + + LinkedList<Request> requests = new LinkedList<>(); + requests.add(new Request() + { + @Override + public void process(Node node, Node.Id from, ReplyContext replyContext) + { + Ranges localRange = node.topology().localRangesForEpoch(initialTopology.epoch()); + int prefix; + if (!add) + { + // use the removed range + localRange = Ranges.ofSortedAndDeoverlapped(prefix1); + prefix = 1; + } + else + { + prefix = rs.pickInt(0, 1); + } + + Gen<Key> keyGen = AccordGens.prefixedIntHashKey(ignore -> prefix).filter(localRange::contains); + Keys keys = Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs)); + Txn txn = listWriteTxn(from, keys); + + TxnId txnId = node.nextTxnId(Txn.Kind.Write, Routable.Domain.Key); + + ((TestableConfigurationService) node.configService()).reportTopology(updatedTopology); + + node.coordinate(txnId, txn); + } + + @Override + public MessageType type() + { + return null; + } + }); + cluster(rs::fork, nodes, initialTopology, () -> { + if (requests.isEmpty()) return null; + return requests.pop(); + }); + }); + } + + static void cluster(Supplier<RandomSource> randomSupplier, List<Node.Id> nodes, Topology initialTopology, Supplier<Request> requests) + { + List<Throwable> failures = Collections.synchronizedList(new ArrayList<>()); + PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, new RandomDelayQueue(randomSupplier.get())); + RandomSource retryRandom = randomSupplier.get(); + Consumer<Runnable> retryBootstrap = retry -> { + long delay = retryRandom.nextInt(1, 15); + queue.add((PendingRunnable) retry::run, delay, TimeUnit.SECONDS); + }; + Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier = onStale -> new ListAgent(1000L, failures::add, retryBootstrap, onStale); + RandomSource nowRandom = randomSupplier.get(); + Supplier<LongSupplier> nowSupplier = () -> { + RandomSource forked = nowRandom.fork(); + // TODO (now): meta-randomise scale of clock drift + return FrequentLargeRange.builder(forked) + .ratio(1, 5) + .small(50, 5000, TimeUnit.MICROSECONDS) + .large(1, 10, TimeUnit.MILLISECONDS) + .build() + .mapAsLong(j -> Math.max(0, queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j))) + .asLongSupplier(forked); + }; + SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, new ListAgent(1000L, failures::add, retryBootstrap, (i1, i2) -> { + throw new IllegalAccessError("Global executor should enver get a stale event"); + })); + TopologyFactory topologyFactory = new TopologyFactory(initialTopology.maxRf(), initialTopology.ranges().stream().toArray(Range[]::new)) { + @Override + public Topology toTopology(Node.Id[] cluster) + { + return initialTopology; + } + }; + /* + Id[] nodes, + MessageListener messageListener, + Supplier<PendingQueue> queueSupplier, + BiFunction<Id, BiConsumer<Timestamp, Ranges>, AgentExecutor> nodeExecutorSupplier, + Runnable checkFailures, + Consumer<Packet> responseSink, + Supplier<RandomSource> randomSupplier, + Supplier<LongSupplier> nowSupplierSupplier, + TopologyFactory topologyFactory, + Supplier<Packet> in, + Consumer<Runnable> noMoreWorkSignal + */ + AtomicInteger counter = new AtomicInteger(); + Cluster.run(nodes.toArray(Node.Id[]::new), + MessageListener.Noop.INSTANCE, + () -> queue, + (id, onStale) -> globalExecutor.withAgent(agentSupplier.apply(onStale)), + queue::checkFailures, + ignore -> {}, + randomSupplier, + nowSupplier, + topologyFactory, + () -> { + Request request = requests.get(); + if (request == null) + return null; + return new Packet(N1, N1, counter.incrementAndGet(), request); + }, + Runnable::run); + if (!failures.isEmpty()) + { + AssertionError error = new AssertionError("Unexpected errors detected"); + failures.forEach(error::addSuppressed); + throw error; + } + } +} \ No newline at end of file diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java b/accord-core/src/test/java/accord/utils/AccordGens.java index 59748995..7dc7c798 100644 --- a/accord-core/src/test/java/accord/utils/AccordGens.java +++ b/accord-core/src/test/java/accord/utils/AccordGens.java @@ -92,6 +92,21 @@ public class AccordGens return rs -> IntHashKey.key(rs.nextInt()); } + public static Gen<Key> prefixedIntHashKey() + { + return prefixedIntHashKey(RandomSource::nextInt, RandomSource::nextInt); + } + + public static Gen<Key> prefixedIntHashKey(Gen.IntGen prefixGen) + { + return prefixedIntHashKey(prefixGen, RandomSource::nextInt); + } + + public static Gen<Key> prefixedIntHashKey(Gen.IntGen prefixGen, Gen.IntGen valueGen) + { + return rs -> PrefixedIntHashKey.key(prefixGen.nextInt(rs), valueGen.nextInt(rs)); + } + public static Gen<KeyDeps> keyDeps(Gen<? extends Key> keyGen) { return keyDeps(keyGen, txnIds()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
