This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch semi-integrated-burn-test-rebased in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit d05cb74e29b81572179cd7058ef7f0aa661506d0 Author: Alex Petrov <[email protected]> AuthorDate: Mon Nov 11 18:39:02 2024 +0100 Move files --- accord-core/build.gradle | 5 +- .../java/accord/NetworkFilter.java | 0 .../src/{test => main-test}/java/accord/Utils.java | 35 +- .../accord/api/TestableConfigurationService.java | 0 .../main-test/java/accord/burn/BurnTestBase.java | 680 +++++++++++++++++++++ .../accord/burn/BurnTestConfigurationService.java | 0 .../java/accord/burn/SimulationException.java | 0 .../java/accord/burn/TopologyUpdates.java | 0 .../accord/burn/random/FrequentLargeRange.java | 0 .../java/accord/burn/random/IntRange.java | 0 .../java/accord/burn/random/RandomWalkRange.java | 0 .../java/accord/impl/IntHashKey.java | 2 +- .../java/accord/impl/IntKey.java | 0 .../java/accord/impl/MessageListener.java | 0 .../java/accord/impl/PrefixedIntHashKey.java | 2 +- .../java/accord/impl/TestAgent.java | 0 .../java/accord/impl/TopologyFactory.java | 0 .../java/accord/impl/basic/Cluster.java | 130 ++-- .../accord/impl/basic/DelayedCommandStores.java | 19 +- .../accord/impl/basic/MonitoredPendingQueue.java | 0 .../java/accord/impl/basic/NodeSink.java | 0 .../java/accord/impl/basic/Packet.java | 0 .../java/accord/impl/basic/Pending.java | 0 .../java/accord/impl/basic/PendingQueue.java | 0 .../java/accord/impl/basic/PendingRunnable.java | 0 .../java/accord/impl/basic/RandomDelayQueue.java | 0 .../impl/basic/RecurringPendingRunnable.java | 0 .../basic/SimulatedDelayedExecutorService.java | 0 .../java/accord/impl/basic/SimulatedFault.java | 0 .../accord/impl/basic/TaskExecutorService.java | 0 .../java/accord/impl/list/ListAgent.java | 0 .../java/accord/impl/list/ListData.java | 0 .../accord/impl/list/ListFetchCoordinator.java | 0 .../java/accord/impl/list/ListQuery.java | 6 +- .../java/accord/impl/list/ListRead.java | 2 +- .../java/accord/impl/list/ListRequest.java | 0 .../java/accord/impl/list/ListResult.java | 0 .../java/accord/impl/list/ListStore.java | 0 
.../java/accord/impl/list/ListUpdate.java | 4 +- .../java/accord/impl/list/ListWrite.java | 4 +- .../java/accord/impl/mock/MockCluster.java | 19 +- .../accord/impl/mock/MockConfigurationService.java | 22 +- .../java/accord/impl/mock/MockStore.java | 0 .../java/accord/impl/mock/Network.java | 0 .../accord/impl/mock/RecordingMessageSink.java | 7 +- .../java/accord/impl/mock/SimpleMessageSink.java | 0 .../java/accord/topology/TopologyRandomizer.java | 4 +- .../java/accord/topology/TopologyUtils.java | 0 .../java/accord/utils/AccordGens.java | 2 +- .../java/accord/utils/CRCUtils.java | 0 .../java/accord/utils/EpochFunction.java | 0 .../{test => main-test}/java/accord/utils/Gen.java | 0 .../java/accord/utils/Gens.java | 0 .../java/accord/utils/LazyToString.java | 0 .../java/accord/utils/LoggingRandomSource.java | 0 .../java/accord/utils/MessageTask.java | 0 .../java/accord/utils/Pair.java | 0 .../java/accord/utils/Property.java | 0 .../java/accord/utils/RandomTestRunner.java | 0 .../java/accord/utils/ReflectionUtils.java | 0 .../java/accord/utils/SeedProvider.java | 0 .../java/accord/utils/async/TimeoutUtils.java | 0 .../java/accord/verify/CompositeVerifier.java | 0 .../java/accord/verify/HistoryViolation.java | 0 .../accord/verify/LinearizabilityVerifier.java | 0 .../accord/verify/SerializabilityVerifier.java | 0 .../verify/StrictSerializabilityVerifier.java | 0 .../java/accord/verify/Verifier.java | 0 68 files changed, 805 insertions(+), 138 deletions(-) diff --git a/accord-core/build.gradle b/accord-core/build.gradle index b30737c1..fd41a272 100644 --- a/accord-core/build.gradle +++ b/accord-core/build.gradle @@ -16,6 +16,7 @@ * limitations under the License. 
*/ + plugins { id 'accord.java-conventions' id 'maven-publish' @@ -146,4 +147,6 @@ task allTestJar(type: Jar) { test { maxHeapSize = '4g' -} \ No newline at end of file +} + +sourceSets.main.java.srcDirs += ['src/main-test/java'] \ No newline at end of file diff --git a/accord-core/src/test/java/accord/NetworkFilter.java b/accord-core/src/main-test/java/accord/NetworkFilter.java similarity index 100% rename from accord-core/src/test/java/accord/NetworkFilter.java rename to accord-core/src/main-test/java/accord/NetworkFilter.java diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/main-test/java/accord/Utils.java similarity index 91% rename from accord-core/src/test/java/accord/Utils.java rename to accord-core/src/main-test/java/accord/Utils.java index 9ea7d858..198800c8 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/main-test/java/accord/Utils.java @@ -18,30 +18,28 @@ package accord; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.TimeUnit; import java.util.function.Function; -import accord.api.Agent; import com.google.common.collect.Sets; +import accord.api.Agent; import accord.api.Key; +import accord.api.LocalConfig; import accord.api.MessageSink; import accord.api.Scheduler; import accord.api.TopologySorter; -import accord.api.LocalConfig; import accord.coordinate.CoordinationAdapter; +import accord.impl.DefaultLocalListeners; +import accord.impl.DefaultRemoteListeners; import accord.impl.DefaultTimeouts; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; -import accord.impl.DefaultLocalListeners; -import accord.impl.progresslog.DefaultProgressLogs; -import accord.impl.DefaultRemoteListeners; import accord.impl.SizeOfIntersectionSorter; import accord.impl.TestAgent; import accord.impl.list.ListQuery; @@ -50,6 +48,7 @@ import 
accord.impl.list.ListUpdate; import accord.impl.mock.MockCluster; import accord.impl.mock.MockConfigurationService; import accord.impl.mock.MockStore; +import accord.impl.progresslog.DefaultProgressLogs; import accord.local.DurableBefore; import accord.local.Node; import accord.local.Node.Id; @@ -67,8 +66,6 @@ import accord.utils.EpochFunction; import accord.utils.Invariants; import accord.utils.SortedArrays.SortedArrayList; import accord.utils.ThreadPoolScheduler; -import org.awaitility.Awaitility; -import org.awaitility.core.ThrowingRunnable; import static accord.utils.async.AsyncChains.awaitUninterruptibly; @@ -132,9 +129,10 @@ public class Utils public static Txn listWriteTxn(Id client, Keys keys) { - ListUpdate update = new ListUpdate(Function.identity()); + TreeMap<Key, Integer> map = new TreeMap<>(); for (Key k : keys) - update.put(k, 1); + map.put(k, 1); + ListUpdate update = new ListUpdate(Function.identity()); ListRead read = new ListRead(Function.identity(), false, keys, keys); ListQuery query = new ListQuery(client, keys.size(), false); return new Txn.InMemory(keys, read, query, update); @@ -202,21 +200,6 @@ public class Utils return node; } - public static void spinUntilSuccess(ThrowingRunnable runnable) - { - spinUntilSuccess(runnable, 10); - } - - public static void spinUntilSuccess(ThrowingRunnable runnable, int timeoutInSeconds) - { - Awaitility.await() - .pollInterval(Duration.ofMillis(100)) - .pollDelay(0, TimeUnit.MILLISECONDS) - .atMost(timeoutInSeconds, TimeUnit.SECONDS) - .ignoreExceptions() - .untilAsserted(runnable); - } - public static TopologyManager testTopologyManager(TopologySorter.Supplier sorter, Id node) { return new TopologyManager(sorter, new TestAgent.RethrowAgent(), node, Scheduler.NEVER_RUN_SCHEDULED, new MockCluster.Clock(0), LocalConfig.DEFAULT); diff --git a/accord-core/src/test/java/accord/api/TestableConfigurationService.java b/accord-core/src/main-test/java/accord/api/TestableConfigurationService.java similarity index 
100% rename from accord-core/src/test/java/accord/api/TestableConfigurationService.java rename to accord-core/src/main-test/java/accord/api/TestableConfigurationService.java diff --git a/accord-core/src/main-test/java/accord/burn/BurnTestBase.java b/accord-core/src/main-test/java/accord/burn/BurnTestBase.java new file mode 100644 index 00000000..0f5a34c9 --- /dev/null +++ b/accord-core/src/main-test/java/accord/burn/BurnTestBase.java @@ -0,0 +1,680 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.burn; + +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntSupplier; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import java.util.zip.CRC32; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.Journal; +import accord.api.Key; +import accord.burn.random.FrequentLargeRange; +import accord.impl.MessageListener; +import accord.impl.PrefixedIntHashKey; +import accord.impl.TopologyFactory; +import accord.impl.basic.Cluster; +import accord.impl.basic.Cluster.Stats; +import accord.impl.basic.InMemoryJournal; +import accord.impl.basic.Packet; +import accord.impl.basic.PendingQueue; +import accord.impl.basic.PendingRunnable; +import accord.impl.basic.MonitoredPendingQueue; +import accord.impl.basic.RandomDelayQueue; +import accord.impl.basic.RandomDelayQueue.Factory; +import accord.impl.basic.SimulatedDelayedExecutorService; +import accord.impl.list.ListAgent; +import accord.impl.list.ListQuery; +import accord.impl.list.ListRead; +import accord.impl.list.ListRequest; +import accord.impl.list.ListResult; +import 
accord.impl.list.ListUpdate; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.Node.Id; +import accord.messages.MessageType; +import accord.messages.Reply; +import accord.primitives.Keys; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.topology.Shard; +import accord.topology.Topology; +import accord.utils.DefaultRandom; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.RandomSource; +import accord.utils.Utils; +import accord.utils.async.AsyncExecutor; +import accord.utils.async.TimeoutUtils; +import accord.verify.CompositeVerifier; +import accord.verify.StrictSerializabilityVerifier; +import accord.verify.Verifier; +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.IntHashSet; + +import static accord.impl.PrefixedIntHashKey.forHash; +import static accord.impl.PrefixedIntHashKey.range; +import static accord.impl.PrefixedIntHashKey.ranges; +import static accord.primitives.Txn.Kind.EphemeralRead; +import static accord.utils.Invariants.illegalArgument; +import static accord.utils.Utils.toArray; + +public class BurnTestBase +{ + private static final Logger logger = LoggerFactory.getLogger(BurnTestBase.class); + + /** + * Min hash value for the test domain, this value must be respected by the hash function + * @see {@link BurnTestBase#hash(int)} + */ + public static final int HASH_RANGE_START = 0; + /** + * Max hash value for the test domain, this value must be respected by the hash function + * @see {@link BurnTestBase#hash(int)} + */ + public static final int HASH_RANGE_END = 1 << 16; + private static final Range[] EMPTY_RANGES = new Range[0]; + + static List<Packet> generate(RandomSource random, MessageListener listener, Function<? 
super CommandStore, AsyncExecutor> executor, List<Id> clients, List<Id> nodes, int[] keys, int operations) + { + List<Packet> packets = new ArrayList<>(); + Int2ObjectHashMap<int[]> prefixKeyUpdates = new Int2ObjectHashMap<>(); + double readInCommandStore = random.nextDouble(); + Function<int[], Range> nextRange = randomRanges(random); + + for (int count = 0 ; count < operations ; ++count) + { + int finalCount = count; + Id client = clients.get(random.nextInt(clients.size())); + Id node = nodes.get(random.nextInt(nodes.size())); + + boolean isRangeQuery = random.nextBoolean(); + String description; + Function<Node, Txn> txnGenerator; + if (isRangeQuery) + { + description = "range"; + txnGenerator = n -> { + int[] prefixes = prefixes(n.topology().current()); + + int rangeCount = 1 + random.nextInt(2); + List<Range> requestRanges = new ArrayList<>(); + while (--rangeCount >= 0) + requestRanges.add(nextRange.apply(prefixes)); + Ranges ranges = Ranges.of(requestRanges.toArray(EMPTY_RANGES)); + ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, false, ranges, ranges); + ListQuery query = new ListQuery(client, finalCount, false); + return new Txn.InMemory(ranges, read, query); + }; + } + else + { + description = "key"; + txnGenerator = n -> { + int[] prefixes = prefixes(n.topology().current()); + + boolean isWrite = random.nextBoolean(); + int readCount = 1 + random.nextInt(2); + int writeCount = isWrite ? 1 + random.nextInt(2) : 0; + Txn.Kind kind = isWrite ? Txn.Kind.Write : readCount == 1 ? EphemeralRead : Txn.Kind.Read; + + TreeSet<Key> requestKeys = new TreeSet<>(); + IntHashSet readValues = new IntHashSet(); + while (readCount-- > 0) + requestKeys.add(randomKey(random, prefixes, keys, readValues)); + + ListUpdate update = isWrite ? new ListUpdate(executor) : null; + IntHashSet writeValues = isWrite ? 
new IntHashSet() : null; + while (writeCount-- > 0) + { + PrefixedIntHashKey.Key key = randomKey(random, prefixes, keys, writeValues); + int i = Arrays.binarySearch(keys, key.key); + int[] keyUpdateCounter = prefixKeyUpdates.computeIfAbsent(key.prefix, ignore -> new int[keys.length]); + update.put(key, ++keyUpdateCounter[i]); + } + + Keys readKeys = new Keys(requestKeys); + if (isWrite) + requestKeys.addAll(update.keySet()); + ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, kind == EphemeralRead, readKeys, new Keys(requestKeys)); + ListQuery query = new ListQuery(client, finalCount, kind == EphemeralRead); + return new Txn.InMemory(kind, new Keys(requestKeys), read, query, update); + }; + } + packets.add(new Packet(client, node, count, new ListRequest(description, txnGenerator, listener))); + } + + return packets; + } + + private static int[] prefixes(Topology topology) + { + IntHashSet uniq = new IntHashSet(); + for (Shard shard : topology.shards()) + uniq.add(((PrefixedIntHashKey) shard.range.start()).prefix); + int[] prefixes = new int[uniq.size()]; + IntHashSet.IntIterator it = uniq.iterator(); + for (int i = 0; it.hasNext(); i++) + prefixes[i] = it.nextValue(); + Arrays.sort(prefixes); + return prefixes; + } + + private static Function<int[], Range> randomRanges(RandomSource rs) + { + int selection = rs.nextInt(0, 2); + switch (selection) + { + case 0: // uniform + return (prefixes) -> randomRange(rs, prefixes, () -> rs.nextInt(0, 1 << 13) + 1); + case 1: // zipf + int domain = HASH_RANGE_END - HASH_RANGE_START + 1; + int splitSize = 100; + int interval = domain / splitSize; + int[] splits = new int[6]; + for (int i = 0; i < splits.length; i++) + splits[i] = i == 0 ? 
interval : splits[i - 1] * 2; + int[] splitsToPick = splits; + int bias = rs.nextInt(0, 3); // small, large, random + if (bias != 0) + splitsToPick = Arrays.copyOf(splits, splits.length); + if (bias == 1) + Utils.reverse(splitsToPick); + else if (bias == 2) + Utils.shuffle(splitsToPick, rs); + Gen.IntGen zipf = Gens.pickZipf(splitsToPick); + return (prefixes) -> randomRange(rs, prefixes, () -> { + int value = zipf.nextInt(rs); + int idx = Arrays.binarySearch(splits, value); + int min = idx == 0 ? 0 : splits[idx - 1]; + return rs.nextInt(min, value) + 1; + }); + default: + throw new AssertionError("Unexpected value: " + selection); + } + } + + private static Range randomRange(RandomSource random, int[] prefixes, IntSupplier rangeSizeFn) + { + int prefix = random.pickInt(prefixes); + int i = random.nextInt(HASH_RANGE_START, HASH_RANGE_END); + int rangeSize = rangeSizeFn.getAsInt(); + int j = i + rangeSize; + if (j > HASH_RANGE_END) + { + int delta = j - HASH_RANGE_END; + j = HASH_RANGE_END; + i -= delta; + // safety check, this shouldn't happen unless the configs were changed in an unsafe way + if (i < HASH_RANGE_START) + i = HASH_RANGE_START; + } + return range(forHash(prefix, i), forHash(prefix, j)); + } + + private static PrefixedIntHashKey.Key randomKey(RandomSource random, int[] prefixes, int[] keys, Set<Integer> seen) + { + int prefix = random.pickInt(prefixes); + int key; + do + { + key = random.pickInt(keys); + } + while (!seen.add(key)); + return PrefixedIntHashKey.key(prefix, key, hash(key)); + } + + /** + * This class uses a more limited range than the default for the following reasons: + * + * 1) easier to debug smaller numbers + * 2) adds hash collisions (multiple keys map to the same hash) + */ + private static int hash(int key) + { + CRC32 crc32c = new CRC32(); + crc32c.update(key); + crc32c.update(key >> 8); + crc32c.update(key >> 16); + crc32c.update(key >> 24); + return (int) crc32c.getValue() & 0xffff; + } + + @SuppressWarnings("unused") + void
reconcile(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws ExecutionException, InterruptedException + { + RandomSource random1 = new DefaultRandom(), random2 = new DefaultRandom(); + + random1.setSeed(seed); + random2.setSeed(seed); + ExecutorService exec = Executors.newFixedThreadPool(2); + RandomDelayQueue.ReconcilingQueueFactory factory = new RandomDelayQueue.ReconcilingQueueFactory(seed); + Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, clients, nodes, keyCount, operations, concurrency, factory.get(true), InMemoryJournal::new)); + Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, clients, nodes, keyCount, operations, concurrency, factory.get(false), InMemoryJournal::new)); + exec.shutdown(); + f1.get(); + f2.get(); + } + + void burn(RandomSource random, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency, PendingQueue pendingQueue, Function<Id, Journal> journalFactory) + { + List<Throwable> failures = Collections.synchronizedList(new ArrayList<>()); + AtomicLong progress = new AtomicLong(); + MonitoredPendingQueue queue = new MonitoredPendingQueue(failures, progress, 5L, TimeUnit.MINUTES, pendingQueue); + long startNanos = System.nanoTime(); + long startLogicalMillis = queue.nowInMillis(); + Consumer<Runnable> retryBootstrap; + { + RandomSource retryRandom = random.fork(); + retryBootstrap = retry -> { + long delay = retryRandom.nextInt(1, 15); + queue.add(PendingRunnable.create(retry::run), delay, TimeUnit.SECONDS); + }; + } + IntSupplier coordinationDelays, progressDelays, timeoutDelays; + { + RandomSource rnd = random.fork(); + coordinationDelays = delayGenerator(rnd, 1, 100, 100, 1000); + progressDelays = delayGenerator(rnd, 1, 100, 100, 1000); + timeoutDelays = delayGenerator(rnd, 500, 800, 1000, 10000); + } + Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier = onStale 
-> new ListAgent(random.fork(), 1000L, failures::add, retryBootstrap, onStale, coordinationDelays, progressDelays, timeoutDelays); + + Supplier<LongSupplier> nowSupplier = () -> { + RandomSource forked = random.fork(); + // TODO (expected): meta-randomise scale of clock drift + return FrequentLargeRange.builder(forked) + .ratio(1, 5) + .small(50, 5000, TimeUnit.MICROSECONDS) + .large(1, 10, TimeUnit.MILLISECONDS) + .build() + .mapAsLong(j -> Math.max(0, queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j))) + .asLongSupplier(forked); + }; + + SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, new ListAgent(random.fork(), 1000L, failures::add, retryBootstrap, (i1, i2) -> { + throw new IllegalAccessError("Global executor should enver get a stale event"); + }, coordinationDelays, progressDelays, timeoutDelays)); + Int2ObjectHashMap<Verifier> validators = new Int2ObjectHashMap<>(); + Function<CommandStore, AsyncExecutor> executor = ignore -> globalExecutor; + + MessageListener listener = MessageListener.get(); + + int[] keys = IntStream.range(0, keyCount).toArray(); + Packet[] requests = toArray(generate(random, listener, executor, clients, nodes, keys, operations), Packet[]::new); + int[] starts = new int[requests.length]; + Packet[] replies = new Packet[requests.length]; + + AtomicInteger acks = new AtomicInteger(); + AtomicInteger nacks = new AtomicInteger(); + AtomicInteger lost = new AtomicInteger(); + AtomicInteger truncated = new AtomicInteger(); + AtomicInteger recovered = new AtomicInteger(); + AtomicInteger failedToCheck = new AtomicInteger(); + AtomicInteger clock = new AtomicInteger(); + AtomicInteger requestIndex = new AtomicInteger(); + Queue<Packet> initialRequests = new ArrayDeque<>(); + for (int max = Math.min(concurrency, requests.length) ; requestIndex.get() < max ; ) + { + int i = requestIndex.getAndIncrement(); + starts[i] = clock.incrementAndGet(); + initialRequests.add(requests[i]); + } + + // not used 
for atomicity, just for encapsulation + AtomicReference<Runnable> onSubmitted = new AtomicReference<>(); + Consumer<Packet> responseSink = packet -> { + if (replies[(int)packet.replyId] != null) + return; + + if (requestIndex.get() < requests.length) + { + int i = requestIndex.getAndIncrement(); + starts[i] = clock.incrementAndGet(); + queue.addNoDelay(requests[i]); + if (i == requests.length - 1) + onSubmitted.get().run(); + } + if (packet.message instanceof Reply.FailureReply) + { + failures.add(new AssertionError("Unexpected failure in list reply", ((Reply.FailureReply) packet.message).failure)); + return; + } + ListResult reply = (ListResult) packet.message; + + try + { + if (!reply.isSuccess() && reply.status() == ListResult.Status.HeartBeat) + return; // interrupted; will fetch our actual reply once rest of simulation is finished (but wanted to send another request to keep correct number in flight) + + int start = starts[(int)packet.replyId]; + int end = clock.incrementAndGet(); + logger.debug("{} at [{}, {}]", reply, start, end); + replies[(int)packet.replyId] = packet; + + if (!reply.isSuccess()) + { + switch (reply.status()) + { + case Lost: lost.incrementAndGet(); break; + case Invalidated: nacks.incrementAndGet(); break; + case Failure: failedToCheck.incrementAndGet(); break; + case Truncated: truncated.incrementAndGet(); break; + // txn was applied?, but client saw a timeout, so response isn't known + case Other: break; + default: throw new AssertionError("Unexpected fault: " + reply.status()); + } + return; + } + + progress.incrementAndGet(); + switch (reply.status()) + { + default: throw new AssertionError("Unhandled status: " + reply.status()); + case Applied: acks.incrementAndGet(); break; + case RecoveryApplied: recovered.incrementAndGet(); // NOTE: technically this might have been applied by the coordinator and it simply timed out + } + // TODO (correctness): when a keyspace is removed, the history/validator isn't cleaned up... 
+ // the current logic for add keyspace only knows what is there, so a ABA problem exists where keyspaces + // may come back... logically this is a problem as the history doesn't get reset, but practically that + // is fine as the backing map and the validator are consistent + Int2ObjectHashMap<Verifier.Checker> seen = new Int2ObjectHashMap<>(); + for (int i = 0 ; i < reply.read.length ; ++i) + { + Key key = reply.responseKeys.get(i); + int prefix = prefix(key); + int keyValue = key(key); + int k = Arrays.binarySearch(keys, keyValue); + Verifier verifier = validators.computeIfAbsent(prefix, ignore -> createVerifier(Integer.toString(prefix), keyCount)); + Verifier.Checker check = seen.computeIfAbsent(prefix, ignore -> verifier.witness(start, end)); + + int[] read = reply.read[i]; + int write = reply.update == null ? -1 : reply.update.getOrDefault(key, -1); + + if (read != null) + check.read(k, read); + if (write >= 0) + check.write(k, write); + } + for (Verifier.Checker check : seen.values()) + { + check.close(); + } + } + catch (Throwable t) + { + failures.add(t); + } + }; + + Map<MessageType, Stats> messageStatsMap; + try + { + messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), listener, () -> queue, + (id, onStale) -> globalExecutor.withAgent(agentSupplier.apply(onStale)), + queue::checkFailures, + responseSink, random::fork, nowSupplier, + topologyFactory, initialRequests::poll, + onSubmitted::set, + ignore -> {}, + journalFactory); + for (Verifier verifier : validators.values()) + verifier.close(); + } + catch (Throwable t) + { + for (int i = 0 ; i < requests.length ; ++i) + { + logger.info("{}", requests[i]); + logger.info("\t\t" + replies[i]); + } + throw t; + } + + int observedOperations = acks.get() + recovered.get() + nacks.get() + lost.get() + truncated.get(); + logger.info("nodes: {}, rf: {}. 
Received {} acks, {} recovered, {} nacks, {} lost, {} truncated ({} total) to {} operations", nodes.size(), topologyFactory.rf, acks.get(), recovered.get(), nacks.get(), lost.get(), truncated.get(), observedOperations, operations); + logger.info("Message counts: {}", statsInDescOrder(messageStatsMap)); + logger.info("Took {} and in logical time of {}", Duration.ofNanos(System.nanoTime() - startNanos), Duration.ofMillis(queue.nowInMillis() - startLogicalMillis)); + if (clock.get() != operations * 2 || observedOperations != operations) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0 ; i < requests.length ; ++i) + { + // since this only happens when operations are lost, only log the ones without a reply to lower the amount of noise + if (replies[i] == null) + { + sb.setLength(0); + sb.append(requests[i]).append("\n\t\t").append(replies[i]); + logger.info(sb.toString()); + } + } + if (clock.get() != operations * 2) throw new AssertionError("Incomplete set of responses; clock=" + clock.get() + ", expected operations=" + (operations * 2)); + else throw new AssertionError("Incomplete set of responses; ack+recovered+other+nacks+lost+truncated=" + observedOperations + ", expected operations=" + operations); // observedOperations is compared against operations; only the clock is expected to reach operations * 2 + } + } + + private static IntSupplier delayGenerator(RandomSource rnd, int absoluteMin, int absoluteMaxMin, int absoluteMinMax, int asoluteMax) + { + int minDelay = rnd.nextInt(absoluteMin, absoluteMaxMin); + int maxDelay = rnd.nextInt(Math.max(absoluteMinMax, minDelay), asoluteMax); + if (rnd.nextBoolean()) + { + int medianDelay = rnd.nextInt(minDelay, maxDelay); + return () -> rnd.nextBiasedInt(minDelay, medianDelay, maxDelay); + } + return () -> rnd.nextInt(minDelay, maxDelay); + } + + private static String statsInDescOrder(Map<MessageType, Stats> statsMap) + { + List<Stats> stats = new ArrayList<>(statsMap.values()); + stats.sort(Comparator.comparingInt(s -> -s.count())); + return stats.toString(); + } + + protected Verifier
createVerifier(String prefix, int keyCount) + { + return new StrictSerializabilityVerifier(prefix, keyCount); + } + + public static void main(String[] args) + { + int count = 1; + int operations = 1000; + Long overrideSeed = null; + boolean reconcile = false; + LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong; + boolean hasOverriddenSeed = false; + for (int i = 0 ; i < args.length ; i += 2) + { + switch (args[i]) + { + default: throw illegalArgument("Invalid option: " + args[i]); + case "-c": + count = Integer.parseInt(args[i + 1]); + if (hasOverriddenSeed) + throw illegalArgument("Cannot override both seed (-s) and number of seeds to run (-c)"); + overrideSeed = null; + break; + case "-s": + overrideSeed = Long.parseLong(args[i + 1]); + hasOverriddenSeed = true; + count = 1; + break; + case "-o": + operations = Integer.parseInt(args[i + 1]); + break; + case "-r": + reconcile = true; + --i; + break; + case "--loop-seed": + seedGenerator = new DefaultRandom(Long.parseLong(args[i + 1]))::nextLong; + } + } + while (count-- > 0) + { + if (!reconcile) + { + new BurnTestBase().run(overrideSeed != null ? overrideSeed : seedGenerator.getAsLong(), operations); + } + else + { + new BurnTestBase().reconcile(overrideSeed != null ? 
overrideSeed : seedGenerator.getAsLong(), operations); + } + } + } + + void run(long seed) + { + Duration timeout = Duration.ofMinutes(3); + try + { + TimeoutUtils.runBlocking(timeout, "BurnTest with timeout", () -> run(seed, 1000)); + } + catch (Throwable thrown) + { + Throwable cause = thrown; + if (cause instanceof ExecutionException) + cause = cause.getCause(); + if (cause instanceof TimeoutException) + { + TimeoutException override = new TimeoutException("test did not complete within " + timeout); + override.setStackTrace(new StackTraceElement[0]); + cause = override; + } + logger.error("Exception running burn test for seed {}:", seed, cause); + throw SimulationException.wrap(seed, cause); + } + } + + void run(long seed, int operations) + { + logger.info("Seed: {}", seed); + Cluster.trace.trace("Seed: {}", seed); + RandomSource random = new DefaultRandom(seed); + try + { + List<Id> clients = generateIds(true, 1 + random.nextInt(4)); + int rf; + float chance = random.nextFloat(); + if (chance < 0.2f) { rf = random.nextInt(2, 9); } + else if (chance < 0.4f) { rf = 3; } + else if (chance < 0.7f) { rf = 5; } + else if (chance < 0.8f) { rf = 7; } + else { rf = 9; } + + List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3)); + + burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START, HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 3))), + clients, + nodes, + 5 + random.nextInt(15), + operations, + 10 + random.nextInt(30), + new Factory(random).get(), + InMemoryJournal::new); + } + catch (Throwable t) + { + logger.error("Exception running burn test for seed {}:", seed, t); + throw SimulationException.wrap(seed, t); + } + } + + void reconcile(long seed, int operations) + { + logger.info("Seed: {}", seed); + Cluster.trace.trace("Seed: {}", seed); + RandomSource random = new DefaultRandom(seed); + try + { + List<Id> clients = generateIds(true, 1 + random.nextInt(4)); + int rf; + float chance = random.nextFloat(); + if 
(chance < 0.2f) { rf = random.nextInt(2, 9); } + else if (chance < 0.4f) { rf = 3; } + else if (chance < 0.7f) { rf = 5; } + else if (chance < 0.8f) { rf = 7; } + else { rf = 9; } + + List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3)); + + reconcile(seed, new TopologyFactory(rf, ranges(0, HASH_RANGE_START, HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 3))), + clients, + nodes, + 5 + random.nextInt(15), + operations, + 10 + random.nextInt(30)); + } + catch (Throwable t) + { + logger.error("Exception running burn test for seed {}:", seed, t); + throw SimulationException.wrap(seed, t); + } + } + + private static List<Id> generateIds(boolean clients, int count) + { + List<Id> ids = new ArrayList<>(); + for (int i = 1; i <= count ; ++i) + ids.add(new Id(clients ? -i : i)); + return ids; + } + + private static int key(Key key) + { + return ((PrefixedIntHashKey) key).key; + } + + private static int prefix(Key key) + { + return ((PrefixedIntHashKey) key).prefix; + } +} diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/main-test/java/accord/burn/BurnTestConfigurationService.java similarity index 100% rename from accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java rename to accord-core/src/main-test/java/accord/burn/BurnTestConfigurationService.java diff --git a/accord-core/src/test/java/accord/burn/SimulationException.java b/accord-core/src/main-test/java/accord/burn/SimulationException.java similarity index 100% rename from accord-core/src/test/java/accord/burn/SimulationException.java rename to accord-core/src/main-test/java/accord/burn/SimulationException.java diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/main-test/java/accord/burn/TopologyUpdates.java similarity index 100% rename from accord-core/src/test/java/accord/burn/TopologyUpdates.java rename to 
accord-core/src/main-test/java/accord/burn/TopologyUpdates.java diff --git a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java b/accord-core/src/main-test/java/accord/burn/random/FrequentLargeRange.java similarity index 100% rename from accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java rename to accord-core/src/main-test/java/accord/burn/random/FrequentLargeRange.java diff --git a/accord-core/src/test/java/accord/burn/random/IntRange.java b/accord-core/src/main-test/java/accord/burn/random/IntRange.java similarity index 100% rename from accord-core/src/test/java/accord/burn/random/IntRange.java rename to accord-core/src/main-test/java/accord/burn/random/IntRange.java diff --git a/accord-core/src/test/java/accord/burn/random/RandomWalkRange.java b/accord-core/src/main-test/java/accord/burn/random/RandomWalkRange.java similarity index 100% rename from accord-core/src/test/java/accord/burn/random/RandomWalkRange.java rename to accord-core/src/main-test/java/accord/burn/random/RandomWalkRange.java diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/main-test/java/accord/impl/IntHashKey.java similarity index 99% rename from accord-core/src/test/java/accord/impl/IntHashKey.java rename to accord-core/src/main-test/java/accord/impl/IntHashKey.java index 0585204e..1712b960 100644 --- a/accord-core/src/test/java/accord/impl/IntHashKey.java +++ b/accord-core/src/main-test/java/accord/impl/IntHashKey.java @@ -111,7 +111,7 @@ public abstract class IntHashKey implements RoutableKey public static final class Key extends IntHashKey implements accord.api.Key { - private Key(int key) + public Key(int key) { super(key); } diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/main-test/java/accord/impl/IntKey.java similarity index 100% rename from accord-core/src/test/java/accord/impl/IntKey.java rename to accord-core/src/main-test/java/accord/impl/IntKey.java diff --git 
a/accord-core/src/test/java/accord/impl/MessageListener.java b/accord-core/src/main-test/java/accord/impl/MessageListener.java similarity index 100% rename from accord-core/src/test/java/accord/impl/MessageListener.java rename to accord-core/src/main-test/java/accord/impl/MessageListener.java diff --git a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java b/accord-core/src/main-test/java/accord/impl/PrefixedIntHashKey.java similarity index 99% rename from accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java rename to accord-core/src/main-test/java/accord/impl/PrefixedIntHashKey.java index a6a8bc84..575ed6d1 100644 --- a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java +++ b/accord-core/src/main-test/java/accord/impl/PrefixedIntHashKey.java @@ -157,7 +157,7 @@ public class PrefixedIntHashKey implements RoutableKey public static final class Hash extends PrefixedIntRoutingKey { - private Hash(int prefix, int hash) + public Hash(int prefix, int hash) { super(prefix, hash); } diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/main-test/java/accord/impl/TestAgent.java similarity index 100% rename from accord-core/src/test/java/accord/impl/TestAgent.java rename to accord-core/src/main-test/java/accord/impl/TestAgent.java diff --git a/accord-core/src/test/java/accord/impl/TopologyFactory.java b/accord-core/src/main-test/java/accord/impl/TopologyFactory.java similarity index 100% rename from accord-core/src/test/java/accord/impl/TopologyFactory.java rename to accord-core/src/main-test/java/accord/impl/TopologyFactory.java diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/main-test/java/accord/impl/basic/Cluster.java similarity index 92% rename from accord-core/src/test/java/accord/impl/basic/Cluster.java rename to accord-core/src/main-test/java/accord/impl/basic/Cluster.java index 3dd73e6f..cf0604ad 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ 
b/accord-core/src/main-test/java/accord/impl/basic/Cluster.java @@ -46,11 +46,11 @@ import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.Nullable; -import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.BarrierType; +import accord.api.Journal; import accord.api.LocalConfig; import accord.api.MessageSink; import accord.api.RoutingKey; @@ -66,32 +66,28 @@ import accord.coordinate.Invalidated; import accord.coordinate.Preempted; import accord.coordinate.Timeout; import accord.coordinate.Truncated; -import accord.impl.DurabilityScheduling; import accord.impl.DefaultLocalListeners; import accord.impl.DefaultRemoteListeners; import accord.impl.DefaultTimeouts; +import accord.impl.DurabilityScheduling; import accord.impl.InMemoryCommandStore.GlobalCommand; import accord.impl.MessageListener; import accord.impl.PrefixedIntHashKey; import accord.impl.SizeOfIntersectionSorter; import accord.impl.TopologyFactory; -import accord.impl.basic.DelayedCommandStores.DelayedCommandStore; import accord.impl.list.ListAgent; import accord.impl.list.ListStore; import accord.impl.progresslog.DefaultProgressLogs; import accord.local.AgentExecutor; import accord.local.Command; import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.DurableBefore; import accord.local.Node; import accord.local.Node.Id; import accord.local.RedundantBefore; -import accord.local.TimeService; -import accord.primitives.RoutableKey; -import accord.primitives.SaveStatus; import accord.local.ShardDistributor; -import accord.primitives.Seekables; -import accord.primitives.Status; +import accord.local.TimeService; import accord.local.cfk.CommandsForKey; import accord.messages.Message; import accord.messages.MessageType; @@ -102,11 +98,16 @@ import accord.primitives.FullRoute; import accord.primitives.Keys; import accord.primitives.Range; import accord.primitives.Ranges; +import 
accord.primitives.RoutableKey; +import accord.primitives.SaveStatus; +import accord.primitives.Seekables; +import accord.primitives.Status; import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; import accord.topology.Topology; import accord.topology.TopologyRandomizer; +import accord.utils.AccordGens; import accord.utils.Gens; import accord.utils.Invariants; import accord.utils.RandomSource; @@ -117,11 +118,7 @@ import org.agrona.collections.Int2ObjectHashMap; import static accord.impl.basic.Cluster.OverrideLinksKind.NONE; import static accord.impl.basic.Cluster.OverrideLinksKind.RANDOM_BIDIRECTIONAL; -import static accord.impl.basic.NodeSink.Action.DELIVER; -import static accord.impl.basic.NodeSink.Action.DROP; -import static accord.utils.AccordGens.keysInsideRanges; -import static accord.utils.AccordGens.rangeInsideRange; -import static accord.utils.Gens.mixedDistribution; +import static accord.impl.basic.DelayedCommandStores.*; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -418,37 +415,37 @@ public class Cluster }; AtomicInteger counter = new AtomicInteger(); AtomicReference<Map<Id, Node>> nodeMap = new AtomicReference<>(); - Map<MessageType, Cluster.Stats> stats = Cluster.run(nodes.toArray(Node.Id[]::new), - MessageListener.get(), - () -> queue, - (id, onStale) -> globalExecutor.withAgent(agentSupplier.apply(onStale)), - queue::checkFailures, - ignore -> { - }, - randomSupplier, - nowSupplier, - topologyFactory, - new Supplier<>() + Map<MessageType, Cluster.Stats> stats = run(nodes.toArray(Node.Id[]::new), + MessageListener.get(), + () -> queue, + (id, onStale) -> globalExecutor.withAgent(agentSupplier.apply(onStale)), + queue::checkFailures, + ignore -> { + }, + randomSupplier, + nowSupplier, + topologyFactory, + new Supplier<>() + { + private Iterator<Request> requestIterator = null; + private final 
RandomSource rs = randomSupplier.get(); + @Override + public Packet get() + { + if (requestIterator == null) { - private Iterator<Request> requestIterator = null; - private final RandomSource rs = randomSupplier.get(); - @Override - public Packet get() - { - // ((Cluster) node.scheduler()).onDone(() -> checkOnResult(homeKey, txnId, 0, null)); - if (requestIterator == null) - { - Map<Node.Id, Node> nodes = nodeMap.get(); - requestIterator = Collections.singleton(init.apply(nodes)).iterator(); - } - if (!requestIterator.hasNext()) - return null; - Node.Id id = rs.pick(nodes); - return new Packet(id, id, counter.incrementAndGet(), requestIterator.next()); - } - }, - Runnable::run, - nodeMap::set); + Map<Node.Id, Node> nodes = nodeMap.get(); + requestIterator = Collections.singleton(init.apply(nodes)).iterator(); + } + if (!requestIterator.hasNext()) + return null; + Node.Id id = rs.pick(nodes); + return new Packet(id, id, counter.incrementAndGet(), requestIterator.next()); + } + }, + Runnable::run, + nodeMap::set, + InMemoryJournal::new); if (!failures.isEmpty()) { AssertionError error = new AssertionError("Unexpected errors detected"); @@ -463,7 +460,7 @@ public class Cluster Runnable checkFailures, Consumer<Packet> responseSink, Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplierSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal, - Consumer<Map<Id, Node>> readySignal) + Consumer<Map<Id, Node>> readySignal, Function<Node.Id, Journal> journalSupplier) { Topology topology = topologyFactory.toTopology(nodes); Map<Id, Node> nodeMap = new LinkedHashMap<>(); @@ -490,11 +487,10 @@ public class Cluster ClusterScheduler scheduler = sinks.new ClusterScheduler(id.id); MessageSink messageSink = sinks.create(id, randomSupplier.get()); LongSupplier nowSupplier = nowSupplierSupplier.get(); - LocalConfig localConfig = LocalConfig.DEFAULT; BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges) -> 
configRandomizer.onStale(id, sinceAtLeast, ranges); AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id, onStale); executorMap.put(id, nodeExecutor); - Journal journal = new Journal(id); + Journal journal = journalSupplier.apply(id); journalMap.put(id, journal); BurnTestConfigurationService configService = new BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, nodeMap::get, topologyUpdates); BooleanSupplier isLoadedCheck = Gens.supplier(Gens.bools().mixedDistribution().next(random), random); @@ -503,7 +499,7 @@ public class Cluster nodeExecutor.agent(), randomSupplier.get(), scheduler, SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, DefaultTimeouts::new, DefaultProgressLogs::new, DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, isLoadedCheck, journal), new CoordinationAdapter.DefaultFactory(), - DurableBefore.NOOP_PERSISTER, localConfig); + DurableBefore.NOOP_PERSISTER, LocalConfig.DEFAULT); DurabilityScheduling durability = node.durabilityScheduling(); // TODO (desired): randomise durability.setShardCycleTime(30, SECONDS); @@ -535,7 +531,7 @@ public class Cluster AsyncResult<?> startup = AsyncChains.reduce(nodeMap.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult(); while (sinks.processPending()); - Assertions.assertTrue(startup.isDone()); + Invariants.checkArgument(startup.isDone()); ClusterScheduler clusterScheduler = sinks.new ClusterScheduler(-1); List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes)); @@ -556,22 +552,23 @@ public class Cluster Predicate<Pending> pred = getPendingPredicate(id.id, stores); while (sinks.drain(pred)); - // Journal cleanup is a rough equivalent of a node restart. 
trace.debug("Triggering journal cleanup for node " + id); CommandsForKey.disableLinearizabilityViolationsReporting(); ListStore listStore = (ListStore) nodeMap.get(id).commandStores().dataStore(); + + // Data- store and CommandStore cleanup, followed by journal replay is a rough equivalent of a node restart. NavigableMap<RoutableKey, Timestamped<int[]>> prevData = listStore.copyOfCurrentData(); listStore.clear(); listStore.restoreFromSnapshot(); - - Journal journal = journalMap.get(id); - Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores); for (CommandStore s : stores) { DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s; store.clearForTesting(); - journal.reconstructAll(store.loader(), store.id()); } + + Journal journal = journalMap.get(id); + Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores); + journal.replay(nodeMap.get(id).commandStores()); while (sinks.drain(pred)); CommandsForKey.enableLinearizabilityViolationsReporting(); Invariants.checkState(listStore.equals(prevData)); @@ -651,14 +648,16 @@ public class Cluster Node node = nodeMap.get(id); Journal journal = journalMap.get(node.id()); - CommandStore[] stores = nodeMap.get(node.id()).commandStores().all(); + CommandStores stores = nodeMap.get(node.id()).commandStores(); // run on node scheduler so doesn't run during replay scheduled = node.scheduler().selfRecurring(() -> { - journal.purge(j -> j < stores.length ? 
stores[j] : null); + trace.debug("Starting purge."); + journal.purge(stores); + trace.debug("Finished purge."); schedule(clusterScheduler, rs, nodes, nodeMap, journalMap); + }, 0, SECONDS); } - } private static Int2ObjectHashMap<NavigableMap<TxnId, Command>> copyCommands(CommandStore[] stores) @@ -666,7 +665,7 @@ public class Cluster Int2ObjectHashMap<NavigableMap<TxnId, Command>> result = new Int2ObjectHashMap<>(); for (CommandStore s : stores) { - DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s; + DelayedCommandStore store = (DelayedCommandStore) s; NavigableMap<TxnId, Command> commands = new TreeMap<>(); result.put(store.id(), commands); for (Map.Entry<TxnId, GlobalCommand> e : store.unsafeCommands().entrySet()) @@ -679,7 +678,7 @@ public class Cluster { for (CommandStore s : stores) { - DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s; + DelayedCommandStore store = (DelayedCommandStore) s; NavigableMap<TxnId, Command> before = beforeStores.get(store.id()); for (Map.Entry<TxnId, GlobalCommand> e : store.unsafeCommands().entrySet()) { @@ -687,7 +686,8 @@ public class Cluster Command afterCommand = e.getValue().value(); if (beforeCommand == null) { - Invariants.checkArgument(afterCommand.is(Status.NotDefined)); + Invariants.checkArgument(afterCommand.is(Status.NotDefined), + "After command should have been not defined %s", afterCommand); continue; } if (afterCommand.hasBeen(Status.Truncated)) @@ -828,7 +828,7 @@ public class Cluster private BarrierService(Node node, RandomSource rs) { super(node, rs); - this.typeSupplier = mixedDistribution(BarrierType.values()).next(rs).asSupplier(rs); + this.typeSupplier = Gens.mixedDistribution(BarrierType.values()).next(rs).asSupplier(rs); this.includeRangeSupplier = Gens.bools().mixedDistribution().next(rs).asSupplier(rs); this.wholeOrPartialSupplier = Gens.bools().mixedDistribution().next(rs).asSupplier(rs); } @@ -843,7 +843,7 @@ 
public class Cluster BarrierType type = typeSupplier.get(); if (type == BarrierType.local) { - Keys keys = Keys.of(keysInsideRanges(ranges).next(rs)); + Keys keys = Keys.of(AccordGens.keysInsideRanges(ranges).next(rs)); run(node, keys, node.computeRoute(current.epoch(), keys), current.epoch(), type); } else @@ -852,7 +852,7 @@ public class Cluster for (Range range : ranges) { if (includeRangeSupplier.get()) - subset.add(wholeOrPartialSupplier.get() ? range : rangeInsideRange(range).next(rs)); + subset.add(wholeOrPartialSupplier.get() ? range : AccordGens.rangeInsideRange(range).next(rs)); } if (subset.isEmpty()) return; @@ -880,7 +880,7 @@ public class Cluster Collections.shuffle(nodes, random.asJdkRandom()); int partitionSize = random.nextInt((rf+1)/2); Set<Id> partition = new LinkedHashSet<>(nodes.subList(0, partitionSize)); - BiFunction<Id, Id, Link> down = (from, to) -> new Link(() -> DROP, up.apply(from, to).latencyMicros); + BiFunction<Id, Id, Link> down = (from, to) -> new Link(() -> NodeSink.Action.DROP, up.apply(from, to).latencyMicros); return (from, to) -> (partition.contains(from) == partition.contains(to) ? 
up : down).apply(from, to); } @@ -931,12 +931,12 @@ public class Cluster private static Link healthy(LongSupplier latency) { - return new Link(() -> DELIVER, latency); + return new Link(() -> NodeSink.Action.DELIVER, latency); } private static Link down(LongSupplier latency) { - return new Link(() -> DROP, latency); + return new Link(() -> NodeSink.Action.DROP, latency); } private LongSupplier defaultRandomWalkLatencyMicros(RandomSource random) diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/main-test/java/accord/impl/basic/DelayedCommandStores.java similarity index 93% rename from accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java rename to accord-core/src/main-test/java/accord/impl/basic/DelayedCommandStores.java index 04ac085d..ede62de4 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/main-test/java/accord/impl/basic/DelayedCommandStores.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Queue; import java.util.concurrent.Callable; import java.util.function.BiConsumer; @@ -34,6 +33,7 @@ import com.google.common.collect.Iterables; import accord.api.Agent; import accord.api.DataStore; +import accord.api.Journal; import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.impl.InMemoryCommandStore; @@ -42,13 +42,14 @@ import accord.impl.InMemorySafeCommand; import accord.impl.InMemorySafeCommandsForKey; import accord.impl.InMemorySafeTimestampsForKey; import accord.impl.PrefixedIntHashKey; -import accord.impl.basic.TaskExecutorService.Task; import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores; +import accord.local.DurableBefore; import accord.local.Node; import accord.local.NodeCommandStoreService; import accord.local.PreLoadContext; +import 
accord.local.RedundantBefore; import accord.local.SafeCommandStore; import accord.local.ShardDistributor; import accord.primitives.Range; @@ -105,7 +106,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread public static class DelayedCommandStore extends InMemoryCommandStore { - public class DelayedTask<T> extends Task<T> + public class DelayedTask<T> extends TaskExecutorService.Task<T> { private DelayedTask(Callable<T> fn) { @@ -130,7 +131,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread } private final SimulatedDelayedExecutorService executor; - private final Queue<Task<?>> pending = new LinkedList<>(); + private final Queue<TaskExecutorService.Task<?>> pending = new LinkedList<>(); private final BooleanSupplier isLoadedCheck; private final Journal journal; @@ -153,7 +154,8 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread return; // Journal will not have result persisted. This part is here for test purposes and ensuring that we have strict object equality. 
- Command reconstructed = journal.reconstruct(id, current.txnId()); + // TODO: redundant/durable before + Command reconstructed = journal.loadCommand(id, current.txnId(), RedundantBefore.EMPTY, DurableBefore.EMPTY); List<Difference<?>> diff = ReflectionUtils.recursiveEquals(current, reconstructed); Invariants.checkState(diff.isEmpty(), "Commands did not match: expected %s, given %s, node %s, store %d, diff %s", current, reconstructed, node, id(), new LazyToString(() -> String.join("\n", Iterables.transform(diff, Object::toString)))); } @@ -190,7 +192,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread @Override public <T> AsyncChain<T> submit(Callable<T> fn) { - Task<T> task = new DelayedTask<>(fn); + TaskExecutorService.Task<T> task = new DelayedTask<>(fn); if (Invariants.testParanoia(LINEAR, LINEAR, HIGH)) { return AsyncChains.detectLeak(agent::onUncaughtException, () -> { @@ -223,7 +225,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread private void runNextTask() { - Task<?> next = pending.peek(); + TaskExecutorService.Task<?> next = pending.peek(); if (next == null) return; @@ -275,7 +277,8 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread Command before = safe.original(); Command after = safe.current(); - commandStore.journal.onExecute(commandStore.id(), before, after, Objects.equals(context.primaryTxnId(), after.txnId())); + // TODO: do we want to use AccordCommandStore, since it handles caches? 
+ commandStore.journal.saveCommand(commandStore.id(), new Journal.CommandUpdate(before, after), () -> {}); commandStore.validateRead(safe.current()); }); super.postExecute(); diff --git a/accord-core/src/test/java/accord/impl/basic/MonitoredPendingQueue.java b/accord-core/src/main-test/java/accord/impl/basic/MonitoredPendingQueue.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/MonitoredPendingQueue.java rename to accord-core/src/main-test/java/accord/impl/basic/MonitoredPendingQueue.java diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/main-test/java/accord/impl/basic/NodeSink.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/NodeSink.java rename to accord-core/src/main-test/java/accord/impl/basic/NodeSink.java diff --git a/accord-core/src/test/java/accord/impl/basic/Packet.java b/accord-core/src/main-test/java/accord/impl/basic/Packet.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/Packet.java rename to accord-core/src/main-test/java/accord/impl/basic/Packet.java diff --git a/accord-core/src/test/java/accord/impl/basic/Pending.java b/accord-core/src/main-test/java/accord/impl/basic/Pending.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/Pending.java rename to accord-core/src/main-test/java/accord/impl/basic/Pending.java diff --git a/accord-core/src/test/java/accord/impl/basic/PendingQueue.java b/accord-core/src/main-test/java/accord/impl/basic/PendingQueue.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/PendingQueue.java rename to accord-core/src/main-test/java/accord/impl/basic/PendingQueue.java diff --git a/accord-core/src/test/java/accord/impl/basic/PendingRunnable.java b/accord-core/src/main-test/java/accord/impl/basic/PendingRunnable.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/PendingRunnable.java rename to 
accord-core/src/main-test/java/accord/impl/basic/PendingRunnable.java diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java b/accord-core/src/main-test/java/accord/impl/basic/RandomDelayQueue.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java rename to accord-core/src/main-test/java/accord/impl/basic/RandomDelayQueue.java diff --git a/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java b/accord-core/src/main-test/java/accord/impl/basic/RecurringPendingRunnable.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java rename to accord-core/src/main-test/java/accord/impl/basic/RecurringPendingRunnable.java diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java b/accord-core/src/main-test/java/accord/impl/basic/SimulatedDelayedExecutorService.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java rename to accord-core/src/main-test/java/accord/impl/basic/SimulatedDelayedExecutorService.java diff --git a/accord-core/src/test/java/accord/impl/basic/SimulatedFault.java b/accord-core/src/main-test/java/accord/impl/basic/SimulatedFault.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/SimulatedFault.java rename to accord-core/src/main-test/java/accord/impl/basic/SimulatedFault.java diff --git a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java b/accord-core/src/main-test/java/accord/impl/basic/TaskExecutorService.java similarity index 100% rename from accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java rename to accord-core/src/main-test/java/accord/impl/basic/TaskExecutorService.java diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/main-test/java/accord/impl/list/ListAgent.java similarity index 100% rename 
from accord-core/src/test/java/accord/impl/list/ListAgent.java rename to accord-core/src/main-test/java/accord/impl/list/ListAgent.java diff --git a/accord-core/src/test/java/accord/impl/list/ListData.java b/accord-core/src/main-test/java/accord/impl/list/ListData.java similarity index 100% rename from accord-core/src/test/java/accord/impl/list/ListData.java rename to accord-core/src/main-test/java/accord/impl/list/ListData.java diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/main-test/java/accord/impl/list/ListFetchCoordinator.java similarity index 100% rename from accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java rename to accord-core/src/main-test/java/accord/impl/list/ListFetchCoordinator.java diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java b/accord-core/src/main-test/java/accord/impl/list/ListQuery.java similarity index 96% rename from accord-core/src/test/java/accord/impl/list/ListQuery.java rename to accord-core/src/main-test/java/accord/impl/list/ListQuery.java index de886b99..f10ac792 100644 --- a/accord-core/src/test/java/accord/impl/list/ListQuery.java +++ b/accord-core/src/main-test/java/accord/impl/list/ListQuery.java @@ -38,9 +38,9 @@ import javax.annotation.Nonnull; public class ListQuery implements Query { - final Id client; - final long requestId; - final boolean isEphemeralRead; + public final Id client; + public final long requestId; + public final boolean isEphemeralRead; public ListQuery(Id client, long requestId, boolean isEphemeralRead) { diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/main-test/java/accord/impl/list/ListRead.java similarity index 99% rename from accord-core/src/test/java/accord/impl/list/ListRead.java rename to accord-core/src/main-test/java/accord/impl/list/ListRead.java index 915442ad..b1ea3c7c 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRead.java +++ 
b/accord-core/src/main-test/java/accord/impl/list/ListRead.java @@ -46,7 +46,7 @@ public class ListRead implements Read private static final Logger logger = LoggerFactory.getLogger(ListRead.class); private final Function<? super CommandStore, AsyncExecutor> executor; - private final boolean isEphemeralRead; + public final boolean isEphemeralRead; public final Seekables<?, ?> userReadKeys; // those only to be returned to user public final Seekables<?, ?> keys; // those including necessary for writes diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/main-test/java/accord/impl/list/ListRequest.java similarity index 100% rename from accord-core/src/test/java/accord/impl/list/ListRequest.java rename to accord-core/src/main-test/java/accord/impl/list/ListRequest.java diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java b/accord-core/src/main-test/java/accord/impl/list/ListResult.java similarity index 100% rename from accord-core/src/test/java/accord/impl/list/ListResult.java rename to accord-core/src/main-test/java/accord/impl/list/ListResult.java diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/main-test/java/accord/impl/list/ListStore.java similarity index 100% rename from accord-core/src/test/java/accord/impl/list/ListStore.java rename to accord-core/src/main-test/java/accord/impl/list/ListStore.java diff --git a/accord-core/src/test/java/accord/impl/list/ListUpdate.java b/accord-core/src/main-test/java/accord/impl/list/ListUpdate.java similarity index 98% rename from accord-core/src/test/java/accord/impl/list/ListUpdate.java rename to accord-core/src/main-test/java/accord/impl/list/ListUpdate.java index a3f8e17b..00de6f14 100644 --- a/accord-core/src/test/java/accord/impl/list/ListUpdate.java +++ b/accord-core/src/main-test/java/accord/impl/list/ListUpdate.java @@ -34,8 +34,8 @@ import accord.primitives.Ranges; import accord.primitives.Seekables; import 
accord.primitives.Timestamp; import accord.utils.Invariants; -import accord.utils.async.AsyncExecutor; import accord.utils.Timestamped; +import accord.utils.async.AsyncExecutor; public class ListUpdate extends TreeMap<Key, Integer> implements Update { @@ -95,7 +95,7 @@ public class ListUpdate extends TreeMap<Key, Integer> implements Update { ListUpdate result = new ListUpdate(executor); result.putAll(this); - result.putAll((ListUpdate) other); + result.putAll(((ListUpdate) other)); return result; } diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/main-test/java/accord/impl/list/ListWrite.java similarity index 92% rename from accord-core/src/test/java/accord/impl/list/ListWrite.java rename to accord-core/src/main-test/java/accord/impl/list/ListWrite.java index aced2e04..79e0de5a 100644 --- a/accord-core/src/test/java/accord/impl/list/ListWrite.java +++ b/accord-core/src/main-test/java/accord/impl/list/ListWrite.java @@ -56,7 +56,7 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write ListStore s = (ListStore) store; if (!containsKey(key)) return Writes.SUCCESS; - TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?, ?, ?>) safeStore, ((Key)key).toUnseekable(), txnId, executeAt, true); + TimestampsForKeys.updateLastExecutionTimestamps(safeStore, ((Key)key).toUnseekable(), txnId, executeAt, true); logger.trace("submitting WRITE on {} at {} key:{}", s.node, executeAt, key); return executor.apply(safeStore.commandStore()).submit(() -> { @@ -73,7 +73,7 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write if (!containsKey(key)) return; - TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?, ?, ?>) safeStore, ((Key)key).toUnseekable(), txnId, executeAt, true); + TimestampsForKeys.updateLastExecutionTimestamps(safeStore, ((Key)key).toUnseekable(), txnId, executeAt, true); logger.trace("unsafe applying WRITE on {} at {} key:{}", s.node, executeAt, key); 
int[] data = get(key); s.data.merge((Key)key, new Timestamped<>(executeAt, data, Arrays::toString), ListStore::merge); diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/main-test/java/accord/impl/mock/MockCluster.java similarity index 98% rename from accord-core/src/test/java/accord/impl/mock/MockCluster.java rename to accord-core/src/main-test/java/accord/impl/mock/MockCluster.java index 7604548b..6a266847 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/main-test/java/accord/impl/mock/MockCluster.java @@ -33,17 +33,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.NetworkFilter; -import accord.api.MessageSink; +import accord.Utils; import accord.api.LocalConfig; +import accord.api.MessageSink; import accord.coordinate.CoordinationAdapter; +import accord.impl.DefaultLocalListeners; +import accord.impl.DefaultRemoteListeners; import accord.impl.DefaultTimeouts; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; -import accord.impl.DefaultLocalListeners; -import accord.impl.progresslog.DefaultProgressLogs; -import accord.impl.DefaultRemoteListeners; import accord.impl.SizeOfIntersectionSorter; import accord.impl.TestAgent; +import accord.impl.progresslog.DefaultProgressLogs; import accord.local.AgentExecutor; import accord.local.DurableBefore; import accord.local.Node; @@ -64,8 +65,6 @@ import accord.utils.Invariants; import accord.utils.RandomSource; import accord.utils.ThreadPoolScheduler; -import static accord.Utils.id; -import static accord.Utils.idList; import static accord.local.TimeService.elapsedWrapperFromNonMonotonicSource; import static accord.primitives.Routable.Domain.Key; import static accord.primitives.Txn.Kind.Write; @@ -116,7 +115,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> private synchronized Id nextNodeId() { - return id(nextNodeId++); + return Utils.id(nextNodeId++); } private 
synchronized long nextMessageId() @@ -258,7 +257,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> public Node get(int i) { - return get(id(i)); + return get(Utils.id(i)); } public static MockConfigurationService configService(Node node) @@ -276,7 +275,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> public MockConfigurationService configService(int i) { - return configService(id(i)); + return configService(Utils.id(i)); } public Iterable<MockConfigurationService> configServices(int... ids) @@ -299,7 +298,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> public Iterable<Node> nodes(int... ids) { assert ids.length > 0; - return nodes(idList(ids)); + return nodes(Utils.idList(ids)); } public static class Config diff --git a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java b/accord-core/src/main-test/java/accord/impl/mock/MockConfigurationService.java similarity index 82% rename from accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java rename to accord-core/src/main-test/java/accord/impl/mock/MockConfigurationService.java index a892bb8a..09cd3b02 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java +++ b/accord-core/src/main-test/java/accord/impl/mock/MockConfigurationService.java @@ -18,27 +18,27 @@ package accord.impl.mock; +import accord.api.ConfigurationService; import accord.api.MessageSink; import accord.api.TestableConfigurationService; import accord.local.Node; import accord.primitives.Ranges; import accord.topology.Topology; import accord.utils.EpochFunction; +import accord.utils.Invariants; import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; -import org.junit.jupiter.api.Assertions; - import java.util.*; public class MockConfigurationService implements TestableConfigurationService { private final MessageSink messageSink; private 
final List<Topology> epochs = new ArrayList<>(); - private final Map<Long, EpochReady> acks = new HashMap<>(); + private final Map<Long, ConfigurationService.EpochReady> acks = new HashMap<>(); private final List<AsyncResult<Void>> syncs = new ArrayList<>(); - private final List<Listener> listeners = new ArrayList<>(); + private final List<ConfigurationService.Listener> listeners = new ArrayList<>(); private final EpochFunction<MockConfigurationService> fetchTopologyHandler; public MockConfigurationService(MessageSink messageSink, EpochFunction<MockConfigurationService> fetchTopologyHandler) @@ -55,7 +55,7 @@ public class MockConfigurationService implements TestableConfigurationService } @Override - public synchronized void registerListener(Listener listener) + public synchronized void registerListener(ConfigurationService.Listener listener) { listeners.add(listener); } @@ -83,13 +83,13 @@ public class MockConfigurationService implements TestableConfigurationService } @Override - public synchronized void acknowledgeEpoch(EpochReady epoch, boolean startSync) + public synchronized void acknowledgeEpoch(ConfigurationService.EpochReady epoch, boolean startSync) { - Assertions.assertFalse(acks.containsKey(epoch.epoch)); + Invariants.checkArgument(acks.containsKey(epoch.epoch)); acks.put(epoch.epoch, epoch); } - public synchronized EpochReady ackFor(long epoch) + public synchronized ConfigurationService.EpochReady ackFor(long epoch) { return acks.get(epoch); } @@ -110,11 +110,11 @@ public class MockConfigurationService implements TestableConfigurationService if (topology.epoch() > epochs.size()) return; - Assertions.assertEquals(topology.epoch(), epochs.size()); + Invariants.checkArgument(topology.epoch() == epochs.size()); epochs.add(topology); List<AsyncResult<Void>> futures = new ArrayList<>(); - for (Listener listener : listeners) + for (ConfigurationService.Listener listener : listeners) futures.add(listener.onTopologyUpdate(topology, false, true)); 
AsyncResult<Void> result = futures.isEmpty() @@ -126,7 +126,7 @@ public class MockConfigurationService implements TestableConfigurationService public synchronized void reportSyncComplete(Node.Id node, long epoch) { - for (Listener listener : listeners) + for (ConfigurationService.Listener listener : listeners) listener.onRemoteSyncComplete(node, epoch); } } diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/main-test/java/accord/impl/mock/MockStore.java similarity index 100% rename from accord-core/src/test/java/accord/impl/mock/MockStore.java rename to accord-core/src/main-test/java/accord/impl/mock/MockStore.java diff --git a/accord-core/src/test/java/accord/impl/mock/Network.java b/accord-core/src/main-test/java/accord/impl/mock/Network.java similarity index 100% rename from accord-core/src/test/java/accord/impl/mock/Network.java rename to accord-core/src/main-test/java/accord/impl/mock/Network.java diff --git a/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java b/accord-core/src/main-test/java/accord/impl/mock/RecordingMessageSink.java similarity index 93% rename from accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java rename to accord-core/src/main-test/java/accord/impl/mock/RecordingMessageSink.java index ef00df85..a6f48f78 100644 --- a/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java +++ b/accord-core/src/main-test/java/accord/impl/mock/RecordingMessageSink.java @@ -24,8 +24,7 @@ import accord.messages.Callback; import accord.messages.Reply; import accord.messages.ReplyContext; import accord.messages.Request; - -import org.junit.jupiter.api.Assertions; +import accord.utils.Invariants; import java.util.ArrayList; import java.util.Collections; @@ -78,8 +77,8 @@ public class RecordingMessageSink extends SimpleMessageSink public void assertHistorySizes(int requests, int responses) { - Assertions.assertEquals(requests, this.requests.size()); - 
Assertions.assertEquals(responses, this.responses.size()); + Invariants.checkArgument(requests == this.requests.size()); + Invariants.checkArgument(responses == this.responses.size()); } public void clearHistory() diff --git a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java b/accord-core/src/main-test/java/accord/impl/mock/SimpleMessageSink.java similarity index 100% rename from accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java rename to accord-core/src/main-test/java/accord/impl/mock/SimpleMessageSink.java diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/main-test/java/accord/topology/TopologyRandomizer.java similarity index 99% rename from accord-core/src/test/java/accord/topology/TopologyRandomizer.java rename to accord-core/src/main-test/java/accord/topology/TopologyRandomizer.java index 4784d304..f8427c1f 100644 --- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java +++ b/accord-core/src/main-test/java/accord/topology/TopologyRandomizer.java @@ -52,8 +52,8 @@ import accord.utils.RandomSource; import accord.utils.SortedArrays.SortedArrayList; import org.agrona.collections.IntHashSet; -import static accord.burn.BurnTest.HASH_RANGE_END; -import static accord.burn.BurnTest.HASH_RANGE_START; +import static accord.burn.BurnTestBase.HASH_RANGE_END; +import static accord.burn.BurnTestBase.HASH_RANGE_START; // TODO (testing): add change replication factor diff --git a/accord-core/src/test/java/accord/topology/TopologyUtils.java b/accord-core/src/main-test/java/accord/topology/TopologyUtils.java similarity index 100% rename from accord-core/src/test/java/accord/topology/TopologyUtils.java rename to accord-core/src/main-test/java/accord/topology/TopologyUtils.java diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java b/accord-core/src/main-test/java/accord/utils/AccordGens.java similarity index 99% rename from 
accord-core/src/test/java/accord/utils/AccordGens.java rename to accord-core/src/main-test/java/accord/utils/AccordGens.java index 560caa11..ca3dc3f7 100644 --- a/accord-core/src/test/java/accord/utils/AccordGens.java +++ b/accord-core/src/main-test/java/accord/utils/AccordGens.java @@ -373,7 +373,7 @@ public class AccordGens else if (chance < 0.7f) { rf = 5; } else if (chance < 0.8f) { rf = 7; } else { rf = 9; } - Node.Id[] nodes = Utils.toArray(Gens.lists(nodeGen).unique().ofSizeBetween(rf, rf * 3).next(rs), Node.Id[]::new); + Node.Id[] nodes = toArray(Gens.lists(nodeGen).unique().ofSizeBetween(rf, rf * 3).next(rs), Node.Id[]::new); Ranges ranges = rangesGenFactory.apply(nodes.length, rf).next(rs); int numElectorate = nodes.length + rf - 1; diff --git a/accord-core/src/test/java/accord/utils/CRCUtils.java b/accord-core/src/main-test/java/accord/utils/CRCUtils.java similarity index 100% rename from accord-core/src/test/java/accord/utils/CRCUtils.java rename to accord-core/src/main-test/java/accord/utils/CRCUtils.java diff --git a/accord-core/src/test/java/accord/utils/EpochFunction.java b/accord-core/src/main-test/java/accord/utils/EpochFunction.java similarity index 100% rename from accord-core/src/test/java/accord/utils/EpochFunction.java rename to accord-core/src/main-test/java/accord/utils/EpochFunction.java diff --git a/accord-core/src/test/java/accord/utils/Gen.java b/accord-core/src/main-test/java/accord/utils/Gen.java similarity index 100% rename from accord-core/src/test/java/accord/utils/Gen.java rename to accord-core/src/main-test/java/accord/utils/Gen.java diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/main-test/java/accord/utils/Gens.java similarity index 100% rename from accord-core/src/test/java/accord/utils/Gens.java rename to accord-core/src/main-test/java/accord/utils/Gens.java diff --git a/accord-core/src/test/java/accord/utils/LazyToString.java b/accord-core/src/main-test/java/accord/utils/LazyToString.java 
similarity index 100% rename from accord-core/src/test/java/accord/utils/LazyToString.java rename to accord-core/src/main-test/java/accord/utils/LazyToString.java diff --git a/accord-core/src/test/java/accord/utils/LoggingRandomSource.java b/accord-core/src/main-test/java/accord/utils/LoggingRandomSource.java similarity index 100% rename from accord-core/src/test/java/accord/utils/LoggingRandomSource.java rename to accord-core/src/main-test/java/accord/utils/LoggingRandomSource.java diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java b/accord-core/src/main-test/java/accord/utils/MessageTask.java similarity index 100% rename from accord-core/src/test/java/accord/utils/MessageTask.java rename to accord-core/src/main-test/java/accord/utils/MessageTask.java diff --git a/accord-core/src/test/java/accord/utils/Pair.java b/accord-core/src/main-test/java/accord/utils/Pair.java similarity index 100% rename from accord-core/src/test/java/accord/utils/Pair.java rename to accord-core/src/main-test/java/accord/utils/Pair.java diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/main-test/java/accord/utils/Property.java similarity index 100% rename from accord-core/src/test/java/accord/utils/Property.java rename to accord-core/src/main-test/java/accord/utils/Property.java diff --git a/accord-core/src/test/java/accord/utils/RandomTestRunner.java b/accord-core/src/main-test/java/accord/utils/RandomTestRunner.java similarity index 100% rename from accord-core/src/test/java/accord/utils/RandomTestRunner.java rename to accord-core/src/main-test/java/accord/utils/RandomTestRunner.java diff --git a/accord-core/src/test/java/accord/utils/ReflectionUtils.java b/accord-core/src/main-test/java/accord/utils/ReflectionUtils.java similarity index 100% rename from accord-core/src/test/java/accord/utils/ReflectionUtils.java rename to accord-core/src/main-test/java/accord/utils/ReflectionUtils.java diff --git 
a/accord-core/src/test/java/accord/utils/SeedProvider.java b/accord-core/src/main-test/java/accord/utils/SeedProvider.java similarity index 100% rename from accord-core/src/test/java/accord/utils/SeedProvider.java rename to accord-core/src/main-test/java/accord/utils/SeedProvider.java diff --git a/accord-core/src/test/java/accord/utils/async/TimeoutUtils.java b/accord-core/src/main-test/java/accord/utils/async/TimeoutUtils.java similarity index 100% rename from accord-core/src/test/java/accord/utils/async/TimeoutUtils.java rename to accord-core/src/main-test/java/accord/utils/async/TimeoutUtils.java diff --git a/accord-core/src/test/java/accord/verify/CompositeVerifier.java b/accord-core/src/main-test/java/accord/verify/CompositeVerifier.java similarity index 100% rename from accord-core/src/test/java/accord/verify/CompositeVerifier.java rename to accord-core/src/main-test/java/accord/verify/CompositeVerifier.java diff --git a/accord-core/src/test/java/accord/verify/HistoryViolation.java b/accord-core/src/main-test/java/accord/verify/HistoryViolation.java similarity index 100% rename from accord-core/src/test/java/accord/verify/HistoryViolation.java rename to accord-core/src/main-test/java/accord/verify/HistoryViolation.java diff --git a/accord-core/src/test/java/accord/verify/LinearizabilityVerifier.java b/accord-core/src/main-test/java/accord/verify/LinearizabilityVerifier.java similarity index 100% rename from accord-core/src/test/java/accord/verify/LinearizabilityVerifier.java rename to accord-core/src/main-test/java/accord/verify/LinearizabilityVerifier.java diff --git a/accord-core/src/test/java/accord/verify/SerializabilityVerifier.java b/accord-core/src/main-test/java/accord/verify/SerializabilityVerifier.java similarity index 100% rename from accord-core/src/test/java/accord/verify/SerializabilityVerifier.java rename to accord-core/src/main-test/java/accord/verify/SerializabilityVerifier.java diff --git 
a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java b/accord-core/src/main-test/java/accord/verify/StrictSerializabilityVerifier.java similarity index 100% rename from accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java rename to accord-core/src/main-test/java/accord/verify/StrictSerializabilityVerifier.java diff --git a/accord-core/src/test/java/accord/verify/Verifier.java b/accord-core/src/main-test/java/accord/verify/Verifier.java similarity index 100% rename from accord-core/src/test/java/accord/verify/Verifier.java rename to accord-core/src/main-test/java/accord/verify/Verifier.java --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
