This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new cf101690 CEP-15: (Accord) Bootstrap's LocalOnly txn can not be
recreated from SerializerSupport (#93)
cf101690 is described below
commit cf10169067a8cd40fb876789a62439cc03fd2e9b
Author: dcapwell <[email protected]>
AuthorDate: Mon Jun 17 12:59:35 2024 -0700
CEP-15: (Accord) Bootstrap's LocalOnly txn can not be recreated from
SerializerSupport (#93)
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-19674
---
.../src/main/java/accord/local/Bootstrap.java | 1 +
.../src/main/java/accord/local/Cleanup.java | 2 +-
accord-core/src/main/java/accord/local/Node.java | 18 ++-
.../main/java/accord/local/SerializerSupport.java | 60 ++++++-
.../main/java/accord/messages/LocalRequest.java | 25 +--
.../src/main/java/accord/messages/Propagate.java | 45 ++----
.../src/main/java/accord/primitives/SyncPoint.java | 26 +++
accord-core/src/test/java/accord/Utils.java | 2 +-
.../src/test/java/accord/impl/basic/Cluster.java | 82 ++++++++++
.../accord/impl/basic/DelayedCommandStores.java | 67 ++++++--
.../src/test/java/accord/impl/basic/Journal.java | 15 +-
.../src/test/java/accord/impl/list/ListQuery.java | 16 ++
.../test/java/accord/impl/mock/MockCluster.java | 2 +-
.../java/accord/local/BootstrapLocalTxnTest.java | 133 ++++++++++++++++
.../src/test/java/accord/local/CommandsTest.java | 103 +-----------
.../src/test/java/accord/utils/AccordGens.java | 23 +++
.../java/accord/utils/LazyToString.java} | 27 ++--
.../test/java/accord/utils/ReflectionUtils.java | 177 +++++++++++++++++++++
.../src/main/java/accord/maelstrom/Cluster.java | 2 +-
.../src/main/java/accord/maelstrom/Main.java | 2 +-
20 files changed, 635 insertions(+), 193 deletions(-)
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java
b/accord-core/src/main/java/accord/local/Bootstrap.java
index e478eb21..87fd16ff 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -120,6 +120,7 @@ class Bootstrap
{
globalSyncId = node.nextTxnId(ExclusiveSyncPoint,
Routable.Domain.Range);
localSyncId = globalSyncId.as(LocalOnly).withEpoch(epoch);
+ Invariants.checkArgument(epoch <= globalSyncId.epoch(),
"Attempting to use local epoch %d which is larger than global epoch %d", epoch,
globalSyncId.epoch());
if (!node.topology().hasEpoch(globalSyncId.epoch()))
{
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java
b/accord-core/src/main/java/accord/local/Cleanup.java
index c38de8c2..bb6b1d2e 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -105,7 +105,7 @@ public enum Cleanup
if (txnId.kind() == EphemeralRead)
return Cleanup.NO; // TODO (required): clean-up based on timeout
- if (durableBefore.min(txnId) == Universal)
+ if (durableBefore.min(txnId) == UniversalOrInvalidated)
{
if (status.hasBeen(PreCommitted) && !status.hasBeen(Applied)) //
TODO (expected): may be stale
illegalState("Loading universally-durable command that has
been PreCommitted but not Applied");
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index ced53157..26ffd317 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -89,6 +89,7 @@ import accord.utils.Invariants;
import accord.utils.MapReduceConsume;
import accord.utils.RandomSource;
import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncExecutor;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
@@ -532,9 +533,21 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
messageSink.send(to, send);
}
- public void localRequest(LocalRequest message)
+ public <R> void localRequest(LocalRequest<R> message, BiConsumer<? super
R, Throwable> callback)
{
- localRequestHandler.handle(message, this);
+ localRequestHandler.handle(message, callback, this);
+ }
+
+ public <R> AsyncChain<R> localRequest(LocalRequest<R> message)
+ {
+ return new AsyncChains.Head<R>()
+ {
+ @Override
+ protected void start(BiConsumer<? super R, Throwable> callback)
+ {
+ localRequest(message, callback);
+ }
+ };
}
public void reply(Id replyingToNode, ReplyContext replyContext, Reply
send, Throwable failure)
@@ -602,6 +615,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
public FullRoute<?> computeRoute(long epoch, Seekables<?, ?> keysOrRanges)
{
+ Invariants.checkArgument(!keysOrRanges.isEmpty(), "Attempted to
compute a route from empty keys or ranges");
RoutingKey homeKey = trySelectHomeKey(epoch, keysOrRanges);
if (homeKey == null)
homeKey = selectRandomHomeKey(epoch);
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java
b/accord-core/src/main/java/accord/local/SerializerSupport.java
index 5384a7e3..0d9ec3b5 100644
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
import com.google.common.collect.ImmutableSet;
+import accord.api.Agent;
import accord.api.Result;
import accord.api.VisibleForImplementation;
import accord.local.Command.WaitingOn;
@@ -37,10 +38,13 @@ import accord.messages.MessageType;
import accord.messages.PreAccept;
import accord.messages.Propagate;
import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRangeRoute;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.Invariants;
@@ -90,11 +94,63 @@ public class SerializerSupport
.addAll(APPLY_TYPES)
.build();
+ private static Command localOnly(Agent agent, RangesForEpoch
rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt,
@Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted,
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
+ {
+ //TODO (expected): LocalOnly doesn't reflect normal command flow so
relying on this special casing helps reconstruct work, but is a bit brittle;
should find a more maintainable way.
+ TxnId txnId = attrs.txnId();
+ FullRangeRoute route = (FullRangeRoute) attrs.route();
+ //TODO (correctness): not 100% correct as the ranges was "valid" which
can be mutated "after" the sync point... so might actually have less
+ Ranges participantRanges = route.participants().toRanges();
+
+ Txn emptyTxn = agent.emptyTxn(txnId.kind(), participantRanges);
+ Writes writes = emptyTxn.execute(txnId, txnId, null);
+ Result results = emptyTxn.result(txnId, txnId, null);
+ Ranges coordinateRanges = rangesForEpoch.coordinates(txnId);
+ attrs.partialTxn(emptyTxn.slice(coordinateRanges, true))
+ .partialDeps(Deps.NONE.slice(coordinateRanges));
+
+ switch (status.status)
+ {
+ case NotDefined:
+ return status == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(txnId)
+ :
Command.NotDefined.notDefined(attrs, promised);
+ case PreAccepted:
+ return Command.PreAccepted.preAccepted(attrs, executeAt,
promised);
+ case AcceptedInvalidate:
+ case Accepted:
+ case PreCommitted:
+ return Command.Accepted.accepted(attrs, status, executeAt,
promised, accepted);
+ case Committed:
+ case Stable:
+ return Command.Committed.committed(attrs, status, executeAt,
promised, accepted, waitingOnProvider.provide(attrs.partialDeps()));
+ case PreApplied:
+ case Applied:
+ return Command.Executed.executed(attrs, status, executeAt,
promised, accepted, waitingOnProvider.provide(attrs.partialDeps()), writes,
results);
+ case Truncated:
+ case Invalidated:
+ switch (status)
+ {
+ case Erased:
+ return Command.Truncated.erased(txnId,
attrs.durability(), attrs.route());
+ case TruncatedApplyWithOutcome:
+ return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, results, executesAtLeast);
+ case TruncatedApply:
+ return Command.Truncated.truncatedApply(attrs, status,
executeAt, null, null, executesAtLeast);
+ default:
+ throw new AssertionError("Unexpected truncate status:
" + status);
+ }
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
/**
* Reconstructs Command from register values and protocol messages.
*/
- public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable
attrs, SaveStatus status, Timestamp executeAt, @Nullable Timestamp
executesAtLeast, Ballot promised, Ballot accepted, WaitingOnProvider
waitingOnProvider, MessageProvider messageProvider)
+ public static Command reconstruct(Agent agent, RangesForEpoch
rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt,
@Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted,
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
{
+ if (attrs.txnId().kind() == Txn.Kind.LocalOnly)
+ return localOnly(agent, rangesForEpoch, attrs, status, executeAt,
executesAtLeast, promised, accepted, waitingOnProvider, messageProvider);
switch (status.status)
{
case NotDefined:
@@ -308,7 +364,7 @@ public class SerializerSupport
}
else
{
- checkState(witnessed.contains(STABLE_SLOW_PATH_REQ),
"Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new
LoggedMessageProvider(messageProvider));
+ checkState(witnessed.contains(STABLE_SLOW_PATH_REQ),
"Unable to find STABLE_SLOW_PATH_REQ; txn_id=%s, witnessed %s",
messageProvider.txnId(), new LoggedMessageProvider(messageProvider));
if (witnessed.contains(COMMIT_MAXIMAL_REQ))
{
commit = messageProvider.commitMaximal();
diff --git a/accord-core/src/main/java/accord/messages/LocalRequest.java
b/accord-core/src/main/java/accord/messages/LocalRequest.java
index d12e35a9..2775e9ac 100644
--- a/accord-core/src/main/java/accord/messages/LocalRequest.java
+++ b/accord-core/src/main/java/accord/messages/LocalRequest.java
@@ -18,25 +18,26 @@
package accord.messages;
import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommandStore;
-import accord.utils.MapReduceConsume;
+import accord.primitives.TxnId;
import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
-public interface LocalRequest<R> extends Request, PreLoadContext,
MapReduceConsume<SafeCommandStore, Void>
+public interface LocalRequest<R> extends Message
{
- /**
- * Process the request without executing the callback
- */
- void process(Node on);
+ default long waitForEpoch() { return 0; }
+ @Nullable
+ TxnId primaryTxnId();
- void process(Node on, BiConsumer<R, Throwable> callback);
-
- BiConsumer<R, Throwable> callback();
+ void process(Node on, BiConsumer<? super R, Throwable> callback);
interface Handler
{
- void handle(LocalRequest<?> message, Node node);
+ <R> void handle(LocalRequest<R> message, BiConsumer<? super R,
Throwable> callback, Node node);
+ }
+
+ static <R> void simpleHandler(LocalRequest<R> message, BiConsumer<? super
R, Throwable> callback, Node node)
+ {
+ message.process(node, callback);
}
}
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java
b/accord-core/src/main/java/accord/messages/Propagate.java
index c67d5fa2..a6d209e0 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -24,6 +24,7 @@ import accord.local.Command;
import accord.local.Commands;
import accord.local.KeyHistory;
import accord.local.Node;
+import accord.local.PreLoadContext;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.local.SaveStatus;
@@ -43,6 +44,7 @@ import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.Invariants;
+import accord.utils.MapReduceConsume;
import javax.annotation.Nullable;
import java.util.function.BiConsumer;
@@ -59,13 +61,13 @@ import static accord.primitives.Routables.Slice.Minimal;
import static accord.utils.Invariants.illegalState;
// TODO (required): detect propagate loops where we don't manage to update
anything but should
-public class Propagate implements EpochSupplier, LocalRequest<Status.Known>
+public class Propagate implements EpochSupplier, LocalRequest<Status.Known>,
PreLoadContext, MapReduceConsume<SafeCommandStore, Void>
{
public static class SerializerSupport
{
public static Propagate create(TxnId txnId, Route<?> route, SaveStatus
maxKnowledgeSaveStatus, SaveStatus maxSaveStatus, Ballot ballot,
Status.Durability durability, RoutingKey homeKey, RoutingKey progressKey,
Status.Known achieved, FoundKnownMap known, boolean isTruncated, PartialTxn
partialTxn, PartialDeps committedDeps, long toEpoch, Timestamp executeAt,
Writes writes, Result result)
{
- return new Propagate(txnId, route, maxKnowledgeSaveStatus,
maxSaveStatus, ballot, durability, homeKey, progressKey, achieved, known,
isTruncated, partialTxn, committedDeps, toEpoch, executeAt, writes, result,
null);
+ return new Propagate(txnId, route, maxKnowledgeSaveStatus,
maxSaveStatus, ballot, durability, homeKey, progressKey, achieved, known,
isTruncated, partialTxn, committedDeps, toEpoch, executeAt, writes, result);
}
}
@@ -89,14 +91,7 @@ public class Propagate implements EpochSupplier,
LocalRequest<Status.Known>
@Nullable public final Timestamp committedExecuteAt;
@Nullable public final Writes writes;
@Nullable public final Result result;
-
- protected transient BiConsumer<Status.Known, Throwable> callback;
-
- @Override
- public BiConsumer<Status.Known, Throwable> callback()
- {
- return callback;
- }
+ protected transient BiConsumer<? super Status.Known, Throwable> callback;
Propagate(
TxnId txnId,
@@ -115,8 +110,7 @@ public class Propagate implements EpochSupplier,
LocalRequest<Status.Known>
long toEpoch,
@Nullable Timestamp committedExecuteAt,
@Nullable Writes writes,
- @Nullable Result result,
- BiConsumer<Status.Known, Throwable> callback)
+ @Nullable Result result)
{
this.txnId = txnId;
this.route = route;
@@ -135,14 +129,13 @@ public class Propagate implements EpochSupplier,
LocalRequest<Status.Known>
this.committedExecuteAt = committedExecuteAt;
this.writes = writes;
this.result = result;
- this.callback = callback;
}
@Override
- public void process(Node on, BiConsumer<Status.Known, Throwable> callback)
+ public void process(Node on, BiConsumer<? super Status.Known, Throwable>
callback)
{
this.callback = callback;
- process(on);
+ on.mapReduceConsumeLocal(this, route, txnId.epoch(), toEpoch, this);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -213,9 +206,9 @@ public class Propagate implements EpochSupplier,
LocalRequest<Status.Known>
stableDeps =
full.stableDeps.slice(sliceRanges).reconstitutePartial(covering);
Propagate propagate =
- new Propagate(txnId, route, full.maxKnowledgeSaveStatus,
full.maxSaveStatus, full.acceptedOrCommitted, full.durability, full.homeKey,
progressKey, achieved, full.map, isShardTruncated, partialTxn, stableDeps,
toEpoch, full.executeAtIfKnown(), full.writes, full.result, callback);
+ new Propagate(txnId, route, full.maxKnowledgeSaveStatus,
full.maxSaveStatus, full.acceptedOrCommitted, full.durability, full.homeKey,
progressKey, achieved, full.map, isShardTruncated, partialTxn, stableDeps,
toEpoch, full.executeAtIfKnown(), full.writes, full.result);
- node.localRequest(propagate);
+ node.localRequest(propagate, callback);
}
@Override
@@ -243,24 +236,6 @@ public class Propagate implements EpochSupplier,
LocalRequest<Status.Known>
return KeyHistory.COMMANDS;
}
- @Override
- public void preProcess(Node on, Node.Id from, ReplyContext replyContext)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void process(Node on, Node.Id from, ReplyContext replyContext)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void process(Node node)
- {
- node.mapReduceConsumeLocal(this, route, txnId.epoch(), toEpoch, this);
- }
-
@Override
public Void apply(SafeCommandStore safeStore)
{
diff --git a/accord-core/src/main/java/accord/primitives/SyncPoint.java
b/accord-core/src/main/java/accord/primitives/SyncPoint.java
index d22f3756..afc44165 100644
--- a/accord-core/src/main/java/accord/primitives/SyncPoint.java
+++ b/accord-core/src/main/java/accord/primitives/SyncPoint.java
@@ -72,4 +72,30 @@ public class SyncPoint<S extends Seekables<?, ?>>
TxnId maxDep = waitFor.maxTxnId();
return TxnId.nonNullOrMax(maxDep, syncId).epoch();
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SyncPoint<?> syncPoint = (SyncPoint<?>) o;
+ return syncId.equals(syncPoint.syncId) &&
waitFor.equals(syncPoint.waitFor) &&
keysOrRanges.equals(syncPoint.keysOrRanges) &&
homeKey.equals(syncPoint.homeKey);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SyncPoint{" +
+ "syncId=" + syncId +
+ ", waitFor=" + waitFor +
+ ", keysOrRanges=" + keysOrRanges +
+ ", homeKey=" + homeKey +
+ '}';
+ }
}
diff --git a/accord-core/src/test/java/accord/Utils.java
b/accord-core/src/test/java/accord/Utils.java
index 78b12988..680b47d1 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -163,7 +163,7 @@ public class Utils
LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(nodeId,
messageSink,
- LocalRequest::process,
+ LocalRequest::simpleHandler,
new MockConfigurationService(messageSink,
EpochFunction.noop(), topology),
clock,
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index da5851f8..0ac20cb3 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -22,12 +22,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
@@ -61,6 +64,7 @@ import accord.impl.PrefixedIntHashKey;
import accord.impl.SimpleProgressLog;
import accord.impl.SizeOfIntersectionSorter;
import accord.impl.TopologyFactory;
+import accord.impl.list.ListAgent;
import accord.impl.list.ListStore;
import accord.local.AgentExecutor;
import accord.local.Node.Id;
@@ -274,6 +278,84 @@ public class Cluster implements Scheduler
run.run();
}
+ public static Map<MessageType, Cluster.Stats> run(Supplier<RandomSource>
randomSupplier,
+ List<Node.Id> nodes,
+ Topology initialTopology,
+ Function<Map<Node.Id,
Node>, Request> init)
+ {
+ List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
+ PropagatingPendingQueue queue = new PropagatingPendingQueue(failures,
new RandomDelayQueue(randomSupplier.get()));
+ RandomSource retryRandom = randomSupplier.get();
+ Consumer<Runnable> retryBootstrap = retry -> {
+ long delay = retryRandom.nextInt(1, 15);
+ queue.add((PendingRunnable) retry::run, delay, TimeUnit.SECONDS);
+ };
+ Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier =
onStale -> new ListAgent(1000L, failures::add, retryBootstrap, onStale);
+ RandomSource nowRandom = randomSupplier.get();
+ Supplier<LongSupplier> nowSupplier = () -> {
+ RandomSource forked = nowRandom.fork();
+ // TODO (now): meta-randomise scale of clock drift
+ return FrequentLargeRange.builder(forked)
+ .ratio(1, 5)
+ .small(50, 5000, TimeUnit.MICROSECONDS)
+ .large(1, 10, TimeUnit.MILLISECONDS)
+ .build()
+ .mapAsLong(j -> Math.max(0,
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
+ .asLongSupplier(forked);
+ };
+ SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, new ListAgent(1000L, failures::add,
retryBootstrap, (i1, i2) -> {
+ throw new IllegalAccessError("Global executor should enver get a
stale event");
+ }));
+ TopologyFactory topologyFactory = new
TopologyFactory(initialTopology.maxRf(),
initialTopology.ranges().stream().toArray(Range[]::new))
+ {
+ @Override
+ public Topology toTopology(Node.Id[] cluster)
+ {
+ return initialTopology;
+ }
+ };
+ AtomicInteger counter = new AtomicInteger();
+ AtomicReference<Map<Id, Node>> nodeMap = new AtomicReference<>();
+ Map<MessageType, Cluster.Stats> stats =
Cluster.run(nodes.toArray(Node.Id[]::new),
+
MessageListener.get(),
+ () -> queue,
+ (id, onStale) ->
globalExecutor.withAgent(agentSupplier.apply(onStale)),
+
queue::checkFailures,
+ ignore -> {
+ },
+ randomSupplier,
+ nowSupplier,
+ topologyFactory,
+ new Supplier<>()
+ {
+ private
Iterator<Request> requestIterator = null;
+ private final
RandomSource rs = randomSupplier.get();
+ @Override
+ public Packet
get()
+ {
+ //
((Cluster) node.scheduler()).onDone(() -> checkOnResult(homeKey, txnId, 0,
null));
+ if
(requestIterator == null)
+ {
+
Map<Node.Id, Node> nodes = nodeMap.get();
+
requestIterator = Collections.singleton(init.apply(nodes)).iterator();
+ }
+ if
(!requestIterator.hasNext())
+ return
null;
+ Node.Id id
= rs.pick(nodes);
+ return new
Packet(id, id, counter.incrementAndGet(), requestIterator.next());
+ }
+ },
+ Runnable::run,
+ nodeMap::set);
+ if (!failures.isEmpty())
+ {
+ AssertionError error = new AssertionError("Unexpected errors
detected");
+ failures.forEach(error::addSuppressed);
+ throw error;
+ }
+ return stats;
+ }
+
public static Map<MessageType, Stats> run(Id[] nodes, MessageListener
messageListener, Supplier<PendingQueue> queueSupplier,
BiFunction<Id,
BiConsumer<Timestamp, Ranges>, AgentExecutor> nodeExecutorSupplier,
Runnable checkFailures,
Consumer<Packet> responseSink,
diff --git
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 7998b6ba..5ba32268 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -19,14 +19,17 @@
package accord.impl.basic;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import accord.api.Agent;
import accord.api.DataStore;
@@ -54,12 +57,53 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topology;
import accord.utils.Invariants;
+import accord.utils.LazyToString;
import accord.utils.RandomSource;
+import accord.utils.ReflectionUtils;
+import accord.utils.ReflectionUtils.Difference;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
{
+ //TODO (correctness): remove once we have a Journal integration that does
not change the values of a command...
+ // There are known issues due to Journal, so exclude known problamtic
fields
+ private static class ValidationHack
+ {
+ private static Pattern field(String path)
+ {
+ path = path.replace(".", "\\.");
+ path += ".*";
+ return Pattern.compile(path);
+ }
+
+ private static final List<Pattern> KNOWN_ISSUES =
List.of(field(".partialTxn.keys."),
+
field(".partialTxn.read.keys."),
+
field(".partialTxn.read.userReadKeys."),
+
field(".partialTxn.covering."),
+
field(".partialDeps.keyDeps."),
+
field(".partialDeps.rangeDeps."),
+
field(".partialDeps.covering."),
+
field(".additionalKeysOrRanges."),
+ // when a
new epoch is detected and the execute ranges have more than the coordinating
ranges,
+ // and the
coordinating ranges doesn't include the home key... we drop the query...
+ // The logic
to stitch messages together is not able to handle this as it doesn't know the
original Topologies
+
field(".partialTxn.query."),
+ //
cmd.mutable().build() != cmd. This is due to Command.durability changing
NotDurable to Local depending on the status
+
field(".durability."));
+ }
+
+ private static boolean hasKnownIssue(String path)
+ {
+ for (Pattern p : ValidationHack.KNOWN_ISSUES)
+ {
+ Matcher m = p.matcher(path);
+ if (m.find())
+ return true;
+ }
+ return false;
+ }
+
private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore
store, RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService
executorService, BooleanSupplier isLoadedCheck, Journal journal)
{
super(time, agent, store, random, shardDistributor,
progressLogFactory, DelayedCommandStore.factory(executorService, isLoadedCheck,
journal));
@@ -123,14 +167,11 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
}
@Override
- protected void validateRead(Command current)
+ public void validateRead(Command current)
{
// "loading" the command doesn't make sense as we don't "store"
the command...
if (current.txnId().kind() == Txn.Kind.EphemeralRead)
return;
- //TODO (correctness): these type of txn must be durable but
currently they are not... should make sure this is plugged into the C* journal
properly for reply
- if (current.txnId().kind() == Txn.Kind.LocalOnly)
- return;
Command.WaitingOn waitingOn = null;
if (current.isStable() && !current.isTruncated())
waitingOn = current.asCommitted().waitingOn;
@@ -138,10 +179,11 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
Command.WaitingOn finalWaitingOn = waitingOn;
CommonAttributes.Mutable mutable = current.mutable();
mutable.partialDeps(null).removePartialTxn();
+
Command reconstructed;
try
{
- reconstructed =
SerializerSupport.reconstruct(unsafeRangesForEpoch(), mutable,
current.saveStatus(), current.executeAt(),
current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null,
current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn,
messages);
+ reconstructed = SerializerSupport.reconstruct(agent(),
unsafeRangesForEpoch(), mutable, current.saveStatus(), current.executeAt(),
current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null,
current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn,
messages);
}
catch (IllegalStateException t)
{
@@ -152,16 +194,9 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
return;
throw t;
}
- //TODO (correctness): journal doesn't guarantee we pick the same
records we used to state transition
- if (current.partialDeps() != null &&
!current.partialDeps().rangeDeps.equals(reconstructed.partialDeps().rangeDeps))
- return;
- // for some reasons scope doesn't alaways match, this might be due
to journal... what sucks is that this can also be a bug in the extract, so its
- // hard to figure out what happened.
- if (current.partialDeps() != null &&
!current.partialDeps().equals(reconstructed.partialDeps()))
- return;
- if (current.isCommitted() && !current.isTruncated() &&
!Objects.equals(current.asCommitted().waitingOn(),
reconstructed.asCommitted().waitingOn()))
- return;
-// Invariants.checkState(current.equals(reconstructed), "Commands
did not match: expected %s, given %s", current, reconstructed);
+ List<Difference<?>> diff =
ReflectionUtils.recursiveEquals(current, reconstructed);
+ List<String> filteredDiff = diff.stream().filter(d ->
!DelayedCommandStores.hasKnownIssue(d.path)).map(Object::toString).collect(Collectors.toList());
+ Invariants.checkState(filteredDiff.isEmpty(), "Commands did not
match: expected %s, given %s, node %s, store %d, diff %s", current,
reconstructed, time, id(), new LazyToString(() -> String.join("\n",
filteredDiff)));
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java
b/accord-core/src/test/java/accord/impl/basic/Journal.java
index 7c0217b5..4c5f63e5 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -132,18 +133,18 @@ public class Journal implements LocalRequest.Handler,
Runnable
{
this.node = null;
}
-
+
@Override
- public void handle(LocalRequest<?> message, Node node)
+ public <R> void handle(LocalRequest<R> message, BiConsumer<? super R,
Throwable> callback, Node node)
{
messageListener.onMessage(NodeSink.Action.DELIVER, node.id(),
node.id(), -1, message);
if (message.type().hasSideEffects())
{
// enqueue
- unframedRequests.add(new RequestContext(message, () ->
node.scheduler().now(() -> message.process(node))));
+ unframedRequests.add(new RequestContext(message,
message.waitForEpoch(), () -> node.scheduler().now(() -> message.process(node,
callback))));
return;
}
- message.process(node);
+ message.process(node, callback);
}
public void handle(Request request, Node.Id from, ReplyContext
replyContext)
@@ -151,7 +152,7 @@ public class Journal implements LocalRequest.Handler,
Runnable
if (request.type() != null && request.type().hasSideEffects())
{
// enqueue
- unframedRequests.add(new RequestContext(request, () ->
node.receive(request, from, replyContext)));
+ unframedRequests.add(new RequestContext(request,
request.waitForEpoch(), () -> node.receive(request, from, replyContext)));
return;
}
node.receive(request, from, replyContext);
@@ -286,9 +287,9 @@ public class Journal implements LocalRequest.Handler,
Runnable
final Message message;
final Runnable fn;
- protected RequestContext(Request request, Runnable fn)
+ protected RequestContext(Message request, long waitForEpoch, Runnable
fn)
{
- this.waitForEpoch = request.waitForEpoch();
+ this.waitForEpoch = waitForEpoch;
this.message = request;
this.fn = fn;
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java
b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index db2c7d4c..24488101 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -19,6 +19,7 @@
package accord.impl.list;
import java.util.Map;
+import java.util.Objects;
import accord.api.Data;
import accord.api.Key;
@@ -62,4 +63,19 @@ public class ListQuery implements Query
}
return new ListResult(ListResult.Status.Applied, client, requestId,
txnId, read.userReadKeys, responseKeys, values, (ListUpdate) update);
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ListQuery listQuery = (ListQuery) o;
+ return requestId == listQuery.requestId && Objects.equals(client,
listQuery.client);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index f3b80c53..de8c78f0 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -125,7 +125,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(id,
messageSink,
- LocalRequest::process,
+ LocalRequest::simpleHandler,
configurationService,
nowSupplier,
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
diff --git a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
new file mode 100644
index 00000000..ab38eb7f
--- /dev/null
+++ b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Test;
+
+import accord.impl.PrefixedIntHashKey;
+import accord.impl.basic.Cluster;
+import accord.impl.basic.DelayedCommandStores.DelayedCommandStore;
+import accord.messages.MessageType;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.SyncPoint;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topology;
+import accord.topology.TopologyUtils;
+import accord.utils.AccordGens;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import org.assertj.core.api.Assertions;
+
+import static accord.local.PreLoadContext.contextFor;
+import static accord.utils.Property.qt;
+
+class BootstrapLocalTxnTest
+{
+ private static final Gen<Gen<Cleanup>> CLEANUP_DISTRIBUTION =
Gens.enums().allMixedDistribution(Cleanup.class);
+
+ @Test
+ public void localOnlyTxnLifeCycle()
+ {
+ Ranges ranges =
Ranges.ofSortedAndDeoverlapped(PrefixedIntHashKey.ranges(0, 1));
+ List<Node.Id> nodes = Collections.singletonList(new Node.Id(42));
+ Topology t = TopologyUtils.topology(1, nodes, ranges, 2);
+ qt().check(rs -> Cluster.run(rs::fork, nodes, t, nodeMap -> new
Request()
+ {
+ @Override
+ public void preProcess(Node on, Node.Id from, ReplyContext
replyContext)
+ {
+ // no-op
+ }
+
+ @Override
+ public void process(Node on, Node.Id from, ReplyContext
replyContext)
+ {
+ Gen<Cleanup> cleanupGen = CLEANUP_DISTRIBUTION.next(rs);
+ for (int storeId : on.commandStores().ids())
+ {
+ DelayedCommandStore store = (DelayedCommandStore)
on.commandStores().forId(storeId);
+ // this is a bit redudent but here to make the test easier
to maintain. Pre/Post execute we validate each command to make sure everything
is fine
+ // but that logic could be changed and this test has a
dependency on validation the command... so to make this dependency explicit
+ // the test will call the validation logic within the test
even though it will be called again in the background...
+ Consumer<Command> validate = store::validateRead;
+ TxnId globalSyncId =
on.nextTxnId(Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range);
+ TxnId localSyncId = globalSyncId.as(Txn.Kind.LocalOnly);
+ TxnId nextGlobalSyncId =
on.nextTxnId(Txn.Kind.ExclusiveSyncPoint,
Routable.Domain.Range).withEpoch(globalSyncId.epoch() + 1);
+ Ranges ranges =
AccordGens.rangesInsideRanges(store.updateRangesForEpoch().currentRanges(),
(rs2, r) -> rs2.nextInt(1, 4)).next(rs);
+
+ FullRoute<?> route = on.computeRoute(globalSyncId, ranges);
+ SyncPoint<Ranges> syncPoint = new
SyncPoint<>(globalSyncId, Deps.NONE, ranges, route);
+ Ranges valid = AccordGens.rangesInsideRanges(ranges, (rs2,
r) -> rs2.nextInt(1, 4)).next(rs);
+
Invariants.checkArgument(syncPoint.keysOrRanges.containsAll(valid));
+ store.execute(contextFor(localSyncId,
syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safe ->
Commands.createBootstrapCompleteMarkerTransaction(safe, localSyncId, syncPoint,
valid))
+ .flatMap(ignore ->
store.execute(contextFor(localSyncId), safe ->
validate.accept(safe.get(localSyncId, route.homeKey()).current())))
+ .flatMap(ignore ->
store.execute(contextFor(localSyncId), safe ->
Commands.markBootstrapComplete(safe, localSyncId, ranges)))
+ .flatMap(ignore ->
store.execute(contextFor(localSyncId), safe ->
validate.accept(safe.get(localSyncId, route.homeKey()).current())))
+ // cleanup txn
+ .flatMap(ignore ->
store.submit(PreLoadContext.empty(), safe -> {
+ Cleanup target = cleanupGen.next(rs);
+ if (target == Cleanup.NO)
+ return Cleanup.NO;
+
safe.commandStore().setRedundantBefore(RedundantBefore.create(ranges,
Long.MIN_VALUE, Long.MAX_VALUE, nextGlobalSyncId, nextGlobalSyncId,
TxnId.NONE));
+ switch (target)
+ {
+ case ERASE:
+
safe.commandStore().setDurableBefore(DurableBefore.create(ranges,
nextGlobalSyncId, nextGlobalSyncId));
+ break;
+ case TRUNCATE:
+
safe.commandStore().setDurableBefore(DurableBefore.create(ranges,
nextGlobalSyncId, globalSyncId));
+ break;
+ case TRUNCATE_WITH_OUTCOME:
+ // no update to DurableBefore =
TRUNCATE_WITH_OUTCOME
+ break;
+ default:
+ throw new
UnsupportedOperationException(target.name());
+ }
+ return target;
+ }))
+ // validate cmd
+ .flatMap(target ->
store.execute(contextFor(localSyncId), safe -> {
+ SafeCommand cmd = safe.get(localSyncId,
route.homeKey());
+ Command current = cmd.current();
+ validate.accept(current);
+
Assertions.assertThat(current.saveStatus()).isEqualTo(target == Cleanup.NO ?
SaveStatus.Applied : target.appliesIfNot);
+ }))
+ .begin(on.agent());
+ }
+ }
+
+ @Override
+ public MessageType type()
+ {
+ return null;
+ }
+ }));
+ }
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java
b/accord-core/src/test/java/accord/local/CommandsTest.java
index 6731a742..d8a00731 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -21,25 +21,15 @@ package accord.local;
import accord.api.Key;
import accord.api.RoutingKey;
import accord.api.TestableConfigurationService;
-import accord.burn.random.FrequentLargeRange;
import accord.coordinate.Preempted;
import accord.coordinate.Timeout;
import accord.coordinate.TopologyMismatch;
import accord.impl.InMemoryCommandStore;
-import accord.impl.MessageListener;
import accord.impl.PrefixedIntHashKey;
-import accord.impl.TopologyFactory;
import accord.impl.basic.Cluster;
-import accord.impl.basic.Packet;
-import accord.impl.basic.PendingRunnable;
-import accord.impl.basic.PropagatingPendingQueue;
-import accord.impl.basic.RandomDelayQueue;
-import accord.impl.basic.SimulatedDelayedExecutorService;
-import accord.impl.list.ListAgent;
import accord.messages.MessageType;
import accord.messages.ReplyContext;
import accord.messages.Request;
-import accord.primitives.Timestamp;
import accord.topology.TopologyUtils;
import accord.primitives.Keys;
import accord.primitives.Range;
@@ -51,24 +41,13 @@ import accord.topology.Topology;
import accord.utils.AccordGens;
import accord.utils.Gen;
import accord.utils.Gens;
-import accord.utils.RandomSource;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -99,8 +78,12 @@ class CommandsTest
Topology initialTopology = TopologyUtils.topology(1, nodes,
Ranges.of(allRanges), rf);
Topology updatedTopology = TopologyUtils.topology(2, nodes,
Ranges.of(prefix0), rf); // drop prefix1
- cluster(rs::fork, nodes, initialTopology, nodeMap -> new Request()
+ Cluster.run(rs::fork, nodes, initialTopology, nodeMap -> new
Request()
{
+ {
+ // the node selected does not matter and should not impact
determinism, they all share the same scheduler
+ ((Cluster)
nodeMap.values().stream().findFirst().get().scheduler()).onDone(() ->
checkState(0, nodeMap.values()));
+ }
@Override
public void preProcess(Node on, Node.Id from, ReplyContext
replyContext)
{
@@ -149,82 +132,6 @@ class CommandsTest
});
}
- static void cluster(Supplier<RandomSource> randomSupplier, List<Node.Id>
nodes, Topology initialTopology, Function<Map<Node.Id, Node>, Request> init)
- {
- List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
- PropagatingPendingQueue queue = new PropagatingPendingQueue(failures,
new RandomDelayQueue(randomSupplier.get()));
- RandomSource retryRandom = randomSupplier.get();
- Consumer<Runnable> retryBootstrap = retry -> {
- long delay = retryRandom.nextInt(1, 15);
- queue.add((PendingRunnable) retry::run, delay, TimeUnit.SECONDS);
- };
- Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier =
onStale -> new ListAgent(1000L, failures::add, retryBootstrap, onStale);
- RandomSource nowRandom = randomSupplier.get();
- Supplier<LongSupplier> nowSupplier = () -> {
- RandomSource forked = nowRandom.fork();
- // TODO (now): meta-randomise scale of clock drift
- return FrequentLargeRange.builder(forked)
- .ratio(1, 5)
- .small(50, 5000, TimeUnit.MICROSECONDS)
- .large(1, 10, TimeUnit.MILLISECONDS)
- .build()
- .mapAsLong(j -> Math.max(0,
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
- .asLongSupplier(forked);
- };
- SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, new ListAgent(1000L, failures::add,
retryBootstrap, (i1, i2) -> {
- throw new IllegalAccessError("Global executor should enver get a
stale event");
- }));
- TopologyFactory topologyFactory = new
TopologyFactory(initialTopology.maxRf(),
initialTopology.ranges().stream().toArray(Range[]::new))
- {
- @Override
- public Topology toTopology(Node.Id[] cluster)
- {
- return initialTopology;
- }
- };
- AtomicInteger counter = new AtomicInteger();
- AtomicReference<Map<Node.Id, Node>> nodeMap = new AtomicReference<>();
- Cluster.run(nodes.toArray(Node.Id[]::new),
- MessageListener.get(),
- () -> queue,
- (id, onStale) ->
globalExecutor.withAgent(agentSupplier.apply(onStale)),
- queue::checkFailures,
- ignore -> {
- },
- randomSupplier,
- nowSupplier,
- topologyFactory,
- new Supplier<>()
- {
- private Iterator<Request> requestIterator = null;
- private final RandomSource rs = randomSupplier.get();
- @Override
- public Packet get()
- {
- // ((Cluster) node.scheduler()).onDone(() ->
checkOnResult(homeKey, txnId, 0, null));
- if (requestIterator == null)
- {
- Map<Node.Id, Node> nodes = nodeMap.get();
- requestIterator =
Collections.singleton(init.apply(nodes)).iterator();
- // the node selected does not matter and
should not impact determinism, they all share the same scheduler
- ((Cluster)
nodes.values().stream().findFirst().get().scheduler()).onDone(() ->
checkState(0, nodes.values()));
- }
- if (!requestIterator.hasNext())
- return null;
- Node.Id id = rs.pick(nodes);
- return new Packet(id, id,
counter.incrementAndGet(), requestIterator.next());
- }
- },
- Runnable::run,
- nodeMap::set);
- if (!failures.isEmpty())
- {
- AssertionError error = new AssertionError("Unexpected errors
detected");
- failures.forEach(error::addSuppressed);
- throw error;
- }
- }
-
private static void checkState(int attempt, Collection<Node> values)
{
List<String> faults = new ArrayList<>();
diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java
b/accord-core/src/test/java/accord/utils/AccordGens.java
index 3247ebb1..ba326c3a 100644
--- a/accord-core/src/test/java/accord/utils/AccordGens.java
+++ b/accord-core/src/test/java/accord/utils/AccordGens.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
+import java.util.function.ToIntBiFunction;
import javax.annotation.Nullable;
@@ -392,6 +393,28 @@ public class AccordGens
throw new IllegalArgumentException("Unsupported type: " +
range.start().getClass());
}
+ public static Gen<Ranges> rangesInsideRanges(Ranges ranges,
ToIntBiFunction<RandomSource, Range> numSplits)
+ {
+ if (ranges.isEmpty())
+ return ignore -> ranges;
+ List<Gen<Range>> subsets = new ArrayList<>(ranges.size());
+ ranges.forEach(r -> subsets.add(rangeInsideRange(r)));
+ return rs -> {
+ List<Range> result = new ArrayList<>(subsets.size());
+ for (int i = 0; i < subsets.size(); i++)
+ {
+ Range range = ranges.get(i);
+ int splits = numSplits.applyAsInt(rs, range);
+ if (splits < 0) throw new IllegalArgumentException("numSplits
is less than 0: given " + splits);
+ if (splits == 0) continue;
+ Gen<Range> gen = subsets.get(i);
+ for (int s = 0; s < splits; s++)
+ result.add(gen.next(rs));
+ }
+ return Ranges.of(result.toArray(Range[]::new));
+ };
+ }
+
public static Gen<Range> prefixedIntHashKeyRangeInsideRange(Range range)
{
if (!(range.end() instanceof PrefixedIntHashKey))
diff --git a/accord-core/src/main/java/accord/messages/LocalRequest.java
b/accord-core/src/test/java/accord/utils/LazyToString.java
similarity index 58%
copy from accord-core/src/main/java/accord/messages/LocalRequest.java
copy to accord-core/src/test/java/accord/utils/LazyToString.java
index d12e35a9..3496184a 100644
--- a/accord-core/src/main/java/accord/messages/LocalRequest.java
+++ b/accord-core/src/test/java/accord/utils/LazyToString.java
@@ -15,28 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package accord.messages;
-import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommandStore;
-import accord.utils.MapReduceConsume;
+package accord.utils;
-import java.util.function.BiConsumer;
+import java.util.function.Supplier;
-public interface LocalRequest<R> extends Request, PreLoadContext,
MapReduceConsume<SafeCommandStore, Void>
+public class LazyToString
{
- /**
- * Process the request without executing the callback
- */
- void process(Node on);
+ private final Supplier<String> toString;
- void process(Node on, BiConsumer<R, Throwable> callback);
-
- BiConsumer<R, Throwable> callback();
+ public LazyToString(Supplier<String> toString)
+ {
+ this.toString = toString;
+ }
- interface Handler
+ @Override
+ public String toString()
{
- void handle(LocalRequest<?> message, Node node);
+ return this.toString.get();
}
}
diff --git a/accord-core/src/test/java/accord/utils/ReflectionUtils.java
b/accord-core/src/test/java/accord/utils/ReflectionUtils.java
new file mode 100644
index 00000000..701c911e
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/ReflectionUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableSet;
+
+import accord.local.Node;
+import accord.primitives.Timestamp;
+
+public class ReflectionUtils
+{
+ /**
+ * Types that should not walk the fields in the type.
+ */
+ private static final Set<Class<?>> NO_RECURSIVE =
ImmutableSet.of(String.class,
+ //
numbers store primitives but reflection converts to a boxed value; this will
cause a recursive check over the same thing over and over again...
+
Byte.class,
+
Character.class,
+
Short.class,
+
Integer.class,
+
Long.class,
+
Float.class,
+
Double.class,
+ //
accord primitives
+
Timestamp.class,
+
Node.Id.class);
+
+ public static String toString(Object o)
+ {
+ if (o == null) return "null";
+ if (!o.getClass().isArray())
+ return o.toString();
+ if (o instanceof Object[])
+ return Stream.of((Object[])
o).map(ReflectionUtils::toString).collect(Collectors.toList()).toString();
+ if (o instanceof byte[])
+ return Arrays.toString((byte[]) o);
+ if (o instanceof short[])
+ return Arrays.toString((short[]) o);
+ if (o instanceof int[])
+ return Arrays.toString((int[]) o);
+ if (o instanceof long[])
+ return Arrays.toString((long[]) o);
+ if (o instanceof float[])
+ return Arrays.toString((float[]) o);
+ if (o instanceof double[])
+ return Arrays.toString((double[]) o);
+ if (o instanceof char[])
+ return Arrays.toString((char[]) o);
+ if (o instanceof boolean[])
+ return Arrays.toString((boolean[]) o);
+ throw new UnsupportedOperationException("Unknown array type: " +
o.getClass());
+ }
+
+ public static List<Difference<?>> recursiveEquals(Object lhs, Object rhs)
+ {
+ if (Objects.equals(lhs, rhs)) return Collections.emptyList();
+ // there is a difference... find it...
+ List<Difference<?>> accum = new ArrayList<>();
+ DeterministicIdentitySet<Object> seenLhs = new
DeterministicIdentitySet<>();
+ DeterministicIdentitySet<Object> seenRhs = new
DeterministicIdentitySet<>();
+ recursiveEquals(".", seenLhs, lhs, seenRhs, rhs, accum);
+ return accum;
+ }
+
+ private static void recursiveEquals(String path,
DeterministicIdentitySet<Object> seenLhs, Object lhs,
DeterministicIdentitySet<Object> seenRhs, Object rhs, List<Difference<?>> accum)
+ {
+ if (Objects.equals(lhs, rhs)) return;
+ if (lhs == null || rhs == null)
+ {
+ // one side is null, so doesn't make sense to look further
+ accum.add(new Difference<>(path, lhs, rhs));
+ return;
+ }
+ if (!(seenLhs.add(lhs) && seenRhs.add(rhs)))
+ {
+ // seen pointer before, unsafe to keep walking
+ accum.add(new Difference<>(path, lhs, rhs));
+ return;
+ }
+ List<Field> fields = getFields(lhs.getClass());
+ if (fields.isEmpty())
+ {
+ accum.add(new Difference<>(path, lhs, rhs));
+ return;
+ }
+ int startSize = accum.size();
+ for (Field f : fields)
+ {
+ f.setAccessible(true);
+ try
+ {
+ recursiveEquals(path + f.getName() + '.', seenLhs, f.get(lhs),
seenRhs, f.get(rhs), accum);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ // no field difference found, but the type defined equals as
different, so add this level
+ if (startSize == accum.size())
+ accum.add(new Difference<>(path, lhs, rhs));
+ }
+ private static boolean checkFields(Class<?> klass)
+ {
+ return !(klass.isEnum() || NO_RECURSIVE.contains(klass));
+ }
+
+ private static List<Field> getFields(Class<?> aClass)
+ {
+ if (!checkFields(aClass))
+ return Collections.emptyList();
+ List<Field> fields = new ArrayList<>();
+ for (Class<?> klass = aClass; klass != null; klass =
klass.getSuperclass())
+ {
+ Field[] fs = klass.getDeclaredFields();
+ if (fs != null && fs.length > 0)
+ {
+ for (Field f : fs)
+ {
+ if (Modifier.isStatic(f.getModifiers()) ||
Modifier.isTransient(f.getModifiers())) continue;
+ fields.add(f);
+ }
+ }
+ }
+ return fields;
+ }
+
+ public static class Difference<T>
+ {
+ public final String path;
+ public final T lhs, rhs;
+
+ public Difference(String path, T lhs, T rhs)
+ {
+ this.path = path;
+ this.lhs = lhs;
+ this.rhs = rhs;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Diff{" +
+ "path='" + path + '\'' +
+ ", lhs=" + ReflectionUtils.toString(lhs) +
+ ", rhs=" + ReflectionUtils.toString(rhs) +
+ '}';
+ }
+ }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 1d9abff9..c4c34d78 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -321,7 +321,7 @@ public class Cluster implements Scheduler
MessageSink messageSink = sinks.create(node,
randomSupplier.get());
LongSupplier nowSupplier = nowSupplierSupplier.get();
LocalConfig localConfig = new MutableLocalConfig();
- lookup.put(node, new Node(node, messageSink,
LocalRequest::process, new SimpleConfigService(topology),
+ lookup.put(node, new Node(node, messageSink,
LocalRequest::simpleHandler, new SimpleConfigService(topology),
nowSupplier,
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier),
MaelstromStore::new, new
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE,
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index a7ac2ced..16a003a8 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -177,7 +177,7 @@ public class Main
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start,
init.self, out, err);
LocalConfig localConfig = new MutableLocalConfig();
- on = new Node(init.self, sink, LocalRequest::process, new
SimpleConfigService(topology),
+ on = new Node(init.self, sink, LocalRequest::simpleHandler, new
SimpleConfigService(topology),
System::currentTimeMillis,
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
MaelstromStore::new, new
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(),
scheduler, SizeOfIntersectionSorter.SUPPLIER,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]