This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 256b35e2 Need to simulate Cassandra Journal in Accord BurnTest to
detect issues earlier before they are seen in Cassandra (#87)
256b35e2 is described below
commit 256b35e27d170db9fcd8024d5678b4f6e9d3a956
Author: dcapwell <[email protected]>
AuthorDate: Wed May 15 09:16:01 2024 -0700
Need to simulate Cassandra Journal in Accord BurnTest to detect issues
earlier before they are seen in Cassandra (#87)
patch by Benedict Elliott Smith, David Capwell; reviewed by Benedict
Elliott Smith, David Capwell for CASSANDRA-19618
---
.../coordinate/AbstractCoordinatePreAccept.java | 8 +-
.../accord/coordinate/CoordinatePreAccept.java | 2 +-
.../accord/coordinate/CoordinationAdapter.java | 5 +
.../java/accord/impl/InMemoryCommandStore.java | 26 +-
.../main/java/accord/impl/InMemorySafeCommand.java | 28 ++
.../src/main/java/accord/local/Bootstrap.java | 5 +-
.../src/main/java/accord/local/Command.java | 32 +-
.../src/main/java/accord/local/CommandStore.java | 6 +
.../src/main/java/accord/local/CommandsForKey.java | 5 +-
.../main/java/accord/local/CommonAttributes.java | 6 +
.../main/java/accord/local/SafeCommandStore.java | 2 +-
.../main/java/accord/local/SerializerSupport.java | 166 ++++++--
.../src/main/java/accord/messages/Propagate.java | 1 +
.../src/main/java/accord/primitives/Routables.java | 6 +
.../src/main/java/accord/utils/Invariants.java | 5 +
.../src/test/java/accord/burn/BurnTest.java | 50 ++-
.../src/test/java/accord/impl/MessageListener.java | 19 +
.../src/test/java/accord/impl/basic/Cluster.java | 146 ++++++-
.../accord/impl/basic/DelayedCommandStores.java | 107 ++++-
.../src/test/java/accord/impl/basic/Journal.java | 436 +++++++++++++++++++++
.../src/test/java/accord/impl/list/ListRead.java | 18 +
.../src/test/java/accord/impl/list/ListResult.java | 35 ++
.../src/test/java/accord/impl/list/ListStore.java | 11 +
.../src/test/java/accord/impl/list/ListWrite.java | 27 ++
.../src/test/java/accord/utils/AccordGens.java | 44 +++
accord-core/src/test/java/accord/utils/Gens.java | 20 +-
26 files changed, 1138 insertions(+), 78 deletions(-)
diff --git
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
index 39083dbd..b5c5b49d 100644
---
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+++
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
@@ -96,6 +96,7 @@ abstract class AbstractCoordinatePreAccept<T, R> extends
SettableResult<T> imple
}
final Node node;
+ @Nullable
final TxnId txnId;
final FullRoute<?> route;
@@ -219,7 +220,7 @@ abstract class AbstractCoordinatePreAccept<T, R> extends
SettableResult<T> imple
onNewEpochTopologyMismatch(mismatch);
return;
}
- topologies = node.topology().withUnsyncedEpochs(route,
txnId.epoch(), latestEpoch);
+ topologies = node.topology().withUnsyncedEpochs(route,
earliestEpoch(), latestEpoch);
boolean equivalent = topologies.oldestEpoch() <=
prevTopologies.currentEpoch();
for (long epoch = topologies.currentEpoch() ; equivalent && epoch
> prevTopologies.currentEpoch() ; --epoch)
equivalent =
topologies.forEpoch(epoch).shards().equals(prevTopologies.current().shards());
@@ -236,6 +237,11 @@ abstract class AbstractCoordinatePreAccept<T, R> extends
SettableResult<T> imple
});
}
+ protected long earliestEpoch()
+ {
+ return txnId == null ? executeAtEpoch() : txnId.epoch();
+ }
+
@Override
public final void accept(T success, Throwable failure)
{
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
index 5f25e41a..1c19fc60 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
@@ -152,7 +152,7 @@ abstract class CoordinatePreAccept<T> extends
AbstractCoordinatePreAccept<T, Pre
void onPreAccepted(Topologies topologies)
{
Timestamp executeAt = foldl(oks, (ok, prev) ->
mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
- onPreAccepted(topologies, executeAt, oks);
+ node.withEpoch(executeAt.epoch(), () -> onPreAccepted(topologies,
executeAt, oks));
}
abstract void onPreAccepted(Topologies topologies, Timestamp executeAt,
List<PreAcceptOk> oks);
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index 0b08dd2d..dc8de331 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -91,6 +91,11 @@ public interface CoordinationAdapter<R>
public static <R> void stabilise(CoordinationAdapter<R> adapter, Node
node, Topologies any, FullRoute<?> route, Ballot ballot, TxnId txnId, Txn txn,
Timestamp executeAt, Deps deps, BiConsumer<? super R, Throwable> callback)
{
+ if (!node.topology().hasEpoch(executeAt.epoch()))
+ {
+ node.withEpoch(executeAt.epoch(), () -> stabilise(adapter,
node, any, route, ballot, txnId, txn, executeAt, deps, callback));
+ return;
+ }
Topologies coordinates = any.forEpochs(txnId.epoch(),
txnId.epoch());
Topologies all;
if (txnId.epoch() == executeAt.epoch()) all = coordinates;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index e94bac11..5ee3f8d5 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -388,6 +388,8 @@ public abstract class InMemoryCommandStore extends
CommandStore
timestampsForKey.put(key, timestampsForKey((Key)
key).createSafeReference());
}
+ protected void validateRead(Command current) {}
+
protected final InMemorySafeStore createSafeStore(PreLoadContext context,
RangesForEpoch ranges)
{
Map<TxnId, InMemorySafeCommand> commands = new HashMap<>();
@@ -395,6 +397,14 @@ public abstract class InMemoryCommandStore extends
CommandStore
Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey = new
HashMap<>();
context.forEachId(txnId -> commands.put(txnId, lazyReference(txnId)));
+ for (InMemorySafeCommand safe : commands.values())
+ {
+ GlobalCommand global = safe.unsafeGlobal();
+ if (global == null) continue;
+ Command current = global.value();
+ if (current == null) continue;
+ validateRead(current);
+ }
for (Seekable seekable : context.keys())
{
@@ -402,8 +412,18 @@ public abstract class InMemoryCommandStore extends
CommandStore
{
case Key:
RoutableKey key = (RoutableKey) seekable;
- commandsForKey.put(key, commandsForKey((Key)
key).createSafeReference());
- timestampsForKey.put(key, timestampsForKey((Key)
key).createSafeReference());
+ switch (context.keyHistory())
+ {
+ case NONE:
+ continue;
+ case COMMANDS:
+ commandsForKey.put(key, commandsForKey((Key)
key).createSafeReference());
+ break;
+ case TIMESTAMPS:
+ timestampsForKey.put(key, timestampsForKey((Key)
key).createSafeReference());
+ break;
+ default: throw new
UnsupportedOperationException("Unknown key history: " + context.keyHistory());
+ }
break;
case Range:
// load range cfks here
@@ -633,7 +653,7 @@ public abstract class InMemoryCommandStore extends
CommandStore
public static class InMemorySafeStore extends
AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeTimestampsForKey,
InMemorySafeCommandsForKey>
{
private final InMemoryCommandStore commandStore;
- private final Map<TxnId, InMemorySafeCommand> commands;
+ protected final Map<TxnId, InMemorySafeCommand> commands;
private final Map<RoutableKey, InMemorySafeTimestampsForKey>
timestampsForKey;
private final Map<RoutableKey, InMemorySafeCommandsForKey>
commandsForKey;
private final RangesForEpoch ranges;
diff --git a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
index 267d5692..946f255a 100644
--- a/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
+++ b/accord-core/src/main/java/accord/impl/InMemorySafeCommand.java
@@ -18,8 +18,11 @@
package accord.impl;
+import java.util.Objects;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
+
import accord.impl.InMemoryCommandStore.GlobalCommand;
import accord.local.Command;
import accord.local.Listeners;
@@ -30,9 +33,11 @@ import static accord.utils.Invariants.illegalState;
public class InMemorySafeCommand extends SafeCommand implements
SafeState<Command>
{
+ private static final Object INIT = new Object();
private static final Supplier<GlobalCommand> INVALIDATED = () -> null;
private Supplier<GlobalCommand> lazy;
+ private Object original = INIT;
private GlobalCommand global;
public InMemorySafeCommand(TxnId txnId, GlobalCommand global)
@@ -54,10 +59,26 @@ public class InMemorySafeCommand extends SafeCommand
implements SafeState<Comman
return global.value();
}
+ public boolean isModified()
+ {
+ return original != INIT && !Objects.equals(original, global.value());
+ }
+
+ @Nullable
+ public Command original()
+ {
+ touch();
+ if (!isModified())
+ return global.value();
+ return (Command) original;
+ }
+
@Override
protected void set(Command update)
{
touch();
+ if (original == INIT)
+ original = global.value();
global.value(update);
}
@@ -83,6 +104,7 @@ public class InMemorySafeCommand extends SafeCommand
implements SafeState<Comman
public void invalidate()
{
lazy = INVALIDATED;
+ original = INIT;
}
@Override
@@ -107,4 +129,10 @@ public class InMemorySafeCommand extends SafeCommand
implements SafeState<Comman
touch();
return global;
}
+
+ @Nullable
+ GlobalCommand unsafeGlobal()
+ {
+ return global;
+ }
}
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java
b/accord-core/src/main/java/accord/local/Bootstrap.java
index d268812f..e478eb21 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -261,7 +261,10 @@ class Bootstrap
else
{
// TODO (expected): first check to see if we are still relevant
- node.agent().onFailedBootstrap("SafeToRead", state.ranges, ()
-> started(state, null), failure);
+ CommandStore store = CommandStore.current();
+ node.agent().onFailedBootstrap("SafeToRead", state.ranges, ()
-> {
+ store.maybeExecuteImmediately(() -> started(state, null));
+ }, failure);
}
}
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index f67c9663..6b9d71af 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -370,10 +370,10 @@ public abstract class Command implements CommonAttributes
case DefinitionErased:
case DefinitionUnknown:
case NoOp:
- Invariants.checkState(partialTxn == null);
+ Invariants.checkState(partialTxn == null, "partialTxn
is defined");
break;
case DefinitionKnown:
- Invariants.checkState(partialTxn != null);
+ Invariants.checkState(partialTxn != null, "partialTxn
is null");
break;
}
}
@@ -418,8 +418,8 @@ public abstract class Command implements CommonAttributes
{
default: throw new AssertionError("Unhandled Outcome: " +
known.outcome);
case Apply:
- Invariants.checkState(writes != null);
- Invariants.checkState(result != null);
+ Invariants.checkState(writes != null, "Writes is
null");
+ Invariants.checkState(result != null, "Result is
null");
break;
case Invalidated:
Invariants.checkState(validate.durability().isMaybeInvalidated());
@@ -427,8 +427,8 @@ public abstract class Command implements CommonAttributes
Invariants.checkState(validate.durability() != Local);
case Erased:
case WasApply:
- Invariants.checkState(writes == null);
- Invariants.checkState(result == null);
+ Invariants.checkState(writes == null, "Writes exist");
+ Invariants.checkState(result == null, "Results exist");
break;
}
}
@@ -840,15 +840,18 @@ public abstract class Command implements CommonAttributes
public static Truncated truncatedApply(CommonAttributes common,
SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result)
{
- // TODO (now) !!! uncomment and fix
-//
Invariants.checkArgument(!common.txnId().kind().awaitsOnlyDeps());
+ Invariants.checkArgument(!common.txnId().kind().awaitsOnlyDeps());
Durability durability = checkTruncatedApplyInvariants(common,
saveStatus, executeAt);
return validate(new Truncated(common.txnId(), saveStatus,
durability, common.route(), executeAt, EMPTY, writes, result));
}
- public static Truncated truncatedApply(CommonAttributes common,
SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result,
Timestamp dependencyExecutesAt)
+ public static Truncated truncatedApply(CommonAttributes common,
SaveStatus saveStatus, Timestamp executeAt, Writes writes, Result result,
@Nullable Timestamp dependencyExecutesAt)
{
- Invariants.checkArgument(common.txnId().kind().awaitsOnlyDeps());
+ if (!common.txnId().kind().awaitsOnlyDeps())
+ {
+ Invariants.checkState(dependencyExecutesAt == null);
+ return truncatedApply(common, saveStatus, executeAt, writes,
result);
+ }
Durability durability = checkTruncatedApplyInvariants(common,
saveStatus, executeAt);
return validate(new TruncatedAwaitsOnlyDeps(common.txnId(),
saveStatus, durability, common.route(), executeAt, EMPTY, writes, result,
dependencyExecutesAt));
}
@@ -926,6 +929,10 @@ public abstract class Command implements CommonAttributes
public static class TruncatedAwaitsOnlyDeps extends Truncated
{
+ /**
+ * TODO (desired): Ideally we would not store this differently than we
do for earlier states (where we encode in WaitingOn), but we also
+ * don't want to waste the space and complexity budget in earlier
phases. Consider how to improve.
+ */
@Nullable final Timestamp executesAtLeast;
public TruncatedAwaitsOnlyDeps(CommonAttributes commonAttributes,
SaveStatus saveStatus, @Nullable Timestamp executeAt, @Nullable Writes writes,
@Nullable Result result, @Nullable Timestamp executesAtLeast)
@@ -963,7 +970,7 @@ public abstract class Command implements CommonAttributes
public static class PreAccepted extends AbstractCommand
{
- private final Timestamp executeAt;
+ private final @Nullable Timestamp executeAt;
private final PartialTxn partialTxn;
private final @Nullable PartialDeps partialDeps;
@@ -993,7 +1000,7 @@ public abstract class Command implements CommonAttributes
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
PreAccepted that = (PreAccepted) o;
- return executeAt.equals(that.executeAt)
+ return Objects.equals(executeAt, that.executeAt)
&& Objects.equals(partialTxn, that.partialTxn)
&& Objects.equals(partialDeps, that.partialDeps);
}
@@ -1692,6 +1699,7 @@ public abstract class Command implements CommonAttributes
static Command.Accepted acceptInvalidated(Command command, Ballot ballot)
{
SaveStatus saveStatus = SaveStatus.get(Status.AcceptedInvalidate,
command.known());
+ // TODO (desired): This should be NonNull, but AcceptedInvalidated is
represented by Command.Accepted because there’s no acceptedOrCommitted register
in NotDefined
return validate(new Command.Accepted(command, saveStatus, ballot,
command.executeAt(), command.partialTxn(), null, ballot));
}
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index c79a2871..8e2f18eb 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -219,6 +219,12 @@ public abstract class CommandStore implements AgentExecutor
public abstract boolean inStore();
+ public void maybeExecuteImmediately(Runnable task)
+ {
+ if (inStore()) task.run();
+ else execute(task);
+ }
+
public abstract AsyncChain<Void> execute(PreLoadContext context,
Consumer<? super SafeCommandStore> consumer);
public abstract <T> AsyncChain<T> submit(PreLoadContext context,
Function<? super SafeCommandStore, T> apply);
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java
b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 9e628b92..cba170c9 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -662,12 +662,13 @@ public class CommandsForKey implements CommandsSummary
{
if (newStatus.compareTo(cur.status) <= 0)
{
+ // TODO (required): this validation is not safe for replay
where we may have to "catch up" commands that are behind CFK
// we can redundantly update the same transaction via
notifyWaitingOnCommit since updates to CFK may be asynchronous
// (particularly for invalidations). So we should expect
that we might already represent the latest information for this transaction.
// TODO (desired): consider only accepting this for
Invalidation
// TODO (desired): also clean-up special casing for
AcceptedInvalidate, which exists because it currently has no effect on the CFK
state
// so it could be any of Transitively Known, Historic,
PreAccept or Accept
- Invariants.checkState(cur.status == newStatus ||
next.status() == Status.AcceptedInvalidate);
+ Invariants.checkState(cur.status == newStatus ||
next.status() == Status.AcceptedInvalidate, "Attempted update to CommandsForKey
with %s, implying stale status; found %s", next, cur);
if (!newStatus.hasInfo ||
next.acceptedOrCommitted().equals(prev.acceptedOrCommitted()))
return this;
}
@@ -1220,7 +1221,7 @@ public class CommandsForKey implements CommandsSummary
Key key = this.key;
Keys keys = Keys.of(key);
safeStore = safeStore; // make unsafe for compiler to permit in
lambda
- safeStore.commandStore().execute(PreLoadContext.contextFor(txnId,
keys), safeStore0 -> {
+ safeStore.commandStore().execute(PreLoadContext.contextFor(txnId,
keys, KeyHistory.COMMANDS), safeStore0 -> {
SafeCommand safeCommand0 = safeStore0.get(txnId);
safeCommand0.initialise();
Command command = safeCommand0.current();
diff --git a/accord-core/src/main/java/accord/local/CommonAttributes.java
b/accord-core/src/main/java/accord/local/CommonAttributes.java
index 9c180a9c..46c38588 100644
--- a/accord-core/src/main/java/accord/local/CommonAttributes.java
+++ b/accord-core/src/main/java/accord/local/CommonAttributes.java
@@ -136,6 +136,12 @@ public interface CommonAttributes
return this;
}
+ public Mutable removePartialTxn()
+ {
+ this.partialTxn = null;
+ return this;
+ }
+
@Override
public PartialDeps partialDeps()
{
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 8fdf127f..5c8e2834 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -246,7 +246,7 @@ public abstract class SafeCommandStore
keys = updated.asCommitted().waitingOn.keys;
// TODO (required): consider how execution works for transactions
that await future deps and where the command store inherits additional keys in
execution epoch
Ranges ranges = ranges().allAt(updated.executeAt());
- PreLoadContext context = PreLoadContext.contextFor(txnId, keys);
+ PreLoadContext context = PreLoadContext.contextFor(txnId, keys,
COMMANDS);
// TODO (expected): execute immediately for any keys we already
have loaded, and save only those we haven't for later
if (canExecuteWith(context))
{
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java
b/accord-core/src/main/java/accord/local/SerializerSupport.java
index 7ca59e00..5384a7e3 100644
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -19,6 +19,8 @@ package accord.local;
import java.util.Set;
+import javax.annotation.Nullable;
+
import com.google.common.collect.ImmutableSet;
import accord.api.Result;
@@ -28,6 +30,7 @@ import accord.local.CommandStores.RangesForEpoch;
import accord.local.CommonAttributes.Mutable;
import accord.messages.Accept;
import accord.messages.Apply;
+import accord.messages.ApplyThenWaitUntilApplied;
import accord.messages.BeginRecovery;
import accord.messages.Commit;
import accord.messages.MessageType;
@@ -38,18 +41,21 @@ import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.Invariants;
import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
+import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
import static accord.messages.MessageType.PRE_ACCEPT_REQ;
import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
-import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
+import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
@@ -60,15 +66,40 @@ import static accord.utils.Invariants.illegalState;
@VisibleForImplementation
public class SerializerSupport
{
+ private static final Set<MessageType> PRE_ACCEPT_TYPES =
+ ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ,
PROPAGATE_PRE_ACCEPT_MSG);
+
+ private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES =
+ ImmutableSet.<MessageType>builder()
+ .addAll(PRE_ACCEPT_TYPES)
+ .add(COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ)
+ .build();
+
+ private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES =
+ ImmutableSet.<MessageType>builder()
+ .addAll(PRE_ACCEPT_COMMIT_TYPES)
+ .add(STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ,
STABLE_MAXIMAL_REQ, PROPAGATE_STABLE_MSG)
+ .build();
+
+ private static final Set<MessageType> APPLY_TYPES =
+ ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG,
APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
+
+ private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
+ ImmutableSet.<MessageType>builder()
+ .addAll(PRE_ACCEPT_STABLE_TYPES)
+ .addAll(APPLY_TYPES)
+ .build();
+
/**
* Reconstructs Command from register values and protocol messages.
*/
- public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable
attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
+ public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable
attrs, SaveStatus status, Timestamp executeAt, @Nullable Timestamp
executesAtLeast, Ballot promised, Ballot accepted, WaitingOnProvider
waitingOnProvider, MessageProvider messageProvider)
{
switch (status.status)
{
case NotDefined:
- return Command.NotDefined.notDefined(attrs, promised);
+ return status == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
+ :
Command.NotDefined.notDefined(attrs, promised);
case PreAccepted:
return preAccepted(rangesForEpoch, attrs, executeAt, promised,
messageProvider);
case AcceptedInvalidate:
@@ -83,15 +114,12 @@ public class SerializerSupport
return executed(rangesForEpoch, attrs, status, executeAt,
promised, accepted, waitingOnProvider, messageProvider);
case Truncated:
case Invalidated:
- return truncated(attrs, status, executeAt, messageProvider);
+ return truncated(attrs, status, executeAt, executesAtLeast,
messageProvider);
default:
throw new IllegalStateException();
}
}
- private static final Set<MessageType> PRE_ACCEPT_TYPES =
- ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ,
PROPAGATE_PRE_ACCEPT_MSG);
-
private static Command.PreAccepted preAccepted(RangesForEpoch
rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised,
MessageProvider messageProvider)
{
Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
@@ -118,24 +146,12 @@ public class SerializerSupport
return Command.Accepted.accepted(attrs, status, executeAt, promised,
accepted);
}
- private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES =
- ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ,
PROPAGATE_PRE_ACCEPT_MSG, COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ);
-
- private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES =
- ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ,
PROPAGATE_PRE_ACCEPT_MSG,
- COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ,
STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ, STABLE_MAXIMAL_REQ,
PROPAGATE_STABLE_MSG);
-
private static Command.Committed committed(RangesForEpoch rangesForEpoch,
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
{
attrs = extract(rangesForEpoch, status, accepted, messageProvider,
(attrs0, txn, deps, i1, i2) -> attrs0.partialTxn(txn).partialDeps(deps), attrs);
return Command.Committed.committed(attrs, status, executeAt, promised,
accepted, waitingOnProvider.provide(attrs.partialDeps()));
}
- private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
- ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ,
PROPAGATE_PRE_ACCEPT_MSG,
- COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ,
STABLE_MAXIMAL_REQ, STABLE_FAST_PATH_REQ, PROPAGATE_STABLE_MSG,
- APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ,
PROPAGATE_APPLY_MSG);
-
private static Command.Executed executed(RangesForEpoch rangesForEpoch,
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
{
return extract(rangesForEpoch, status, accepted, messageProvider,
(attrs0, txn, deps, writes, result) -> {
@@ -146,10 +162,7 @@ public class SerializerSupport
}, attrs);
}
- private static final Set<MessageType> APPLY_TYPES =
- ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ,
PROPAGATE_APPLY_MSG);
-
- private static Command.Truncated truncated(Mutable attrs, SaveStatus
status, Timestamp executeAt, MessageProvider messageProvider)
+ private static Command.Truncated truncated(Mutable attrs, SaveStatus
status, Timestamp executeAt, @Nullable Timestamp executesAtLeast,
MessageProvider messageProvider)
{
Writes writes = null;
Result result = null;
@@ -162,15 +175,15 @@ public class SerializerSupport
case TruncatedApplyWithDeps:
Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES);
checkState(!witnessed.isEmpty());
- if (witnessed.contains(APPLY_MINIMAL_REQ))
+ if (witnessed.contains(APPLY_MAXIMAL_REQ))
{
- Apply apply = messageProvider.applyMinimal();
+ Apply apply = messageProvider.applyMaximal();
writes = apply.writes;
result = apply.result;
}
- if (witnessed.contains(APPLY_MAXIMAL_REQ))
+ else if (witnessed.contains(APPLY_MINIMAL_REQ))
{
- Apply apply = messageProvider.applyMaximal();
+ Apply apply = messageProvider.applyMinimal();
writes = apply.writes;
result = apply.result;
}
@@ -180,8 +193,18 @@ public class SerializerSupport
writes = propagate.writes;
result = propagate.result;
}
+ else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ))
+ {
+ ApplyThenWaitUntilApplied apply =
messageProvider.applyThenWaitUntilApplied();
+ writes = apply.writes;
+ result = apply.result;
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unhandled types:
" + witnessed);
+ }
case TruncatedApply:
- return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result);
+ return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result, executesAtLeast);
case ErasedOrInvalidated:
return Command.Truncated.erasedOrInvalidated(attrs.txnId(),
attrs.durability(), attrs.route());
case Erased:
@@ -231,6 +254,7 @@ public class SerializerSupport
case AcceptedInvalidate:
case Accepted:
case PreCommitted:
+ {
PartialTxn txn = null;
PartialDeps deps = null;
@@ -247,7 +271,7 @@ public class SerializerSupport
deps = slicePartialDeps(rangesForEpoch, accept);
}
return withContents.apply(param, txn, deps, null, null);
-
+ }
case Committed:
{
witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES);
@@ -285,14 +309,25 @@ public class SerializerSupport
else
{
checkState(witnessed.contains(STABLE_SLOW_PATH_REQ),
"Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new
LoggedMessageProvider(messageProvider));
- if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
+ if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+ {
+ commit = messageProvider.commitMaximal();
+ }
+ else if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
{
commit = messageProvider.commitSlowPath();
}
+ else if (witnessed.contains(PRE_ACCEPT_REQ) ||
witnessed.contains(BEGIN_RECOVER_REQ) ||
witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
+ {
+ Commit slowPath = messageProvider.stableSlowPath();
+ Ranges ranges =
rangesForEpoch.allBetween(slowPath.txnId.epoch(), slowPath.executeAt.epoch());
+ PartialTxn txn =
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed,
messageProvider).slice(ranges, true);
+ PartialDeps deps = slowPath.partialDeps.slice(ranges);
+ return withContents.apply(param, txn, deps, null,
null);
+ }
else
{
- checkState(witnessed.contains(COMMIT_MAXIMAL_REQ),
"Unable to find COMMIT_MAXIMAL_REQ; witnessed %s", new
LoggedMessageProvider(messageProvider));
- commit = messageProvider.commitMaximal();
+ throw illegalState("Unable to find
COMMIT_SLOW_PATH_REQ; witnessed %s", new
LoggedMessageProvider(messageProvider));
}
}
@@ -309,14 +344,14 @@ public class SerializerSupport
Ranges ranges =
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
return withContents.apply(param, apply.txn.slice(ranges,
true), apply.deps.slice(ranges), apply.writes, apply.result);
}
- else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+ else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ))
{
- Propagate propagate = messageProvider.propagateApply();
- return sliceAndApply(rangesForEpoch, propagate,
withContents, param, propagate.writes, propagate.result);
+ ApplyThenWaitUntilApplied apply =
messageProvider.applyThenWaitUntilApplied();
+ Ranges ranges =
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
+ return withContents.apply(param, apply.txn.slice(ranges,
true), apply.deps.slice(ranges), apply.writes, apply.result);
}
- else
+ else if (witnessed.contains(APPLY_MINIMAL_REQ))
{
- checkState(witnessed.contains(APPLY_MINIMAL_REQ), "Unable
to find APPLY_MINIMAL_REQ; witnessed %s", new
LoggedMessageProvider(messageProvider));
Apply apply = messageProvider.applyMinimal();
Commit commit;
if (witnessed.contains(STABLE_MAXIMAL_REQ))
@@ -326,7 +361,18 @@ public class SerializerSupport
else if (witnessed.contains(PROPAGATE_STABLE_MSG))
{
Propagate propagate =
messageProvider.propagateStable();
- return withContents.apply(param, propagate.partialTxn,
propagate.stableDeps, apply.writes, apply.result);
+ var ranges = propagate.committedExecuteAt == null ?
rangesForEpoch.allAt(propagate.txnId) :
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
+ return withContents.apply(param,
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges),
apply.writes, apply.result);
+ }
+ else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+ {
+ Propagate propagate = messageProvider.propagateApply();
+ var ranges = propagate.committedExecuteAt == null ?
rangesForEpoch.allAt(propagate.txnId) :
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
+ return withContents.apply(param,
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges),
apply.writes, apply.result);
+ }
+ else if (witnessed.contains(COMMIT_MAXIMAL_REQ))
+ {
+ commit = messageProvider.commitMaximal();
}
else if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
{
@@ -336,6 +382,12 @@ public class SerializerSupport
{
commit = messageProvider.stableFastPath();
}
+ else if (witnessed.contains(PRE_ACCEPT_REQ) ||
witnessed.contains(BEGIN_RECOVER_REQ) ||
witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
+ {
+ PartialTxn txn =
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
+ Ranges ranges =
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
+ return withContents.apply(param, txn.slice(ranges,
true), apply.deps.slice(ranges), apply.writes, apply.result);
+ }
else
{
throw illegalState("Invalid state: insufficient stable
or commit messages found to reconstruct PreApplied or greater SaveStatus;
witnessed " + witnessed);
@@ -343,6 +395,36 @@ public class SerializerSupport
return sliceAndApply(rangesForEpoch, messageProvider,
witnessed, commit, withContents, param, apply.writes, apply.result);
}
+ else if (witnessed.contains(PROPAGATE_APPLY_MSG))
+ {
+ Propagate propagate = messageProvider.propagateApply();
+ Invariants.nonNull(propagate.partialTxn, "Unable to find
partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider));
+ Invariants.nonNull(propagate.stableDeps, "Unable to find
stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider));
+ return sliceAndApply(rangesForEpoch, propagate,
withContents, param, propagate.writes, propagate.result);
+ }
+ else if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
+ {
+ // once propagate runs locally it merges the local state
with the remote state, which may make this go from PRE_ACCEPT to PRE_APPLIED!
+ Propagate propagate = messageProvider.propagatePreAccept();
+ Invariants.nonNull(propagate.partialTxn, "Unable to find
partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider));
+ Invariants.nonNull(propagate.stableDeps, "Unable to find
stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider));
+
+ var ranges = propagate.committedExecuteAt == null ?
rangesForEpoch.allAt(propagate.txnId) :
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
+ return withContents.apply(param,
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges),
propagate.writes, propagate.result);
+ }
+ else if (witnessed.contains(PROPAGATE_OTHER_MSG))
+ {
+ // the txn/deps may have been erased, won't always be
here...
+ Propagate propagate = messageProvider.propagateOther();
+ var ranges = propagate.committedExecuteAt == null ?
rangesForEpoch.allAt(propagate.txnId) :
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
+ PartialTxn txn = propagate.partialTxn == null ? null :
propagate.partialTxn.slice(ranges, true);
+ PartialDeps deps = propagate.stableDeps == null ? null :
propagate.stableDeps.slice(ranges);
+ return withContents.apply(param, txn, deps,
propagate.writes, propagate.result);
+ }
+ else
+ {
+ throw illegalState("Unable to find messages that lead to
PreApplied state; txn_id=%s, witnessed %s", messageProvider.txnId(), new
LoggedMessageProvider(messageProvider));
+ }
}
case NotDefined:
@@ -406,6 +488,8 @@ public class SerializerSupport
PartialTxn preAcceptedPartialTxn =
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
if (partialTxn == null || partialTxn.keys().size() == 0)
partialTxn = preAcceptedPartialTxn;
else partialTxn = merge(preAcceptedPartialTxn, partialTxn);
+ if (partialTxn == null &&
witnessed.contains(COMMIT_MAXIMAL_REQ))
+ partialTxn = messageProvider.commitMaximal().partialTxn;
case StableWithTxnAndDeps:
case CommitWithTxn:
}
@@ -421,6 +505,7 @@ public class SerializerSupport
// TODO (required): randomised testing that we always restore the exact
same state
public interface MessageProvider
{
+ TxnId txnId();
Set<MessageType> test(Set<MessageType> messages);
Set<MessageType> all();
@@ -437,6 +522,7 @@ public class SerializerSupport
Commit commitMaximal();
Commit stableFastPath();
+ Commit stableSlowPath();
Commit stableMaximal();
@@ -447,6 +533,10 @@ public class SerializerSupport
Apply applyMaximal();
Propagate propagateApply();
+
+ Propagate propagateOther();
+
+ ApplyThenWaitUntilApplied applyThenWaitUntilApplied();
}
private static class LoggedMessageProvider
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java
b/accord-core/src/main/java/accord/messages/Propagate.java
index 49572c55..351c636e 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -513,6 +513,7 @@ public class Propagate implements EpochSupplier,
LocalRequest<Status.Known>
if (toEpoch >= committedExecuteAt.epoch())
return MessageType.PROPAGATE_APPLY_MSG;
case Committed:
+ case Stable:
return MessageType.PROPAGATE_STABLE_MSG;
case PreCommitted:
if (!achieved.definition.isKnown())
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java
b/accord-core/src/main/java/accord/primitives/Routables.java
index 9e7e53e7..b37bcc91 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -24,6 +24,8 @@ import accord.utils.*;
import net.nicoulaj.compilecommand.annotations.Inline;
import java.util.function.Predicate;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import static accord.utils.SortedArrays.Search.FLOOR;
@@ -46,6 +48,10 @@ public interface Routables<K extends Routable> extends
Iterable<K>
int size();
boolean isEmpty();
+ default Stream<K> stream()
+ {
+ return StreamSupport.stream(spliterator(), false);
+ }
boolean intersects(AbstractRanges ranges);
boolean intersects(AbstractKeys<?> keys);
default boolean intersects(Routables<?> routables)
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java
b/accord-core/src/main/java/accord/utils/Invariants.java
index d1061465..842ed5d3 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -50,6 +50,11 @@ public class Invariants
throw createIllegalState(msg);
}
+ public static IllegalStateException illegalState(String fmt, Object...
args)
+ {
+ return illegalState(format(fmt, args));
+ }
+
public static IllegalStateException illegalState()
{
throw illegalState(null);
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 47240516..447b80b0 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@@ -50,12 +51,15 @@ import accord.impl.MessageListener;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.Utils;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
import accord.verify.CompositeVerifier;
import accord.verify.ElleVerifier;
import accord.verify.StrictSerializabilityVerifier;
import accord.verify.Verifier;
+
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -540,10 +544,50 @@ public class BurnTest
}
@Test
- @Timeout(value = 3, unit = TimeUnit.MINUTES)
public void testOne()
{
- run(1L, 1000);
+ run(System.nanoTime());
+ }
+
+ private static void run(long seed)
+ {
+ Duration timeout = Duration.ofMinutes(3);
+ Runnable fn = () -> run(seed, 1000);
+ AsyncResult.Settable<?> promise = AsyncResults.settable();
+ Thread t = new Thread(() -> {
+ try
+ {
+ fn.run();
+ promise.setSuccess(null);
+ }
+ catch (Throwable e)
+ {
+ promise.setFailure(e);
+ }
+ });
+ t.setName("BurnTest with timeout");
+ t.setDaemon(true);
+ try
+ {
+ t.start();
+ AsyncChains.getBlocking(promise, timeout.toNanos(),
TimeUnit.NANOSECONDS);
+ }
+ catch (Throwable thrown)
+ {
+ Throwable cause = thrown;
+ if (cause instanceof ExecutionException)
+ cause = cause.getCause();
+ if (cause instanceof InterruptedException || cause instanceof
TimeoutException)
+ t.interrupt();
+ if (cause instanceof TimeoutException)
+ {
+ TimeoutException override = new TimeoutException("test did not
complete within " + timeout);
+ override.setStackTrace(new StackTraceElement[0]);
+ cause = override;
+ }
+ logger.error("Exception running burn test for seed {}:", seed,
cause); // log the rethrown failure; the worker Thread 't' is not a Throwable
+ throw SimulationException.wrap(seed, cause);
+ }
+ }
private static void run(long seed, int operations)
diff --git a/accord-core/src/test/java/accord/impl/MessageListener.java
b/accord-core/src/test/java/accord/impl/MessageListener.java
index 71bbc0ec..6be9f8de 100644
--- a/accord-core/src/test/java/accord/impl/MessageListener.java
+++ b/accord-core/src/test/java/accord/impl/MessageListener.java
@@ -28,6 +28,8 @@ import accord.messages.SimpleReply;
import accord.messages.TxnRequest;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.topology.Topology;
+
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
@@ -49,6 +51,7 @@ public interface MessageListener
void onMessage(NodeSink.Action action, Node.Id src, Node.Id to, long id,
Message message);
void onClientAction(ClientAction action, Node.Id from, TxnId id, Object
message);
+ void onTopologyChange(Topology topology);
static MessageListener get()
{
@@ -71,6 +74,12 @@ public interface MessageListener
{
}
+
+ @Override
+ public void onTopologyChange(Topology topology)
+ {
+
+ }
}
class DebugListener implements MessageListener
@@ -117,6 +126,16 @@ public interface MessageListener
}
}
+ private Topology previous = null;
+
+ @Override
+ public void onTopologyChange(Topology topology)
+ {
+ if (previous != null)
+ logger.debug("Topology Change {} -> {}", previous.epoch(),
topology.epoch());
+ previous = topology;
+ }
+
private static Object normalizeClientMessage(Object o)
{
if (o instanceof Throwable)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index b949c5ab..da5851f8 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.BarrierType;
import accord.api.MessageSink;
import accord.api.Scheduler;
import accord.burn.BurnTestConfigurationService;
@@ -48,7 +49,12 @@ import accord.burn.TopologyUpdates;
import accord.burn.random.FrequentLargeRange;
import accord.config.LocalConfig;
import accord.config.MutableLocalConfig;
+import accord.coordinate.Barrier;
import accord.coordinate.CoordinationAdapter;
+import accord.coordinate.Exhausted;
+import accord.coordinate.Invalidated;
+import accord.coordinate.Preempted;
+import accord.coordinate.Timeout;
import accord.impl.CoordinateDurabilityScheduling;
import accord.impl.MessageListener;
import accord.impl.PrefixedIntHashKey;
@@ -61,16 +67,20 @@ import accord.local.Node.Id;
import accord.local.Node;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
-import accord.messages.LocalRequest;
import accord.messages.Message;
import accord.messages.MessageType;
import accord.messages.Reply;
import accord.messages.Request;
import accord.messages.SafeCallback;
+import accord.primitives.Keys;
+import accord.primitives.Range;
import accord.primitives.Ranges;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.topology.Topology;
import accord.topology.TopologyRandomizer;
+import accord.utils.Gens;
+import accord.utils.Invariants;
import accord.utils.RandomSource;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
@@ -79,6 +89,9 @@ import static
accord.impl.basic.Cluster.OverrideLinksKind.NONE;
import static accord.impl.basic.Cluster.OverrideLinksKind.RANDOM_BIDIRECTIONAL;
import static accord.impl.basic.NodeSink.Action.DELIVER;
import static accord.impl.basic.NodeSink.Action.DROP;
+import static accord.utils.AccordGens.keysInsideRanges;
+import static accord.utils.AccordGens.rangeInsideRange;
+import static accord.utils.Gens.mixedDistribution;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -127,6 +140,7 @@ public class Cluster implements Scheduler
final RandomSource random;
final LinkConfig linkConfig;
final Function<Id, Node> lookup;
+ final Function<Id, Journal> journalLookup;
final PendingQueue pending;
final Runnable checkFailures;
final List<Runnable> onDone = new ArrayList<>();
@@ -137,13 +151,14 @@ public class Cluster implements Scheduler
int recurring;
BiFunction<Id, Id, Link> links;
- public Cluster(RandomSource random, MessageListener messageListener,
Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id,
Node> lookup, IntSupplier rf, Consumer<Packet> responseSink)
+ public Cluster(RandomSource random, MessageListener messageListener,
Supplier<PendingQueue> queueSupplier, Runnable checkFailures, Function<Id,
Node> lookup, Function<Id, Journal> journalLookup, IntSupplier rf,
Consumer<Packet> responseSink)
{
this.random = random;
this.messageListener = messageListener;
this.pending = queueSupplier.get();
this.checkFailures = checkFailures;
this.lookup = lookup;
+ this.journalLookup = journalLookup;
this.responseSink = responseSink;
this.linkConfig = defaultLinkConfig(random, rf);
this.links = linkConfig.defaultLinks;
@@ -216,7 +231,7 @@ public class Cluster implements Scheduler
else callback.success(deliver.src, reply);
}
}
- else on.receive((Request) deliver.message, deliver.src, deliver);
+ else journalLookup.apply(deliver.dst).handle((Request)
deliver.message, deliver.src, deliver);
}
else
{
@@ -269,10 +284,11 @@ public class Cluster implements Scheduler
Topology topology = topologyFactory.toTopology(nodes);
Map<Id, Node> nodeMap = new LinkedHashMap<>();
Map<Id, AgentExecutor> executorMap = new LinkedHashMap<>();
+ Map<Id, Journal> journalMap = new LinkedHashMap<>();
try
{
RandomSource random = randomSupplier.get();
- Cluster sinks = new Cluster(randomSupplier.get(), messageListener,
queueSupplier, checkFailures, nodeMap::get, () -> topologyFactory.rf,
responseSink);
+ Cluster sinks = new Cluster(randomSupplier.get(), messageListener,
queueSupplier, checkFailures, nodeMap::get, journalMap::get, () ->
topologyFactory.rf, responseSink);
TopologyUpdates topologyUpdates = new
TopologyUpdates(executorMap::get);
TopologyRandomizer.Listener schemaApply = t -> {
for (Node node : nodeMap.values())
@@ -280,9 +296,11 @@ public class Cluster implements Scheduler
ListStore store = (ListStore)
node.commandStores().dataStore();
store.onTopologyUpdate(node, t);
}
+ messageListener.onTopologyChange(t);
};
TopologyRandomizer configRandomizer = new
TopologyRandomizer(randomSupplier, topology, topologyUpdates, nodeMap::get,
schemaApply);
List<CoordinateDurabilityScheduling> durabilityScheduling = new
ArrayList<>();
+ List<Service> services = new ArrayList<>();
for (Id id : nodes)
{
MessageSink messageSink = sinks.create(id,
randomSupplier.get());
@@ -291,13 +309,15 @@ public class Cluster implements Scheduler
BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges)
-> configRandomizer.onStale(id, sinceAtLeast, ranges);
AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id,
onStale);
executorMap.put(id, nodeExecutor);
+ Journal journal = new Journal(messageListener);
+ journalMap.put(id, journal);
BurnTestConfigurationService configService = new
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology,
nodeMap::get, topologyUpdates);
- BooleanSupplier isLoadedCheck =
random.biasedUniformBools(0.5f);
- Node node = new Node(id, messageSink, LocalRequest::process,
configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS,
nowSupplier),
+ BooleanSupplier isLoadedCheck =
Gens.supplier(Gens.bools().mixedDistribution().next(random), random);
+ Node node = new Node(id, messageSink, journal, configService,
nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
() -> new ListStore(id), new
ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()),
nodeExecutor.agent(),
randomSupplier.get(), sinks,
SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new,
DelayedCommandStores.factory(sinks.pending, isLoadedCheck), new
CoordinationAdapter.DefaultFactory(),
+ SimpleProgressLog::new,
DelayedCommandStores.factory(sinks.pending, isLoadedCheck, journal), new
CoordinationAdapter.DefaultFactory(),
localConfig);
CoordinateDurabilityScheduling durability = new
CoordinateDurabilityScheduling(node);
// TODO (desired): randomise
@@ -306,6 +326,7 @@ public class Cluster implements Scheduler
durabilityScheduling.add(durability);
nodeMap.put(id, node);
durabilityScheduling.add(new
CoordinateDurabilityScheduling(node));
+ services.add(new BarrierService(node, randomSupplier.get()));
}
Runnable updateDurabilityRate;
@@ -328,6 +349,7 @@ public class Cluster implements Scheduler
schemaApply.onUpdate(topology);
// startup
+ journalMap.entrySet().forEach(e ->
e.getValue().start(nodeMap.get(e.getKey())));
AsyncResult<?> startup =
AsyncChains.reduce(nodeMap.values().stream().map(Node::unsafeStart).collect(toList()),
(a, b) -> null).beginAsResult();
while (sinks.processPending());
Assertions.assertTrue(startup.isDone());
@@ -341,10 +363,12 @@ public class Cluster implements Scheduler
Scheduled reconfigure =
sinks.recurring(configRandomizer::maybeUpdateTopology, 1, SECONDS);
durabilityScheduling.forEach(CoordinateDurabilityScheduling::start);
+ services.forEach(Service::start);
noMoreWorkSignal.accept(() -> {
reconfigure.cancel();
durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
+ services.forEach(Service::close);
});
readySignal.accept(nodeMap);
@@ -357,6 +381,7 @@ public class Cluster implements Scheduler
chaos.cancel();
reconfigure.cancel();
durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop);
+ services.forEach(Service::close);
sinks.links = sinks.linkConfig.defaultLinks;
// give progress log et al a chance to finish
@@ -379,10 +404,117 @@ public class Cluster implements Scheduler
}
finally
{
+ journalMap.values().forEach(Journal::shutdown);
nodeMap.values().forEach(Node::shutdown);
}
}
+ private interface Service extends AutoCloseable
+ {
+ void start();
+ @Override
+ void close();
+ }
+
+ private static abstract class AbstractService implements Service, Runnable
+ {
+ protected final Node node;
+ protected final RandomSource rs;
+ private Scheduled scheduled;
+
+ protected AbstractService(Node node, RandomSource rs)
+ {
+ this.node = node;
+ this.rs = rs;
+ }
+
+ @Override
+ public void start()
+ {
+ Invariants.checkState(scheduled == null, "Start already
called...");
+ this.scheduled = node.scheduler().recurring(this, 1, SECONDS);
+ }
+
+ protected abstract void doRun() throws Exception;
+
+ @Override
+ public final void run()
+ {
+ try
+ {
+ doRun();
+ }
+ catch (Throwable t)
+ {
+ node.agent().onUncaughtException(t);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ if (scheduled != null)
+ {
+ scheduled.cancel();
+ scheduled = null;
+ }
+ }
+ }
+
+ private static class BarrierService extends AbstractService
+ {
+ private final Supplier<BarrierType> typeSupplier;
+ private final Supplier<Boolean> includeRangeSupplier;
+ private final Supplier<Boolean> wholeOrPartialSupplier;
+
+ private BarrierService(Node node, RandomSource rs)
+ {
+ super(node, rs);
+ this.typeSupplier =
mixedDistribution(BarrierType.values()).next(rs).asSupplier(rs);
+ this.includeRangeSupplier =
Gens.bools().mixedDistribution().next(rs).asSupplier(rs);
+ this.wholeOrPartialSupplier =
Gens.bools().mixedDistribution().next(rs).asSupplier(rs);
+ }
+
+ @Override
+ public void doRun()
+ {
+ Topology current = node.topology().current();
+ Ranges ranges = current.rangesForNode(node.id());
+ if (ranges.isEmpty())
+ return;
+ BarrierType type = typeSupplier.get();
+ if (type == BarrierType.local)
+ {
+ run(node, Keys.of(keysInsideRanges(ranges).next(rs)),
current.epoch(), type);
+ }
+ else
+ {
+ List<Range> subset = new ArrayList<>();
+ for (Range range : ranges)
+ {
+ if (includeRangeSupplier.get())
+ subset.add(wholeOrPartialSupplier.get() ? range :
rangeInsideRange(range).next(rs));
+ }
+ if (subset.isEmpty())
+ return;
+ run(node, Ranges.of(subset.toArray(Range[]::new)),
current.epoch(), type);
+ }
+ }
+
+ private <S extends Seekables<?, ?>> void run(Node node, S
keysOrRanges, long epoch, BarrierType type)
+ {
+ Barrier.barrier(node, keysOrRanges, epoch, type).begin((s, f) -> {
+ if (f != null)
+ {
+ // ignore specific errors
+ if (f instanceof Invalidated || f instanceof Timeout || f
instanceof Preempted || f instanceof Exhausted)
+ return;
+ node.agent().onUncaughtException(f);
+ }
+ });
+ }
+ }
+
private static BiFunction<Id, Id, Link> partition(List<Id> nodes,
RandomSource random, int rf, BiFunction<Id, Id, Link> up)
{
Collections.shuffle(nodes, random.asJdkRandom());
diff --git
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index b33edcc0..ea97c5e0 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -19,6 +19,8 @@
package accord.impl.basic;
import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.function.BooleanSupplier;
@@ -31,16 +33,26 @@ import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.impl.InMemoryCommandStore;
import accord.impl.InMemoryCommandStores;
+import accord.impl.InMemorySafeCommand;
+import accord.impl.InMemorySafeCommandsForKey;
+import accord.impl.InMemorySafeTimestampsForKey;
import accord.impl.PrefixedIntHashKey;
import accord.impl.basic.TaskExecutorService.Task;
+import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores;
+import accord.local.CommonAttributes;
import accord.local.Node;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
+import accord.local.SerializerSupport;
import accord.local.ShardDistributor;
+import accord.messages.Message;
import accord.primitives.Range;
+import accord.primitives.RoutableKey;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
import accord.topology.Topology;
import accord.utils.Invariants;
import accord.utils.RandomSource;
@@ -49,15 +61,15 @@ import accord.utils.async.AsyncChains;
public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
{
- private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore
store, RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService
executorService, BooleanSupplier isLoadedCheck)
+ private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore
store, RandomSource random, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService
executorService, BooleanSupplier isLoadedCheck, Journal journal)
{
- super(time, agent, store, random, shardDistributor,
progressLogFactory, DelayedCommandStore.factory(executorService,
isLoadedCheck));
+ super(time, agent, store, random, shardDistributor,
progressLogFactory, DelayedCommandStore.factory(executorService, isLoadedCheck,
journal));
}
- public static CommandStores.Factory factory(PendingQueue pending,
BooleanSupplier isLoadedCheck)
+ public static CommandStores.Factory factory(PendingQueue pending,
BooleanSupplier isLoadedCheck, Journal journal)
{
return (time, agent, store, random, shardDistributor,
progressLogFactory) ->
- new DelayedCommandStores(time, agent, store, random,
shardDistributor, progressLogFactory, new
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck);
+ new DelayedCommandStores(time, agent, store, random,
shardDistributor, progressLogFactory, new
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal);
}
@Override
@@ -101,12 +113,56 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
private final SimulatedDelayedExecutorService executor;
private final Queue<Task<?>> pending = new LinkedList<>();
private final BooleanSupplier isLoadedCheck;
+ private final Journal journal;
- public DelayedCommandStore(int id, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder
epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier
isLoadedCheck)
+ public DelayedCommandStore(int id, NodeTimeService time, Agent agent,
DataStore store, ProgressLog.Factory progressLogFactory, EpochUpdateHolder
epochUpdateHolder, SimulatedDelayedExecutorService executor, BooleanSupplier
isLoadedCheck, Journal journal)
{
super(id, time, agent, store, progressLogFactory,
epochUpdateHolder);
this.executor = executor;
this.isLoadedCheck = isLoadedCheck;
+ this.journal = journal;
+ }
+
+ @Override
+ protected void validateRead(Command current)
+ {
+ // "loading" the command doesn't make sense as we don't "store"
the command...
+ if (current.txnId().kind() == Txn.Kind.EphemeralRead)
+ return;
+ //TODO (correctness): these type of txn must be durable but
currently they are not... should make sure this is plugged into the C* journal
properly for reply
+ if (current.txnId().kind() == Txn.Kind.LocalOnly)
+ return;
+ Command.WaitingOn waitingOn = null;
+ if (current.isStable() && !current.isTruncated())
+ waitingOn = current.asCommitted().waitingOn;
+ SerializerSupport.MessageProvider messages =
journal.makeMessageProvider(current.txnId());
+ Command.WaitingOn finalWaitingOn = waitingOn;
+ CommonAttributes.Mutable mutable = current.mutable();
+ mutable.partialDeps(null).removePartialTxn();
+ Command reconstructed;
+ try
+ {
+ reconstructed =
SerializerSupport.reconstruct(unsafeRangesForEpoch(), mutable,
current.saveStatus(), current.executeAt(),
current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null,
current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn,
messages);
+ }
+ catch (IllegalStateException t)
+ {
+ //TODO (correctness): journal doesn't guarantee we pick the
same records we used to state transition
+ // Journal stores a list of messages it saw in some order it
defines, but when reconstructing a command we don't actually know what messages
were used, this could
+ // lead to a case where deps mismatch, so ignoring this for now
+ if (t.getMessage() != null && t.getMessage().startsWith("Deps
do not match; expected"))
+ return;
+ throw t;
+ }
+ //TODO (correctness): journal doesn't guarantee we pick the same
records we used to state transition
+ if (current.partialDeps() != null &&
!current.partialDeps().rangeDeps.equals(reconstructed.partialDeps().rangeDeps))
+ return;
+ // for some reason scope doesn't always match, this might be due
to journal... what sucks is that this can also be a bug in the extract, so it's
+ // hard to figure out what happened.
+ if (current.partialDeps() != null &&
!current.partialDeps().equals(reconstructed.partialDeps()))
+ return;
+ if (current.isCommitted() && !current.isTruncated() &&
!Objects.equals(current.asCommitted().waitingOn(),
reconstructed.asCommitted().waitingOn()))
+ return;
+// Invariants.checkState(current.equals(reconstructed), "Commands
did not match: expected %s, given %s", current, reconstructed);
}
@Override
@@ -115,9 +171,9 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
return isLoadedCheck.getAsBoolean();
}
- private static CommandStore.Factory
factory(SimulatedDelayedExecutorService executor, BooleanSupplier isLoadedCheck)
+ private static CommandStore.Factory
factory(SimulatedDelayedExecutorService executor, BooleanSupplier
isLoadedCheck, Journal journal)
{
- return (id, time, agent, store, progressLogFactory,
rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store,
progressLogFactory, rangesForEpoch, executor, isLoadedCheck);
+ return (id, time, agent, store, progressLogFactory,
rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store,
progressLogFactory, rangesForEpoch, executor, isLoadedCheck, journal);
}
@Override
@@ -190,5 +246,42 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
{
}
+
+ @Override
+ protected InMemorySafeStore createSafeStore(PreLoadContext context,
RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands,
Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey,
Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys)
+ {
+ return new DelayedSafeStore(this, ranges, context, commands,
timestampsForKey, commandsForKeys);
+ }
+ }
+
+ public static class DelayedSafeStore extends
InMemoryCommandStore.InMemorySafeStore
+ {
+ private final DelayedCommandStore commandStore;
+ public DelayedSafeStore(DelayedCommandStore commandStore,
RangesForEpoch ranges, PreLoadContext context, Map<TxnId, InMemorySafeCommand>
commands, Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey,
Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey)
+ {
+ super(commandStore, ranges, context, commands, timestampsForKey,
commandsForKey);
+ this.commandStore = commandStore;
+ }
+
+ @Override
+ public void postExecute()
+ {
+ if (context instanceof Message)
+ {
+ Message m = (Message) context;
+ if (m.type() != null && !m.type().hasSideEffects())
+ {
+ // double check there are no modifications
+ commands.entrySet().forEach(e -> {
+ InMemorySafeCommand safe = e.getValue();
+ if (!safe.isModified()) return;
+ commandStore.validateRead(safe.current());
+ Command original = safe.original();
+ if (original != null)
+ commandStore.validateRead(original);
+ });
+ }
+ }
+ }
}
}
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java
b/accord-core/src/test/java/accord/impl/basic/Journal.java
new file mode 100644
index 00000000..7c0217b5
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import accord.impl.MessageListener;
+import accord.local.Node;
+import accord.local.SerializerSupport;
+import accord.messages.AbstractEpochRequest;
+import accord.messages.Accept;
+import accord.messages.Apply;
+import accord.messages.ApplyThenWaitUntilApplied;
+import accord.messages.BeginRecovery;
+import accord.messages.Commit;
+import accord.messages.LocalRequest;
+import accord.messages.Message;
+import accord.messages.MessageType;
+import accord.messages.PreAccept;
+import accord.messages.Propagate;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.messages.TxnRequest;
+import accord.primitives.Ballot;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.LongArrayList;
+
+import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ;
+import static accord.messages.MessageType.ACCEPT_REQ;
+import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
+import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
+import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
+import static accord.messages.MessageType.BEGIN_INVALIDATE_REQ;
+import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
+import static accord.messages.MessageType.COMMIT_INVALIDATE_REQ;
+import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
+import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
+import static accord.messages.MessageType.INFORM_DURABLE_REQ;
+import static accord.messages.MessageType.INFORM_OF_TXN_REQ;
+import static accord.messages.MessageType.PRE_ACCEPT_REQ;
+import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
+import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
+import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
+import static accord.messages.MessageType.SET_GLOBALLY_DURABLE_REQ;
+import static accord.messages.MessageType.SET_SHARD_DURABLE_REQ;
+import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
+import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
+import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
+
+public class Journal implements LocalRequest.Handler, Runnable
+{
+ private static final TxnIdProvider EPOCH = msg ->
((AbstractEpochRequest<?>) msg).txnId;
+ private static final TxnIdProvider TXN = msg -> ((TxnRequest<?>)
msg).txnId;
+ private static final TxnIdProvider LOCAL = msg -> ((LocalRequest<?>)
msg).primaryTxnId();
+ private static final TxnIdProvider INVL = msg -> ((Commit.Invalidate)
msg).primaryTxnId();
+ private static final Map<MessageType, TxnIdProvider> typeToProvider =
ImmutableMap.<MessageType, TxnIdProvider>builder()
+
.put(PRE_ACCEPT_REQ, TXN)
+
.put(ACCEPT_REQ, TXN)
+
.put(ACCEPT_INVALIDATE_REQ, EPOCH)
+
.put(COMMIT_SLOW_PATH_REQ, TXN)
+
.put(COMMIT_MAXIMAL_REQ, TXN)
+
.put(STABLE_FAST_PATH_REQ, TXN)
+
.put(STABLE_SLOW_PATH_REQ, TXN)
+
.put(STABLE_MAXIMAL_REQ, TXN)
+
.put(COMMIT_INVALIDATE_REQ, INVL)
+
.put(APPLY_MINIMAL_REQ, TXN)
+
.put(APPLY_MAXIMAL_REQ, TXN)
+
.put(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, EPOCH)
+
.put(BEGIN_RECOVER_REQ, TXN)
+
.put(BEGIN_INVALIDATE_REQ, EPOCH)
+
.put(INFORM_OF_TXN_REQ, EPOCH)
+
.put(INFORM_DURABLE_REQ, TXN)
+
.put(SET_SHARD_DURABLE_REQ, EPOCH)
+
.put(SET_GLOBALLY_DURABLE_REQ, EPOCH)
+
.put(PROPAGATE_PRE_ACCEPT_MSG, LOCAL)
+
.put(PROPAGATE_STABLE_MSG, LOCAL)
+
.put(PROPAGATE_APPLY_MSG, LOCAL)
+
.put(PROPAGATE_OTHER_MSG, LOCAL)
+
.build();
+
+ private final Queue<RequestContext> unframedRequests = new ArrayDeque<>();
+ private final LongArrayList waitForEpochs = new LongArrayList();
+ private final Long2ObjectHashMap<ArrayList<RequestContext>>
delayedRequests = new Long2ObjectHashMap<>();
+ private final Map<TxnId, Map<MessageType, Message>> writes = new
HashMap<>();
+ private final MessageListener messageListener;
+ private Node node;
+
+ public Journal(MessageListener messageListener)
+ {
+ this.messageListener = messageListener;
+ }
+
+ public void start(Node node)
+ {
+ this.node = node;
+ node.scheduler().recurring(this, 1, TimeUnit.MILLISECONDS);
+ }
+
+ public void shutdown()
+ {
+ this.node = null;
+ }
+
+ @Override
+ public void handle(LocalRequest<?> message, Node node)
+ {
+ messageListener.onMessage(NodeSink.Action.DELIVER, node.id(),
node.id(), -1, message);
+ if (message.type().hasSideEffects())
+ {
+ // enqueue
+ unframedRequests.add(new RequestContext(message, () ->
node.scheduler().now(() -> message.process(node))));
+ return;
+ }
+ message.process(node);
+ }
+
+ public void handle(Request request, Node.Id from, ReplyContext
replyContext)
+ {
+ if (request.type() != null && request.type().hasSideEffects())
+ {
+ // enqueue
+ unframedRequests.add(new RequestContext(request, () ->
node.receive(request, from, replyContext)));
+ return;
+ }
+ node.receive(request, from, replyContext);
+ }
+
+ private void save(Message request)
+ {
+ MessageType type = request.type();
+ TxnIdProvider provider = typeToProvider.get(type);
+ Invariants.nonNull(provider, "Unknown type %s: %s", type, request);
+ TxnId txnId = provider.txnId(request);
+ writes.computeIfAbsent(txnId, ignore -> new Testing()).put(type,
request);
+ }
+
+ public SerializerSupport.MessageProvider makeMessageProvider(TxnId txnId)
+ {
+ return new MessageProvider(txnId, writes.getOrDefault(txnId,
Map.of()));
+ }
+
+ private static class Testing extends LinkedHashMap<MessageType, Message>
+ {
+ public Map<MessageType, List<Message>> history()
+ {
+ LinkedHashMap<MessageType, List<Message>> history = new
LinkedHashMap<>();
+ for (MessageType k : keySet())
+ {
+ Object current = super.get(k);
+ history.put(k, current instanceof List ? (List<Message>)
current : Collections.singletonList((Message) current));
+ }
+ return history;
+ }
+
+ @Override
+ public Message get(Object key)
+ {
+ Object current = super.get(key);
+ if (current == null || current instanceof Message)
+ return (Message) current;
+ List<Message> messages = (List<Message>) current;
+ return messages.get(messages.size() - 1);
+ }
+
+ @Override
+ public Message put(MessageType key, Message value)
+ {
+ Object current = super.get(key);
+ if (current == null)
+ return super.put(key, value);
+ else if (current instanceof List)
+ {
+ List<Message> list = (List<Message>) current;
+ list.add(value);
+ return list.get(list.size() - 2);
+ }
+ else
+ {
+ List<Message> messages = new ArrayList<>();
+ messages.add((Message) current);
+ messages.add(value);
+ super.put(key, value);
+ return (Message) current;
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ if (this.node == null)
+ return;
+ try
+ {
+ doRun();
+ }
+ catch (Throwable t)
+ {
+ node.agent().onUncaughtException(t);
+ }
+ }
+
+ private void doRun()
+ {
+ ArrayList<RequestContext> requests = null;
+ // check whether any epochs that delayed requests are waiting on are now known to the topology
+ waitForEpochs.sort(null);
+ for (int i = 0; i < waitForEpochs.size(); i++)
+ {
+ long waitForEpoch = waitForEpochs.getLong(i);
+ if (!node.topology().hasEpoch(waitForEpoch))
+ break;
+ List<RequestContext> delayed =
delayedRequests.remove(waitForEpoch);
+ if (null == requests) requests = new ArrayList<>(delayed.size());
+ requests.addAll(delayed);
+ }
+ waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
+
+ // for each newly queued request: park it until its epoch is known, or schedule it now
+ RequestContext request;
+ while (null != (request = unframedRequests.poll()))
+ {
+ long waitForEpoch = request.waitForEpoch;
+ if (waitForEpoch != 0 && !node.topology().hasEpoch(waitForEpoch))
+ {
+ delayedRequests.computeIfAbsent(waitForEpoch, ignore -> new
ArrayList<>()).add(request);
+ if (!waitForEpochs.containsLong(waitForEpoch))
+ waitForEpochs.addLong(waitForEpoch);
+ }
+ else
+ {
+ if (null == requests) requests = new ArrayList<>();
+ requests.add(request);
+ }
+ }
+
+ // schedule
+ if (requests != null)
+ {
+ requests.forEach(r -> save(r.message)); // save in batches to
simulate journal more...
+ requests.forEach(Runnable::run);
+ }
+ }
+
+ @FunctionalInterface
+ interface TxnIdProvider
+ {
+ TxnId txnId(Message message);
+ }
+
+ private static class RequestContext implements Runnable
+ {
+ final long waitForEpoch;
+ final Message message;
+ final Runnable fn;
+
+ protected RequestContext(Request request, Runnable fn)
+ {
+ this.waitForEpoch = request.waitForEpoch();
+ this.message = request;
+ this.fn = fn;
+ }
+
+ @Override
+ public void run()
+ {
+ fn.run();
+ }
+ }
+
+ public static class MessageProvider implements
SerializerSupport.MessageProvider
+ {
+ public final TxnId txnId;
+ private final Map<MessageType, Message> writes;
+
+ public MessageProvider(TxnId txnId, Map<MessageType, Message> writes)
+ {
+ this.txnId = txnId;
+ this.writes = writes;
+ }
+
+ @Override
+ public TxnId txnId()
+ {
+ return txnId;
+ }
+
+ @Override
+ public Set<MessageType> test(Set<MessageType> messages)
+ {
+ return Sets.intersection(writes.keySet(), messages);
+ }
+
+ @Override
+ public Set<MessageType> all()
+ {
+ return writes.keySet();
+ }
+
+ public Map<MessageType, Message> allMessages()
+ {
+ var all = all();
+ Map<MessageType, Message> map =
Maps.newHashMapWithExpectedSize(all.size());
+ for (MessageType messageType : all)
+ map.put(messageType, get(messageType));
+ return map;
+ }
+
+ public <T extends Message> T get(MessageType type)
+ {
+ return (T) writes.get(type);
+ }
+
+ @Override
+ public PreAccept preAccept()
+ {
+ return get(PRE_ACCEPT_REQ);
+ }
+
+ @Override
+ public BeginRecovery beginRecover()
+ {
+ return get(BEGIN_RECOVER_REQ);
+ }
+
+ @Override
+ public Propagate propagatePreAccept()
+ {
+ return get(PROPAGATE_PRE_ACCEPT_MSG);
+ }
+
+ @Override
+ public Accept accept(Ballot ballot)
+ {
+ return get(ACCEPT_REQ);
+ }
+
+ @Override
+ public Commit commitSlowPath()
+ {
+ return get(COMMIT_SLOW_PATH_REQ);
+ }
+
+ @Override
+ public Commit commitMaximal()
+ {
+ return get(COMMIT_MAXIMAL_REQ);
+ }
+
+ @Override
+ public Commit stableFastPath()
+ {
+ return get(STABLE_FAST_PATH_REQ);
+ }
+
+ @Override
+ public Commit stableSlowPath()
+ {
+ return get(STABLE_SLOW_PATH_REQ);
+ }
+
+ @Override
+ public Commit stableMaximal()
+ {
+ return get(STABLE_MAXIMAL_REQ);
+ }
+
+ @Override
+ public Propagate propagateStable()
+ {
+ return get(PROPAGATE_STABLE_MSG);
+ }
+
+ @Override
+ public Apply applyMinimal()
+ {
+ return get(APPLY_MINIMAL_REQ);
+ }
+
+ @Override
+ public Apply applyMaximal()
+ {
+ return get(APPLY_MAXIMAL_REQ);
+ }
+
+ @Override
+ public Propagate propagateApply()
+ {
+ return get(PROPAGATE_APPLY_MSG);
+ }
+
+ @Override
+ public Propagate propagateOther()
+ {
+ return get(PROPAGATE_OTHER_MSG);
+ }
+
+ @Override
+ public ApplyThenWaitUntilApplied applyThenWaitUntilApplied()
+ {
+ return get(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
+ }
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index d425bd34..d6ce5f22 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -19,6 +19,7 @@
package accord.impl.list;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Function;
import accord.local.SafeCommandStore;
@@ -107,6 +108,23 @@ public class ListRead implements Read
return new ListRead(executor, isEphemeralRead, ((Seekables)
userReadKeys).with(((ListRead)other).userReadKeys),
((Seekables)keys).with(((ListRead)other).keys));
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ListRead listRead = (ListRead) o;
+ return isEphemeralRead == listRead.isEphemeralRead
+ && Objects.equals(userReadKeys, listRead.userReadKeys)
+ && Objects.equals(keys, listRead.keys);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public String toString()
{
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java
b/accord-core/src/test/java/accord/impl/list/ListResult.java
index 5e8cb634..c9da1719 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -19,6 +19,7 @@
package accord.impl.list;
import java.util.Arrays;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -113,6 +114,40 @@ public class ListResult implements Result, Reply
return status;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ListResult that = (ListResult) o;
+ return requestId == that.requestId
+ && Objects.equals(client, that.client)
+ && Objects.equals(txnId, that.txnId)
+ && Objects.equals(readKeys, that.readKeys)
+ && Objects.equals(responseKeys, that.responseKeys)
+ && equals(read, that.read)
+ && Objects.equals(update, that.update)
+ && status == that.status;
+ }
+
+ private static boolean equals(int[][] a, int[][] b)
+ {
+ if (a == b) return true;
+ if (a == null || b == null) return false;
+ if (a.length != b.length) return false;
+ for (int i = 0; i < a.length; i++)
+ {
+ if (!Arrays.equals(a[i], b[i])) return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public String toString()
{
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index d669fd8d..2e31cbec 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -47,6 +47,7 @@ import accord.messages.WaitUntilApplied;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.RoutableKey;
+import accord.primitives.Seekable;
import accord.primitives.SyncPoint;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -257,6 +258,16 @@ public class ListStore implements DataStore
return String.format("(%s -> %s)", sp.syncId, sp.keysOrRanges);
}
+ public String historySeekable(Seekable o)
+ {
+ switch (o.domain())
+ {
+ case Key: return history(o.asKey());
+ case Range: return history(Ranges.single(o.asRange()));
+ default: throw new IllegalArgumentException("Unknown domain: " +
o.domain() + ", input=" + o);
+ }
+ }
+
private String history(Ranges ranges)
{
return history("range", ranges, other -> other.intersects(ranges));
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java
b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 1b9d0fd4..1ff9e7ee 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -23,6 +23,8 @@ import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
+
import accord.impl.*;
import accord.primitives.*;
import org.slf4j.Logger;
@@ -64,6 +66,31 @@ public class ListWrite extends TreeMap<Key, int[]>
implements Write
});
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == this) return true;
+ if (!(o instanceof ListWrite)) return false;
+ ListWrite other = (ListWrite) o;
+ // Cannot rely on Map.equals because our values are arrays, which compare by identity: (new int[]
{2}).equals(new int[] {2}) == false!
+ if (!Sets.difference(keySet(), other.keySet()).isEmpty()
+ || !Sets.difference(other.keySet(), keySet()).isEmpty())
+ return false;
+ // keys match
+ for (Key k : keySet())
+ {
+ if (!Arrays.equals(get(k), other.get(k)))
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public String toString()
{
diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java
b/accord-core/src/test/java/accord/utils/AccordGens.java
index 971c70e7..3247ebb1 100644
--- a/accord-core/src/test/java/accord/utils/AccordGens.java
+++ b/accord-core/src/test/java/accord/utils/AccordGens.java
@@ -213,6 +213,17 @@ public class AccordGens
};
}
+ public static Gen<Key> keysInsideRanges(Ranges ranges)
+ {
+ Invariants.checkArgument(!ranges.isEmpty(), "Ranges empty");
+ RoutingKey sample = ranges.get(0).end();
+ if (sample instanceof PrefixedIntHashKey)
+ return prefixedIntHashKeyInsideRanges(ranges);
+ if (sample instanceof IntKey.Routing)
+ return intKeysInsideRanges(ranges);
+ throw new IllegalArgumentException("Unsupported key type " +
sample.getClass() + "; supported = PrefixedIntHashKey, IntKey");
+ }
+
public static Gen<KeyDeps> keyDeps(Gen<? extends Key> keyGen)
{
return keyDeps(keyGen, txnIds());
@@ -374,6 +385,39 @@ public class AccordGens
return ranges(sizeGen, keyGen, (ignore, a, b) -> factory.apply(a, b));
}
+ public static Gen<Range> rangeInsideRange(Range range)
+ {
+ if (range.end() instanceof PrefixedIntHashKey)
+ return prefixedIntHashKeyRangeInsideRange(range);
+ throw new IllegalArgumentException("Unsupported type: " +
range.start().getClass());
+ }
+
+ public static Gen<Range> prefixedIntHashKeyRangeInsideRange(Range range)
+ {
+ if (!(range.end() instanceof PrefixedIntHashKey))
+ throw new IllegalArgumentException("Only PrefixedIntHashKey
supported; saw " + range.end().getClass());
+ PrefixedIntHashKey start = (PrefixedIntHashKey) range.start();
+ PrefixedIntHashKey end = (PrefixedIntHashKey) range.end();
+ if (start.hash + 1 == end.hash)
+ {
+ // range is of size 1, so it cannot be split into a smaller range; return it unchanged
+ return ignore -> range;
+ }
+ return rs -> {
+ int a = rs.nextInt(start.hash, end.hash);
+ int b = rs.nextInt(start.hash, end.hash);
+ while (a == b)
+ b = rs.nextInt(start.hash, end.hash);
+ if (a > b)
+ {
+ int tmp = a;
+ a = b;
+ b = tmp;
+ }
+ return PrefixedIntHashKey.range(start.prefix, a, b);
+ };
+ }
+
public static Gen<Ranges> prefixedIntHashKeyRanges(int numNodes, int rf)
{
return rs -> {
diff --git a/accord-core/src/test/java/accord/utils/Gens.java
b/accord-core/src/test/java/accord/utils/Gens.java
index 3723fc60..fe44a81b 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
+import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -55,13 +56,23 @@ public class Gens {
return ignore -> constant.get();
}
- public static <T> Gen<T> oneOf(Gen<T>... gens)
+ public static <T> Gen<T> oneOf(Gen<? extends T>... gens)
{
+ switch (gens.length)
+ {
+ case 0: throw new IllegalArgumentException("Unable to select oneOf
an empty list");
+ case 1: return (Gen<T>) gens[0];
+ }
return oneOf(Arrays.asList(gens));
}
- public static <T> Gen<T> oneOf(List<Gen<T>> gens)
+ public static <T> Gen<T> oneOf(List<Gen<? extends T>> gens)
{
+ switch (gens.size())
+ {
+ case 0: throw new IllegalArgumentException("Unable to select oneOf
an empty list");
+ case 1: return (Gen<T>) gens.get(0);
+ }
return rs -> rs.pick(gens).next(rs);
}
@@ -465,6 +476,11 @@ public class Gens {
return new StringDSL();
}
+ public static BooleanSupplier supplier(Gen<Boolean> gen, RandomSource rs)
+ {
+ return () -> gen.next(rs);
+ }
+
public static class BooleanDSL
{
public Gen<Boolean> all()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]