This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4c870dc9 Accord Journal / Determinism
4c870dc9 is described below
commit 4c870dc9b561a841ea9b923ff739953adcc00325
Author: Alex Petrov <[email protected]>
AuthorDate: Mon Jun 17 09:53:43 2024 +0200
Accord Journal / Determinism
* Store intermediate Command states in the log
* Load Command states from the log
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-19757
---
.../java/accord/impl/InMemoryCommandStore.java | 13 -
.../src/main/java/accord/local/Cleanup.java | 1 -
.../src/main/java/accord/local/Command.java | 18 +-
.../src/main/java/accord/local/Listeners.java | 2 +-
.../main/java/accord/local/SafeCommandStore.java | 5 +
.../main/java/accord/local/SerializerSupport.java | 613 ---------------------
.../src/main/java/accord/utils/Invariants.java | 7 +
.../accord/impl/basic/DelayedCommandStores.java | 49 +-
.../src/test/java/accord/impl/basic/Journal.java | 560 +++++++++++--------
.../java/accord/local/BootstrapLocalTxnTest.java | 3 +-
.../test/java/accord/local/CheckedCommands.java | 35 ++
.../java/accord/utils/ReducingRangeMapTest.java | 4 -
12 files changed, 401 insertions(+), 909 deletions(-)
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 5ee3f8d5..4932b181 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -461,7 +461,6 @@ public abstract class InMemoryCommandStore extends
CommandStore
private <T> T executeInContext(InMemoryCommandStore commandStore,
PreLoadContext preLoadContext, Function<? super SafeCommandStore, T> function,
boolean isDirectCall)
{
-
SafeCommandStore safeStore =
commandStore.beginOperation(preLoadContext);
try
{
@@ -623,18 +622,6 @@ public abstract class InMemoryCommandStore extends
CommandStore
}
}
- private static class TimestampAndStatus
- {
- public final Timestamp timestamp;
- public final Status status;
-
- public TimestampAndStatus(Timestamp timestamp, Status status)
- {
- this.timestamp = timestamp;
- this.status = status;
- }
- }
-
public static class GlobalTimestampsForKey extends
GlobalState<TimestampsForKey>
{
private final Key key;
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java
b/accord-core/src/main/java/accord/local/Cleanup.java
index bb6b1d2e..5da56e3c 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -34,7 +34,6 @@ import static
accord.local.SaveStatus.TruncatedApplyWithOutcome;
import static accord.local.SaveStatus.Uninitialised;
import static accord.local.Status.Applied;
import static accord.local.Status.Durability.Majority;
-import static accord.local.Status.Durability.Universal;
import static accord.local.Status.Durability.UniversalOrInvalidated;
import static accord.local.Status.PreCommitted;
import static accord.primitives.Txn.Kind.EphemeralRead;
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index 96df4461..51bd9410 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -69,7 +69,7 @@ import static java.lang.String.format;
public abstract class Command implements CommonAttributes
{
- interface Listener
+ public interface Listener
{
void onChange(SafeCommandStore safeStore, SafeCommand safeCommand);
@@ -275,6 +275,7 @@ public abstract class Command implements CommonAttributes
this.listeners = common.durableListeners();
}
+
@Override
public boolean equals(Object o)
{
@@ -645,9 +646,15 @@ public abstract class Command implements CommonAttributes
SaveStatus saveStatus = saveStatus();
return saveStatus.hasBeen(Status.Committed) &&
!saveStatus.hasBeen(Invalidated);
}
+
public final boolean isStable()
{
SaveStatus saveStatus = saveStatus();
+ return isStable(saveStatus);
+ }
+
+ public static boolean isStable(SaveStatus saveStatus)
+ {
return saveStatus.hasBeen(Status.Stable) &&
!saveStatus.hasBeen(Invalidated);
}
@@ -661,11 +668,6 @@ public abstract class Command implements CommonAttributes
return Invariants.cast(this, Executed.class);
}
- public final boolean isTruncated()
- {
- return status().hasBeen(Status.Truncated);
- }
-
public abstract Command updateAttributes(CommonAttributes attrs, Ballot
promised);
public final Command updateAttributes(CommonAttributes attrs)
@@ -1105,7 +1107,7 @@ public abstract class Command implements CommonAttributes
return Objects.equals(acceptedOrCommitted(),
that.acceptedOrCommitted());
}
- static Accepted accepted(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted)
+ public static Accepted accepted(CommonAttributes common, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted)
{
return validate(new Accepted(common, status, promised, executeAt,
accepted));
}
@@ -1197,7 +1199,7 @@ public abstract class Command implements CommonAttributes
return committed(command, common, command.promised(),
command.saveStatus(), waitingOn);
}
- static Committed committed(CommonAttributes common, SaveStatus status,
Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
+ public static Committed committed(CommonAttributes common, SaveStatus
status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn
waitingOn)
{
return validate(new Committed(common, status, executeAt, promised,
accepted, waitingOn));
}
diff --git a/accord-core/src/main/java/accord/local/Listeners.java
b/accord-core/src/main/java/accord/local/Listeners.java
index 13929713..b4059620 100644
--- a/accord-core/src/main/java/accord/local/Listeners.java
+++ b/accord-core/src/main/java/accord/local/Listeners.java
@@ -51,7 +51,7 @@ public class Listeners<L extends Command.Listener> extends
DeterministicSet<L>
super(listeners);
}
- Listeners<L> mutable()
+ public Listeners<L> mutable()
{
return new Listeners<>(this);
}
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 139bf088..7aea6790 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -194,6 +194,11 @@ public abstract class SafeCommandStore
protected abstract SafeCommandsForKey
getInternalIfLoadedAndInitialised(Key key);
public abstract boolean canExecuteWith(PreLoadContext context);
+ public void load(Command loaded)
+ {
+ update(null, loaded);
+ }
+
protected void update(Command prev, Command updated)
{
updateMaxConflicts(prev, updated);
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java
b/accord-core/src/main/java/accord/local/SerializerSupport.java
deleted file mode 100644
index 7ad168a2..00000000
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ /dev/null
@@ -1,613 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package accord.local;
-
-import java.util.Set;
-import javax.annotation.Nullable;
-
-import com.google.common.collect.ImmutableSet;
-
-import accord.api.Agent;
-import accord.api.Result;
-import accord.api.VisibleForImplementation;
-import accord.local.Command.WaitingOn;
-import accord.local.CommandStores.RangesForEpoch;
-import accord.local.CommonAttributes.Mutable;
-import accord.messages.Accept;
-import accord.messages.Apply;
-import accord.messages.ApplyThenWaitUntilApplied;
-import accord.messages.BeginRecovery;
-import accord.messages.Commit;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.Propagate;
-import accord.primitives.Ballot;
-import accord.primitives.Deps;
-import accord.primitives.FullRangeRoute;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.primitives.Writes;
-import accord.utils.Invariants;
-
-import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
-import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
-import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
-import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
-import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
-import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
-import static accord.messages.MessageType.PRE_ACCEPT_REQ;
-import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
-import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
-import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
-import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
-import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
-import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
-import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
-import static accord.primitives.PartialTxn.merge;
-import static accord.utils.Invariants.checkState;
-import static accord.utils.Invariants.illegalState;
-
-@VisibleForImplementation
-public class SerializerSupport
-{
- private static final Set<MessageType> PRE_ACCEPT_TYPES =
- ImmutableSet.of(PRE_ACCEPT_REQ, BEGIN_RECOVER_REQ,
PROPAGATE_PRE_ACCEPT_MSG);
-
- private static final Set<MessageType> PRE_ACCEPT_COMMIT_TYPES =
- ImmutableSet.<MessageType>builder()
- .addAll(PRE_ACCEPT_TYPES)
- .add(COMMIT_SLOW_PATH_REQ, COMMIT_MAXIMAL_REQ)
- .build();
-
- private static final Set<MessageType> PRE_ACCEPT_STABLE_TYPES =
- ImmutableSet.<MessageType>builder()
- .addAll(PRE_ACCEPT_COMMIT_TYPES)
- .add(STABLE_FAST_PATH_REQ, STABLE_SLOW_PATH_REQ,
STABLE_MAXIMAL_REQ, PROPAGATE_STABLE_MSG)
- .build();
-
- private static final Set<MessageType> APPLY_TYPES =
- ImmutableSet.of(APPLY_MINIMAL_REQ, APPLY_MAXIMAL_REQ, PROPAGATE_APPLY_MSG,
APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
-
- private static final Set<MessageType> PRE_ACCEPT_COMMIT_APPLY_TYPES =
- ImmutableSet.<MessageType>builder()
- .addAll(PRE_ACCEPT_STABLE_TYPES)
- .addAll(APPLY_TYPES)
- .add(PROPAGATE_OTHER_MSG)
- .build();
-
- private static Command localOnly(Agent agent, RangesForEpoch
rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt,
@Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted,
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
- {
- //TODO (expected): LocalOnly doesn't reflect normal command flow so
relying on this special casing helps reconstruct work, but is a bit brittle;
should find a more maintainable way.
- TxnId txnId = attrs.txnId();
- FullRangeRoute route = (FullRangeRoute) attrs.route();
- //TODO (correctness): not 100% correct as the ranges was "valid" which
can be mutated "after" the sync point... so might actually have less
- Ranges participantRanges = route.participants().toRanges();
-
- Txn emptyTxn = agent.emptyTxn(txnId.kind(), participantRanges);
- Writes writes = emptyTxn.execute(txnId, txnId, null);
- Result results = emptyTxn.result(txnId, txnId, null);
- Ranges coordinateRanges = rangesForEpoch.coordinates(txnId);
- attrs.partialTxn(emptyTxn.slice(coordinateRanges, true))
- .partialDeps(Deps.NONE.slice(coordinateRanges));
-
- switch (status.status)
- {
- case NotDefined:
- return status == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(txnId)
- :
Command.NotDefined.notDefined(attrs, promised);
- case PreAccepted:
- return Command.PreAccepted.preAccepted(attrs, executeAt,
promised);
- case AcceptedInvalidate:
- case Accepted:
- case PreCommitted:
- return Command.Accepted.accepted(attrs, status, executeAt,
promised, accepted);
- case Committed:
- case Stable:
- return Command.Committed.committed(attrs, status, executeAt,
promised, accepted, waitingOnProvider.provide(attrs.partialDeps()));
- case PreApplied:
- case Applied:
- return Command.Executed.executed(attrs, status, executeAt,
promised, accepted, waitingOnProvider.provide(attrs.partialDeps()), writes,
results);
- case Truncated:
- case Invalidated:
- switch (status)
- {
- case Erased:
- return Command.Truncated.erased(txnId,
attrs.durability(), attrs.route());
- case TruncatedApplyWithOutcome:
- return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, results, executesAtLeast);
- case TruncatedApply:
- return Command.Truncated.truncatedApply(attrs, status,
executeAt, null, null, executesAtLeast);
- default:
- throw new AssertionError("Unexpected truncate status:
" + status);
- }
- default:
- throw new IllegalStateException();
- }
- }
-
- /**
- * Reconstructs Command from register values and protocol messages.
- */
- public static Command reconstruct(Agent agent, RangesForEpoch
rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt,
@Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted,
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
- {
- if (attrs.txnId().kind() == Txn.Kind.LocalOnly)
- return localOnly(agent, rangesForEpoch, attrs, status, executeAt,
executesAtLeast, promised, accepted, waitingOnProvider, messageProvider);
- switch (status.status)
- {
- case NotDefined:
- return status == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
- :
Command.NotDefined.notDefined(attrs, promised);
- case PreAccepted:
- return preAccepted(rangesForEpoch, attrs, executeAt, promised,
messageProvider);
- case AcceptedInvalidate:
- case Accepted:
- case PreCommitted:
- return accepted(rangesForEpoch, attrs, status, executeAt,
promised, accepted, messageProvider);
- case Committed:
- case Stable:
- return committed(rangesForEpoch, attrs, status, executeAt,
promised, accepted, waitingOnProvider, messageProvider);
- case PreApplied:
- case Applied:
- return executed(rangesForEpoch, attrs, status, executeAt,
promised, accepted, waitingOnProvider, messageProvider);
- case Truncated:
- case Invalidated:
- return truncated(attrs, status, executeAt, executesAtLeast,
messageProvider);
- default:
- throw new IllegalStateException();
- }
- }
-
- private static Command.PreAccepted preAccepted(RangesForEpoch
rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised,
MessageProvider messageProvider)
- {
- Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
- checkState(!witnessed.isEmpty(), "PreAccepted message types not
witnessed; witnessed is ", new LoggedMessageProvider(messageProvider));
- attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch,
witnessed, messageProvider));
- return Command.PreAccepted.preAccepted(attrs, executeAt, promised);
- }
-
- private static Command.Accepted accepted(RangesForEpoch rangesForEpoch,
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot
accepted, MessageProvider messageProvider)
- {
- if (status.known.isDefinitionKnown())
- {
- Set<MessageType> witnessed =
messageProvider.test(PRE_ACCEPT_TYPES);
- checkState(!witnessed.isEmpty());
- attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch,
witnessed, messageProvider));
- }
-
- if (status.known.deps.hasProposedDeps())
- {
- Accept accept = messageProvider.accept(accepted);
- attrs.partialDeps(slicePartialDeps(rangesForEpoch, accept));
- }
-
- return Command.Accepted.accepted(attrs, status, executeAt, promised,
accepted);
- }
-
- private static Command.Committed committed(RangesForEpoch rangesForEpoch,
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
- {
- attrs = extract(rangesForEpoch, status, accepted, messageProvider,
(attrs0, txn, deps, i1, i2) -> attrs0.partialTxn(txn).partialDeps(deps), attrs);
- return Command.Committed.committed(attrs, status, executeAt, promised,
accepted, waitingOnProvider.provide(attrs.partialDeps()));
- }
-
- private static Command.Executed executed(RangesForEpoch rangesForEpoch,
Mutable attrs, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot
accepted, WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
- {
- return extract(rangesForEpoch, status, accepted, messageProvider,
(attrs0, txn, deps, writes, result) -> {
- attrs0.partialTxn(txn)
- .partialDeps(deps);
-
- return Command.Executed.executed(attrs, status, executeAt,
promised, accepted, waitingOnProvider.provide(deps), writes, result);
- }, attrs);
- }
-
- private static Command.Truncated truncated(Mutable attrs, SaveStatus
status, Timestamp executeAt, @Nullable Timestamp executesAtLeast,
MessageProvider messageProvider)
- {
- Writes writes = null;
- Result result = null;
-
- switch (status)
- {
- default:
- throw illegalState("Unhandled SaveStatus: " + status);
- case TruncatedApplyWithOutcome:
- case TruncatedApplyWithDeps:
- Set<MessageType> witnessed = messageProvider.test(APPLY_TYPES);
- checkState(!witnessed.isEmpty());
- if (witnessed.contains(APPLY_MAXIMAL_REQ))
- {
- Apply apply = messageProvider.applyMaximal();
- writes = apply.writes;
- result = apply.result;
- }
- else if (witnessed.contains(APPLY_MINIMAL_REQ))
- {
- Apply apply = messageProvider.applyMinimal();
- writes = apply.writes;
- result = apply.result;
- }
- else if (witnessed.contains(PROPAGATE_APPLY_MSG))
- {
- Propagate propagate = messageProvider.propagateApply();
- writes = propagate.writes;
- result = propagate.result;
- }
- else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ))
- {
- ApplyThenWaitUntilApplied apply =
messageProvider.applyThenWaitUntilApplied();
- writes = apply.writes;
- result = apply.result;
- }
- else
- {
- throw new UnsupportedOperationException("Unhandled types:
" + witnessed);
- }
- case TruncatedApply:
- return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result, executesAtLeast);
- case ErasedOrInvalidated:
- return Command.Truncated.erasedOrInvalidated(attrs.txnId(),
attrs.durability(), attrs.route());
- case Erased:
- return Command.Truncated.erased(attrs.txnId(),
attrs.durability(), attrs.route());
- case Invalidated:
- return Command.Truncated.invalidated(attrs.txnId(),
attrs.durableListeners());
- }
- }
-
- public static class TxnAndDeps
- {
- public static TxnAndDeps EMPTY = new TxnAndDeps(null, null);
-
- public final PartialTxn txn;
- public final PartialDeps deps;
-
- TxnAndDeps(PartialTxn txn, PartialDeps deps)
- {
- this.txn = txn;
- this.deps = deps;
- }
- }
-
- interface WithContents<I, O>
- {
- O apply(I in, PartialTxn txn, PartialDeps deps, Writes writes, Result
result);
- }
-
- public static TxnAndDeps extractTxnAndDeps(RangesForEpoch rangesForEpoch,
SaveStatus status, Ballot accepted, MessageProvider messageProvider)
- {
- return extract(rangesForEpoch, status, accepted, messageProvider, (i1,
txn, deps, i2, i3) -> new TxnAndDeps(txn, deps), null);
- }
-
- private static <I, O> O extract(RangesForEpoch rangesForEpoch, SaveStatus
status, Ballot accepted, MessageProvider messageProvider, WithContents<I, O>
withContents, I param)
- {
- // TODO (expected): first check if we have taken the normal path
- Set<MessageType> witnessed;
-
- // TODO (required): we must select the deps we would have used for
initialiseWaitingOn
- switch (status.status)
- {
- case PreAccepted:
- witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
- checkState(!witnessed.isEmpty(), "Unable to find PreAccept
types; witnessed %s", new LoggedMessageProvider(messageProvider));
- return withContents.apply(param,
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider),
null, null, null);
-
- case AcceptedInvalidate:
- case Accepted:
- case PreCommitted:
- {
- PartialTxn txn = null;
- PartialDeps deps = null;
-
- if (status.known.isDefinitionKnown())
- {
- witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
- checkState(!witnessed.isEmpty(), "Unable to find PreAccept
types; witnessed %s", new LoggedMessageProvider(messageProvider));
- txn = txnFromPreAcceptOrBeginRecover(rangesForEpoch,
witnessed, messageProvider);
- }
-
- if (status.known.deps.hasProposedDeps())
- {
- Accept accept = messageProvider.accept(accepted);
- deps = slicePartialDeps(rangesForEpoch, accept);
- }
- return withContents.apply(param, txn, deps, null, null);
- }
- case Committed:
- {
- witnessed = messageProvider.test(PRE_ACCEPT_COMMIT_TYPES);
- Commit commit;
- if (witnessed.contains(COMMIT_MAXIMAL_REQ))
- {
- commit = messageProvider.commitMaximal();
- }
- else
- {
-
Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ), "Unable to find
COMMIT_SLOW_PATH_REQ; witnessed %s", new
LoggedMessageProvider(messageProvider));
- commit = messageProvider.commitSlowPath();
- }
-
- return sliceAndApply(rangesForEpoch, messageProvider,
witnessed, commit, withContents, param, null, null);
- }
-
- case Stable:
- {
- // TODO (required): we should piece this back together in the
precedence order of arrival
- witnessed = messageProvider.test(PRE_ACCEPT_STABLE_TYPES);
- Commit commit;
- if (witnessed.contains(STABLE_MAXIMAL_REQ))
- {
- commit = messageProvider.stableMaximal();
- }
- else if (witnessed.contains(PROPAGATE_STABLE_MSG))
- {
- return sliceAndApply(rangesForEpoch,
messageProvider.propagateStable(), withContents, param, null, null);
- }
- else if (witnessed.contains(STABLE_FAST_PATH_REQ))
- {
- commit = messageProvider.stableFastPath();
- }
- else
- {
- checkState(witnessed.contains(STABLE_SLOW_PATH_REQ),
"Unable to find STABLE_SLOW_PATH_REQ; txn_id=%s, witnessed %s",
messageProvider.txnId(), new LoggedMessageProvider(messageProvider));
- if (witnessed.contains(COMMIT_MAXIMAL_REQ))
- {
- commit = messageProvider.commitMaximal();
- }
- else if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
- {
- commit = messageProvider.commitSlowPath();
- }
- else if (witnessed.contains(PRE_ACCEPT_REQ) ||
witnessed.contains(BEGIN_RECOVER_REQ) ||
witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
- {
- Commit slowPath = messageProvider.stableSlowPath();
- Ranges ranges =
rangesForEpoch.allBetween(slowPath.txnId.epoch(), slowPath.executeAt.epoch());
- PartialTxn txn =
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed,
messageProvider).slice(ranges, true);
- PartialDeps deps = slowPath.partialDeps.slice(ranges);
- return withContents.apply(param, txn, deps, null,
null);
- }
- else
- {
- throw illegalState("Unable to find
COMMIT_SLOW_PATH_REQ; witnessed %s", new
LoggedMessageProvider(messageProvider));
- }
- }
-
- return sliceAndApply(rangesForEpoch, messageProvider,
witnessed, commit, withContents, param, null, null);
- }
-
- case PreApplied:
- case Applied:
- {
- witnessed =
messageProvider.test(PRE_ACCEPT_COMMIT_APPLY_TYPES);
- if (witnessed.contains(APPLY_MAXIMAL_REQ))
- {
- Apply apply = messageProvider.applyMaximal();
- Ranges ranges =
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
- return withContents.apply(param, apply.txn.slice(ranges,
true), apply.deps.slice(ranges), apply.writes, apply.result);
- }
- else if (witnessed.contains(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ))
- {
- ApplyThenWaitUntilApplied apply =
messageProvider.applyThenWaitUntilApplied();
- Ranges ranges =
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
- return withContents.apply(param, apply.txn.slice(ranges,
true), apply.deps.slice(ranges), apply.writes, apply.result);
- }
- else if (witnessed.contains(APPLY_MINIMAL_REQ))
- {
- Apply apply = messageProvider.applyMinimal();
- Commit commit;
- if (witnessed.contains(STABLE_MAXIMAL_REQ))
- {
- commit = messageProvider.stableMaximal();
- }
- else if (witnessed.contains(PROPAGATE_STABLE_MSG))
- {
- Propagate propagate =
messageProvider.propagateStable();
- var ranges = propagate.committedExecuteAt == null ?
rangesForEpoch.allAt(propagate.txnId) :
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
- return withContents.apply(param,
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges),
apply.writes, apply.result);
- }
- else if (witnessed.contains(PROPAGATE_APPLY_MSG))
- {
- Propagate propagate = messageProvider.propagateApply();
- var ranges = propagate.committedExecuteAt == null ?
rangesForEpoch.allAt(propagate.txnId) :
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
- return withContents.apply(param,
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges),
apply.writes, apply.result);
- }
- else if (witnessed.contains(COMMIT_MAXIMAL_REQ))
- {
- commit = messageProvider.commitMaximal();
- }
- else if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
- {
- commit = messageProvider.commitSlowPath();
- }
- else if (witnessed.contains(STABLE_FAST_PATH_REQ))
- {
- commit = messageProvider.stableFastPath();
- }
- else if (witnessed.contains(PRE_ACCEPT_REQ) ||
witnessed.contains(BEGIN_RECOVER_REQ) ||
witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
- {
- PartialTxn txn =
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
- Ranges ranges =
rangesForEpoch.allBetween(apply.txnId.epoch(), apply.executeAt.epoch());
- return withContents.apply(param, txn.slice(ranges,
true), apply.deps.slice(ranges), apply.writes, apply.result);
- }
- else
- {
- throw illegalState("Invalid state: insufficient stable
or commit messages found to reconstruct PreApplied or greater SaveStatus;
witnessed " + witnessed);
- }
-
- return sliceAndApply(rangesForEpoch, messageProvider,
witnessed, commit, withContents, param, apply.writes, apply.result);
- }
- else if (witnessed.contains(PROPAGATE_APPLY_MSG))
- {
- Propagate propagate = messageProvider.propagateApply();
- Invariants.nonNull(propagate.partialTxn, "Unable to find
partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider));
- Invariants.nonNull(propagate.stableDeps, "Unable to find
stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider));
- return sliceAndApply(rangesForEpoch, propagate,
withContents, param, propagate.writes, propagate.result);
- }
- else if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
- {
- // once propgate runs locally it merges the local state
with the remote state, which may make this go from PRE_ACCEPT to PRE_APPLIED!
- Propagate propagate = messageProvider.propagatePreAccept();
- Invariants.nonNull(propagate.partialTxn, "Unable to find
partialTxn; witnessed %s", new LoggedMessageProvider(messageProvider));
- Invariants.nonNull(propagate.stableDeps, "Unable to find
stableDeps; witnessed %s", new LoggedMessageProvider(messageProvider));
-
- var ranges = propagate.committedExecuteAt == null ?
rangesForEpoch.allAt(propagate.txnId) :
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
- return withContents.apply(param,
propagate.partialTxn.slice(ranges, true), propagate.stableDeps.slice(ranges),
propagate.writes, propagate.result);
- }
- else if (witnessed.contains(PROPAGATE_OTHER_MSG))
- {
- // the txn/deps may have been erased, won't always be
here...
- Propagate propagate = messageProvider.propagateOther();
- var ranges = propagate.committedExecuteAt == null ?
rangesForEpoch.allAt(propagate.txnId) :
rangesForEpoch.allBetween(propagate.txnId, propagate.committedExecuteAt);
- PartialTxn txn = propagate.partialTxn == null ? null :
propagate.partialTxn.slice(ranges, true);
- PartialDeps deps = propagate.stableDeps == null ? null :
propagate.stableDeps.slice(ranges);
- return withContents.apply(param, txn, deps,
propagate.writes, propagate.result);
- }
- else
- {
- throw illegalState("Unable to find messages that lead to
PreApplied state; txn_id=%s, witnessed %s", messageProvider.txnId(), new
LoggedMessageProvider(messageProvider));
- }
- }
-
- case NotDefined:
- case Truncated:
- case Invalidated:
- return withContents.apply(param, null, null, null, null);
-
- default:
- throw new IllegalStateException();
- }
- }
-
- private static PartialTxn txnFromPreAcceptOrBeginRecover(RangesForEpoch
rangesForEpoch, Set<MessageType> witnessed, MessageProvider messageProvider)
- {
- if (witnessed.contains(PRE_ACCEPT_REQ))
- {
- PreAccept preAccept = messageProvider.preAccept();
- return
preAccept.partialTxn.slice(rangesForEpoch.allBetween(preAccept.txnId.epoch(),
preAccept.maxEpoch), true);
- }
-
- if (witnessed.contains(BEGIN_RECOVER_REQ))
- {
- BeginRecovery beginRecovery = messageProvider.beginRecover();
- return
beginRecovery.partialTxn.slice(rangesForEpoch.allAt(beginRecovery.txnId.epoch()),
true);
- }
-
- // TODO (expected): do we ever propagate only preaccept anymore?
- if (witnessed.contains(PROPAGATE_PRE_ACCEPT_MSG))
- {
- Propagate propagate = messageProvider.propagatePreAccept();
- return
propagate.partialTxn.slice(rangesForEpoch.allBetween(propagate.txnId.epoch(),
propagate.toEpoch), true);
- }
-
- return null;
- }
-
- private static PartialDeps slicePartialDeps(RangesForEpoch rangesForEpoch,
Accept accept)
- {
- return
accept.partialDeps.slice(rangesForEpoch.allBetween(accept.txnId.epoch(),
accept.executeAt.epoch()));
- }
-
- private static <I, O> O sliceAndApply(RangesForEpoch rangesForEpoch,
Propagate propagate, WithContents<I, O> withContents, I param, Writes writes,
Result result)
- {
- Ranges ranges = rangesForEpoch.allBetween(propagate.txnId.epoch(),
propagate.committedExecuteAt.epoch());
- PartialDeps partialDeps = propagate.stableDeps.slice(ranges);
- PartialTxn partialTxn = propagate.partialTxn.slice(ranges, true);
- return withContents.apply(param, partialTxn, partialDeps, writes,
result);
- }
-
- private static <I, O> O sliceAndApply(RangesForEpoch rangesForEpoch,
MessageProvider messageProvider, Set<MessageType> witnessed, Commit commit,
WithContents<I, O> withContents, I param, Writes writes, Result result)
- {
- Ranges ranges = rangesForEpoch.allBetween(commit.txnId.epoch(),
commit.executeAt.epoch());
- PartialDeps partialDeps = commit.partialDeps.slice(ranges);
- PartialTxn partialTxn = commit.partialTxn == null ? null :
commit.partialTxn.slice(ranges, true);
- switch (commit.kind)
- {
- default: throw new AssertionError("Unhandled Commit.Kind: " +
commit.kind);
- case CommitSlowPath:
- case StableFastPath:
- case StableSlowPath:
- PartialTxn preAcceptedPartialTxn =
txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
- if (partialTxn == null || partialTxn.keys().size() == 0)
partialTxn = preAcceptedPartialTxn;
- else partialTxn = merge(preAcceptedPartialTxn, partialTxn);
- if (partialTxn == null &&
witnessed.contains(COMMIT_MAXIMAL_REQ))
- partialTxn = messageProvider.commitMaximal().partialTxn;
- case StableWithTxnAndDeps:
- case CommitWithTxn:
- }
- return withContents.apply(param, partialTxn, partialDeps, writes,
result);
- }
-
-
- public interface WaitingOnProvider
- {
- WaitingOn provide(PartialDeps deps);
- }
-
- // TODO (required): randomised testing that we always restore the exact
same state
- public interface MessageProvider
- {
- TxnId txnId();
- Set<MessageType> test(Set<MessageType> messages);
- Set<MessageType> all();
-
- PreAccept preAccept();
-
- BeginRecovery beginRecover();
-
- Propagate propagatePreAccept();
-
- Accept accept(Ballot ballot);
-
- Commit commitSlowPath();
-
- Commit commitMaximal();
-
- Commit stableFastPath();
- Commit stableSlowPath();
-
- Commit stableMaximal();
-
- Propagate propagateStable();
-
- Apply applyMinimal();
-
- Apply applyMaximal();
-
- Propagate propagateApply();
-
- Propagate propagateOther();
-
- ApplyThenWaitUntilApplied applyThenWaitUntilApplied();
- }
-
- private static class LoggedMessageProvider
- {
- private final MessageProvider messageProvider;
-
- private LoggedMessageProvider(MessageProvider messageProvider)
- {
- this.messageProvider = messageProvider;
- }
-
- @Override
- public String toString()
- {
- return messageProvider.all().toString();
- }
- }
-}
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java
b/accord-core/src/main/java/accord/utils/Invariants.java
index 842ed5d3..ad9d8b35 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -19,6 +19,7 @@
package accord.utils;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import net.nicoulaj.compilecommand.annotations.Inline;
@@ -101,6 +102,12 @@ public class Invariants
illegalState();
}
+ public static void checkState(boolean condition, Supplier<String> msg)
+ {
+ if (!condition)
+ illegalState(msg.get());
+ }
+
public static void checkState(boolean condition, String msg)
{
if (!condition)
diff --git
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 5ba32268..4e96d890 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
+import accord.api.Result;
import accord.impl.InMemoryCommandStore;
import accord.impl.InMemoryCommandStores;
import accord.impl.InMemorySafeCommand;
@@ -41,15 +42,14 @@ import accord.impl.InMemorySafeCommandsForKey;
import accord.impl.InMemorySafeTimestampsForKey;
import accord.impl.PrefixedIntHashKey;
import accord.impl.basic.TaskExecutorService.Task;
+import accord.impl.mock.MockStore;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores;
-import accord.local.CommonAttributes;
import accord.local.Node;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
-import accord.local.SerializerSupport;
import accord.local.ShardDistributor;
import accord.primitives.Range;
import accord.primitives.RoutableKey;
@@ -77,18 +77,12 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
return Pattern.compile(path);
}
- private static final List<Pattern> KNOWN_ISSUES =
List.of(field(".partialTxn.keys."),
-
field(".partialTxn.read.keys."),
-
field(".partialTxn.read.userReadKeys."),
-
field(".partialTxn.covering."),
-
field(".partialDeps.keyDeps."),
-
field(".partialDeps.rangeDeps."),
-
field(".partialDeps.covering."),
-
field(".additionalKeysOrRanges."),
+ private static final List<Pattern> KNOWN_ISSUES = List.of(
// when a
new epoch is detected and the execute ranges have more than the coordinating
ranges,
// and the
coordinating ranges doesn't include the home key... we drop the query...
// The logic
to stitch messages together is not able to handle this as it doesn't know the
original Topologies
-
field(".partialTxn.query."),
+ // TODO
(required): test int if this is still true
+//
field(".partialTxn.query."),
//
cmd.mutable().build() != cmd. This is due to Command.durability changing
NotDurable to Local depending on the status
field(".durability."));
}
@@ -172,28 +166,12 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
// "loading" the command doesn't make sense as we don't "store"
the command...
if (current.txnId().kind() == Txn.Kind.EphemeralRead)
return;
- Command.WaitingOn waitingOn = null;
- if (current.isStable() && !current.isTruncated())
- waitingOn = current.asCommitted().waitingOn;
- SerializerSupport.MessageProvider messages =
journal.makeMessageProvider(current.txnId());
- Command.WaitingOn finalWaitingOn = waitingOn;
- CommonAttributes.Mutable mutable = current.mutable();
- mutable.partialDeps(null).removePartialTxn();
- Command reconstructed;
- try
- {
- reconstructed = SerializerSupport.reconstruct(agent(),
unsafeRangesForEpoch(), mutable, current.saveStatus(), current.executeAt(),
current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null,
current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn,
messages);
- }
- catch (IllegalStateException t)
- {
- //TODO (correctness): journal doesn't guarantee we pick the
same records we used to state transition
- // Journal stores a list of messages it saw in some order it
defines, but when reconstructing a command we don't actually know what messages
were used, this could
- // lead to a case where deps mismatch, so ignoring this for now
- if (t.getMessage() != null && t.getMessage().startsWith("Deps
do not match; expected"))
- return;
- throw t;
- }
+ Result result = current.result();
+ if (result == null)
+ result = MockStore.RESULT;
+ // Journal will not have result persisted. This part is here for
test purposes and ensuring that we have strict object equality.
+ Command reconstructed = journal.reconstruct(id, current.txnId(),
result);
List<Difference<?>> diff =
ReflectionUtils.recursiveEquals(current, reconstructed);
List<String> filteredDiff = diff.stream().filter(d ->
!DelayedCommandStores.hasKnownIssue(d.path)).map(Object::toString).collect(Collectors.toList());
Invariants.checkState(filteredDiff.isEmpty(), "Commands did not
match: expected %s, given %s, node %s, store %d, diff %s", current,
reconstructed, time, id(), new LazyToString(() -> String.join("\n",
filteredDiff)));
@@ -303,10 +281,11 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
commands.entrySet().forEach(e -> {
InMemorySafeCommand safe = e.getValue();
if (!safe.isModified()) return;
+
+ Command before = safe.original();
+ Command after = safe.current();
+ commandStore.journal.onExecute(commandStore.id(), before,
after, context.primaryTxnId().equals(after.txnId()));
commandStore.validateRead(safe.current());
- Command original = safe.original();
- if (original != null)
- commandStore.validateRead(original);
});
}
}
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java
b/accord-core/src/test/java/accord/impl/basic/Journal.java
index 4c5f63e5..9258c0e0 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -20,102 +20,49 @@ package accord.impl.basic;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import java.util.function.Function;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
+import accord.api.Result;
import accord.impl.MessageListener;
+import accord.local.Command;
+import accord.local.CommonAttributes;
+import accord.local.Listeners;
import accord.local.Node;
-import accord.local.SerializerSupport;
-import accord.messages.AbstractEpochRequest;
-import accord.messages.Accept;
-import accord.messages.Apply;
-import accord.messages.ApplyThenWaitUntilApplied;
-import accord.messages.BeginRecovery;
-import accord.messages.Commit;
+import accord.local.SaveStatus;
+import accord.local.Status;
import accord.messages.LocalRequest;
import accord.messages.Message;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.Propagate;
import accord.messages.ReplyContext;
import accord.messages.Request;
-import accord.messages.TxnRequest;
import accord.primitives.Ballot;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.primitives.Writes;
import accord.utils.Invariants;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongArrayList;
-import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ;
-import static accord.messages.MessageType.ACCEPT_REQ;
-import static accord.messages.MessageType.APPLY_MAXIMAL_REQ;
-import static accord.messages.MessageType.APPLY_MINIMAL_REQ;
-import static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ;
-import static accord.messages.MessageType.BEGIN_INVALIDATE_REQ;
-import static accord.messages.MessageType.BEGIN_RECOVER_REQ;
-import static accord.messages.MessageType.COMMIT_INVALIDATE_REQ;
-import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ;
-import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ;
-import static accord.messages.MessageType.INFORM_DURABLE_REQ;
-import static accord.messages.MessageType.INFORM_OF_TXN_REQ;
-import static accord.messages.MessageType.PRE_ACCEPT_REQ;
-import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
-import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
-import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
-import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
-import static accord.messages.MessageType.SET_GLOBALLY_DURABLE_REQ;
-import static accord.messages.MessageType.SET_SHARD_DURABLE_REQ;
-import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
-import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
-import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
+import static accord.utils.Invariants.illegalState;
+
public class Journal implements LocalRequest.Handler, Runnable
{
- private static final TxnIdProvider EPOCH = msg ->
((AbstractEpochRequest<?>) msg).txnId;
- private static final TxnIdProvider TXN = msg -> ((TxnRequest<?>)
msg).txnId;
- private static final TxnIdProvider LOCAL = msg -> ((LocalRequest<?>)
msg).primaryTxnId();
- private static final TxnIdProvider INVL = msg -> ((Commit.Invalidate)
msg).primaryTxnId();
- private static final Map<MessageType, TxnIdProvider> typeToProvider =
ImmutableMap.<MessageType, TxnIdProvider>builder()
-
.put(PRE_ACCEPT_REQ, TXN)
-
.put(ACCEPT_REQ, TXN)
-
.put(ACCEPT_INVALIDATE_REQ, EPOCH)
-
.put(COMMIT_SLOW_PATH_REQ, TXN)
-
.put(COMMIT_MAXIMAL_REQ, TXN)
-
.put(STABLE_FAST_PATH_REQ, TXN)
-
.put(STABLE_SLOW_PATH_REQ, TXN)
-
.put(STABLE_MAXIMAL_REQ, TXN)
-
.put(COMMIT_INVALIDATE_REQ, INVL)
-
.put(APPLY_MINIMAL_REQ, TXN)
-
.put(APPLY_MAXIMAL_REQ, TXN)
-
.put(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, EPOCH)
-
.put(BEGIN_RECOVER_REQ, TXN)
-
.put(BEGIN_INVALIDATE_REQ, EPOCH)
-
.put(INFORM_OF_TXN_REQ, EPOCH)
-
.put(INFORM_DURABLE_REQ, TXN)
-
.put(SET_SHARD_DURABLE_REQ, EPOCH)
-
.put(SET_GLOBALLY_DURABLE_REQ, EPOCH)
-
.put(PROPAGATE_PRE_ACCEPT_MSG, LOCAL)
-
.put(PROPAGATE_STABLE_MSG, LOCAL)
-
.put(PROPAGATE_APPLY_MSG, LOCAL)
-
.put(PROPAGATE_OTHER_MSG, LOCAL)
-
.build();
-
private final Queue<RequestContext> unframedRequests = new ArrayDeque<>();
private final LongArrayList waitForEpochs = new LongArrayList();
private final Long2ObjectHashMap<ArrayList<RequestContext>>
delayedRequests = new Long2ObjectHashMap<>();
- private final Map<TxnId, Map<MessageType, Message>> writes = new
HashMap<>();
private final MessageListener messageListener;
+ private final Map<Key, List<Diff>> diffs = new HashMap<>();
private Node node;
public Journal(MessageListener messageListener)
@@ -158,66 +105,6 @@ public class Journal implements LocalRequest.Handler,
Runnable
node.receive(request, from, replyContext);
}
- private void save(Message request)
- {
- MessageType type = request.type();
- TxnIdProvider provider = typeToProvider.get(type);
- Invariants.nonNull(provider, "Unknown type %s: %s", type, request);
- TxnId txnId = provider.txnId(request);
- writes.computeIfAbsent(txnId, ignore -> new Testing()).put(type,
request);
- }
-
- public SerializerSupport.MessageProvider makeMessageProvider(TxnId txnId)
- {
- return new MessageProvider(txnId, writes.getOrDefault(txnId,
Map.of()));
- }
-
- private static class Testing extends LinkedHashMap<MessageType, Message>
- {
- public Map<MessageType, List<Message>> history()
- {
- LinkedHashMap<MessageType, List<Message>> history = new
LinkedHashMap<>();
- for (MessageType k : keySet())
- {
- Object current = super.get(k);
- history.put(k, current instanceof List ? (List<Message>)
current : Collections.singletonList((Message) current));
- }
- return history;
- }
-
- @Override
- public Message get(Object key)
- {
- Object current = super.get(key);
- if (current == null || current instanceof Message)
- return (Message) current;
- List<Message> messages = (List<Message>) current;
- return messages.get(messages.size() - 1);
- }
-
- @Override
- public Message put(MessageType key, Message value)
- {
- Object current = super.get(key);
- if (current == null)
- return super.put(key, value);
- else if (current instanceof List)
- {
- List<Message> list = (List<Message>) current;
- list.add(value);
- return list.get(list.size() - 2);
- }
- else
- {
- List<Message> messages = new ArrayList<>();
- messages.add((Message) current);
- messages.add(value);
- super.put(key, value);
- return (Message) current;
- }
- }
- }
-
@Override
public void run()
{
@@ -248,7 +135,7 @@ public class Journal implements LocalRequest.Handler,
Runnable
requests.addAll(delayed);
}
waitForEpochs.removeIfLong(epoch ->
!delayedRequests.containsKey(epoch));
-
+
// for anything queued, put into the pending epochs or schedule
RequestContext request;
while (null != (request = unframedRequests.poll()))
@@ -266,19 +153,163 @@ public class Journal implements LocalRequest.Handler,
Runnable
requests.add(request);
}
}
-
+
// schedule
if (requests != null)
{
- requests.forEach(r -> save(r.message)); // save in batches to
simulate journal more...
requests.forEach(Runnable::run);
}
}
- @FunctionalInterface
- interface TxnIdProvider
+ public Command reconstruct(int commandStoreId, TxnId txnId, Result result)
+ {
+ Key key = new Key(txnId, commandStoreId);
+ List<Diff> diffs = this.diffs.get(key);
+ if (diffs == null || diffs.isEmpty())
+ return null;
+
+ Timestamp executeAt = null;
+ Timestamp executesAtLeast = null;
+ SaveStatus saveStatus = null;
+ Status.Durability durability = null;
+
+ Ballot acceptedOrCommitted = Ballot.ZERO;
+ Ballot promised = Ballot.ZERO;
+
+ Route<?> route = null;
+ PartialTxn partialTxn = null;
+ PartialDeps partialDeps = null;
+ Seekables<?, ?> additionalKeysOrRanges = null;
+
+ Command.WaitingOn waitingOn = null;
+ Writes writes = null;
+ Listeners.Immutable<Command.DurableAndIdempotentListener> listeners =
null;
+
+ for (Diff diff : diffs)
+ {
+ if (diff.txnId != null)
+ txnId = diff.txnId;
+ if (diff.executeAt != null)
+ executeAt = diff.executeAt;
+ if (diff.executesAtLeast != null)
+ executesAtLeast = diff.executesAtLeast;
+ if (diff.saveStatus != null)
+ saveStatus = diff.saveStatus;
+ if (diff.durability != null)
+ durability = diff.durability;
+
+ if (diff.acceptedOrCommitted != null)
+ acceptedOrCommitted = diff.acceptedOrCommitted;
+ if (diff.promised != null)
+ promised = diff.promised;
+
+ if (diff.route != null)
+ route = diff.route;
+ if (diff.partialTxn != null)
+ partialTxn = diff.partialTxn;
+ if (diff.partialDeps != null)
+ partialDeps = diff.partialDeps;
+ if (diff.additionalKeysOrRanges != null)
+ additionalKeysOrRanges = diff.additionalKeysOrRanges;
+
+ if (diff.waitingOn != null)
+ waitingOn = diff.waitingOn;
+ if (diff.writes != null)
+ writes = diff.writes;
+ if (diff.listeners != null)
+ listeners = diff.listeners;
+ }
+
+ if (!txnId.kind().awaitsOnlyDeps())
+ executesAtLeast = null;
+
+ switch (saveStatus.known.outcome)
+ {
+ case Erased:
+ case WasApply:
+ writes = null;
+ result = null;
+ break;
+ }
+
+ CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
+ if (partialTxn != null)
+ attrs.partialTxn(partialTxn);
+ if (durability != null)
+ attrs.durability(durability);
+ if (route != null)
+ attrs.route(route);
+
+ // TODO (desired): we can simplify this logic if, instead of diffing,
we will infer the diff from the status
+ if (partialDeps != null &&
+ (saveStatus.known.deps != Status.KnownDeps.NoDeps &&
+ saveStatus.known.deps != Status.KnownDeps.DepsErased &&
+ saveStatus.known.deps != Status.KnownDeps.DepsUnknown))
+ attrs.partialDeps(partialDeps);
+ if (additionalKeysOrRanges != null)
+ attrs.additionalKeysOrRanges(additionalKeysOrRanges);
+ if (listeners != null)
+ attrs.setListeners(listeners);
+ Invariants.checkState(saveStatus != null,
+ "Save status is null after applying %s", diffs);
+
+ switch (saveStatus.status)
+ {
+ case NotDefined:
+ return saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
+ :
Command.NotDefined.notDefined(attrs, promised);
+ case PreAccepted:
+ return Command.PreAccepted.preAccepted(attrs, executeAt,
promised);
+ case AcceptedInvalidate:
+ case Accepted:
+ case PreCommitted:
+ return Command.Accepted.accepted(attrs, saveStatus, executeAt,
promised, acceptedOrCommitted);
+ case Committed:
+ case Stable:
+ return Command.Committed.committed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn);
+ case PreApplied:
+ case Applied:
+ return Command.Executed.executed(attrs, saveStatus, executeAt,
promised, acceptedOrCommitted, waitingOn, writes, result);
+ case Invalidated:
+ case Truncated:
+ return truncated(attrs, saveStatus, executeAt,
executesAtLeast, writes, result);
+ default:
+ throw new IllegalStateException("Do not know " +
saveStatus.status + " " + saveStatus);
+ }
+ }
+
+ private static Command.Truncated truncated(CommonAttributes.Mutable attrs,
SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes
writes, Result result)
{
- TxnId txnId(Message message);
+ switch (status)
+ {
+ default:
+ throw illegalState("Unhandled SaveStatus: " + status);
+ case TruncatedApplyWithOutcome:
+ case TruncatedApplyWithDeps:
+ case TruncatedApply:
+ return Command.Truncated.truncatedApply(attrs, status,
executeAt, writes, result, executesAtLeast);
+ case ErasedOrInvalidated:
+ return Command.Truncated.erasedOrInvalidated(attrs.txnId(),
attrs.durability(), attrs.route());
+ case Erased:
+ return Command.Truncated.erased(attrs.txnId(),
attrs.durability(), attrs.route());
+ case Invalidated:
+ return Command.Truncated.invalidated(attrs.txnId(),
attrs.durableListeners());
+ }
+ }
+
+ public void onExecute(int commandStoreId, Command before, Command after,
boolean isPrimary)
+ {
+ if (before == null && after == null)
+ return;
+ Diff diff = diff(before, after);
+ if (!isPrimary)
+ diff = diff.asNonprimary();
+
+ if (diff != null)
+ {
+ Key key = new Key(after.txnId(), commandStoreId);
+ diffs.computeIfAbsent(key, (k_) -> new ArrayList<>()).add(diff);
+ }
}
private static class RequestContext implements Runnable
@@ -301,137 +332,202 @@ public class Journal implements LocalRequest.Handler,
Runnable
}
}
- public static class MessageProvider implements
SerializerSupport.MessageProvider
+ private static class Diff
{
public final TxnId txnId;
- private final Map<MessageType, Message> writes;
- public MessageProvider(TxnId txnId, Map<MessageType, Message> writes)
- {
- this.txnId = txnId;
- this.writes = writes;
- }
+ public final Timestamp executeAt;
+ public final Timestamp executesAtLeast;
+ public final SaveStatus saveStatus;
+ public final Status.Durability durability;
- @Override
- public TxnId txnId()
- {
- return txnId;
- }
+ public final Ballot acceptedOrCommitted;
+ public final Ballot promised;
- @Override
- public Set<MessageType> test(Set<MessageType> messages)
- {
- return Sets.intersection(writes.keySet(), messages);
- }
+ public final Route<?> route;
+ public final PartialTxn partialTxn;
+ public final PartialDeps partialDeps;
- @Override
- public Set<MessageType> all()
- {
- return writes.keySet();
- }
+ public final Writes writes;
+ public final Command.WaitingOn waitingOn;
+ public final Seekables<?, ?> additionalKeysOrRanges;
+ public final Listeners.Immutable<Command.DurableAndIdempotentListener>
listeners;
- public Map<MessageType, Message> allMessages()
- {
- var all = all();
- Map<MessageType, Message> map =
Maps.newHashMapWithExpectedSize(all.size());
- for (MessageType messageType : all)
- map.put(messageType, get(messageType));
- return map;
- }
+ public Diff(TxnId txnId,
+ Timestamp executeAt,
+ Timestamp executesAtLeast,
+ SaveStatus saveStatus,
+ Status.Durability durability,
- public <T extends Message> T get(MessageType type)
- {
- return (T) writes.get(type);
- }
+ Ballot acceptedOrCommitted,
+ Ballot promised,
- @Override
- public PreAccept preAccept()
- {
- return get(PRE_ACCEPT_REQ);
- }
+ Route<?> route,
+ PartialTxn partialTxn,
+ PartialDeps partialDeps,
+ Command.WaitingOn waitingOn,
- @Override
- public BeginRecovery beginRecover()
+ Writes writes,
+ Seekables<?, ?> additionalKeysOrRanges,
+ Listeners.Immutable<Command.DurableAndIdempotentListener>
listeners)
{
- return get(BEGIN_RECOVER_REQ);
- }
+ this.txnId = txnId;
+ this.executeAt = executeAt;
+ this.executesAtLeast = executesAtLeast;
+ this.saveStatus = saveStatus;
+ this.durability = durability;
- @Override
- public Propagate propagatePreAccept()
- {
- return get(PROPAGATE_PRE_ACCEPT_MSG);
- }
+ this.acceptedOrCommitted = acceptedOrCommitted;
+ this.promised = promised;
- @Override
- public Accept accept(Ballot ballot)
- {
- return get(ACCEPT_REQ);
- }
+ this.route = route;
+ this.partialTxn = partialTxn;
+ this.partialDeps = partialDeps;
- @Override
- public Commit commitSlowPath()
- {
- return get(COMMIT_SLOW_PATH_REQ);
+ this.writes = writes;
+ this.waitingOn = waitingOn;
+ this.additionalKeysOrRanges = additionalKeysOrRanges;
+ this.listeners = listeners;
}
- @Override
- public Commit commitMaximal()
+ // We allow only save status, waitingOn, and listeners to be updated
by non-primary transactions
+ public Diff asNonprimary()
{
- return get(COMMIT_MAXIMAL_REQ);
+ return new Diff(null, null, null, saveStatus, null, null, null,
null, null, null, waitingOn, null, null, listeners);
}
- @Override
- public Commit stableFastPath()
+ public boolean allNulls()
{
- return get(STABLE_FAST_PATH_REQ);
+ if (txnId != null) return false;
+ if (executeAt != null) return false;
+ if (executesAtLeast != null) return false;
+ if (saveStatus != null) return false;
+ if (durability != null) return false;
+ if (acceptedOrCommitted != null) return false;
+ if (promised != null) return false;
+ if (route != null) return false;
+ if (partialTxn != null) return false;
+ if (partialDeps != null) return false;
+ if (writes != null) return false;
+ if (waitingOn != null) return false;
+ if (additionalKeysOrRanges != null) return false;
+ if (listeners != null) return false;
+ return true;
}
@Override
- public Commit stableSlowPath()
- {
- return get(STABLE_SLOW_PATH_REQ);
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder("SavedDiff{");
+ if (txnId != null)
+ builder.append("txnId = ").append(txnId).append(" ");
+ if (executeAt != null)
+ builder.append("executeAt = ").append(executeAt).append(" ");
+ if (executesAtLeast != null)
+ builder.append("executesAtLeast =
").append(executesAtLeast).append(" ");
+ if (saveStatus != null)
+ builder.append("saveStatus = ").append(saveStatus).append(" ");
+ if (durability != null)
+ builder.append("durability = ").append(durability).append(" ");
+ if (acceptedOrCommitted != null)
+ builder.append("acceptedOrCommitted =
").append(acceptedOrCommitted).append(" ");
+ if (promised != null)
+ builder.append("promised = ").append(promised).append(" ");
+ if (route != null)
+ builder.append("route = ").append(route).append(" ");
+ if (partialTxn != null)
+ builder.append("partialTxn = ").append(partialTxn).append(" ");
+ if (partialDeps != null)
+ builder.append("partialDeps = ").append(partialDeps).append("
");
+ if (writes != null)
+ builder.append("writes = ").append(writes).append(" ");
+ if (waitingOn != null)
+ builder.append("waitingOn = ").append(waitingOn).append(" ");
+ if (additionalKeysOrRanges != null)
+ builder.append("additionalKeysOrRanges =
").append(additionalKeysOrRanges).append(" ");
+ if (listeners != null)
+ builder.append("listeners = ").append(listeners).append(" ");
+ builder.append("}");
+ return builder.toString();
}
+ }
- @Override
- public Commit stableMaximal()
- {
- return get(STABLE_MAXIMAL_REQ);
- }
+ static Diff diff(Command before, Command after)
+ {
+ if (Objects.equals(before, after))
+ return null;
+
+ Diff diff = new Diff(ifNotEqual(before, after, Command::txnId, false),
+ ifNotEqual(before, after, Command::executeAt,
true),
+ ifNotEqual(before, after,
Command::executesAtLeast, true),
+ ifNotEqual(before, after, Command::saveStatus,
false),
+
+ ifNotEqual(before, after, Command::durability,
false),
+ ifNotEqual(before, after,
Command::acceptedOrCommitted, false),
+ ifNotEqual(before, after, Command::promised,
false),
+
+ ifNotEqual(before, after, Command::route, true),
+ ifNotEqual(before, after, Command::partialTxn,
false),
+ ifNotEqual(before, after, Command::partialDeps,
false),
+ ifNotEqual(before, after, Journal::getWaitingOn,
true),
+ ifNotEqual(before, after, Command::writes, false),
+ ifNotEqual(before, after,
Command::additionalKeysOrRanges, false),
+ ifNotEqual(before, after,
Command::durableListeners, false));
+ if (diff.allNulls())
+ return null;
+
+ return diff;
+ }
- @Override
- public Propagate propagateStable()
- {
- return get(PROPAGATE_STABLE_MSG);
- }
+ static Command.WaitingOn getWaitingOn(Command command)
+ {
+ if (command instanceof Command.Committed)
+ return command.asCommitted().waitingOn();
- @Override
- public Apply applyMinimal()
- {
- return get(APPLY_MINIMAL_REQ);
- }
+ return null;
+ }
- @Override
- public Apply applyMaximal()
- {
- return get(APPLY_MAXIMAL_REQ);
- }
+ private static <OBJ, VAL> VAL ifNotEqual(OBJ lo, OBJ ro, Function<OBJ,
VAL> convert, boolean allowClassMismatch)
+ {
+ VAL l = null;
+ VAL r = null;
+ if (lo != null) l = convert.apply(lo);
+ if (ro != null) r = convert.apply(ro);
- @Override
- public Propagate propagateApply()
+ if (l == r)
+ return null;
+ if (l == null || r == null)
+ return r;
+ assert allowClassMismatch || l.getClass() == r.getClass() :
String.format("%s != %s", l.getClass(), r.getClass());
+
+ if (l.equals(r))
+ return null;
+
+ return r;
+ }
+
+ public static class Key
+ {
+ final TxnId timestamp;
+ final int commandStoreId;
+
+ public Key(TxnId timestamp, int commandStoreId)
{
- return get(PROPAGATE_APPLY_MSG);
+ this.timestamp = timestamp;
+ this.commandStoreId = commandStoreId;
}
- @Override
- public Propagate propagateOther()
+ public boolean equals(Object o)
{
- return get(PROPAGATE_OTHER_MSG);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Key key = (Key) o;
+ return commandStoreId == key.commandStoreId &&
Objects.equals(timestamp, key.timestamp);
}
- @Override
- public ApplyThenWaitUntilApplied applyThenWaitUntilApplied()
+ public int hashCode()
{
- return get(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ);
+ return Objects.hash(timestamp, commandStoreId);
}
}
-}
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
index ab38eb7f..22b06545 100644
--- a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
+++ b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
@@ -112,11 +112,10 @@ class BootstrapLocalTxnTest
}
return target;
}))
- // validate cmd
+ // validateRead is called implicitly _on command
completion_
.flatMap(target ->
store.execute(contextFor(localSyncId), safe -> {
SafeCommand cmd = safe.get(localSyncId,
route.homeKey());
Command current = cmd.current();
- validate.accept(current);
Assertions.assertThat(current.saveStatus()).isEqualTo(target == Cleanup.NO ?
SaveStatus.Applied : target.appliesIfNot);
}))
.begin(on.agent());
diff --git a/accord-core/src/test/java/accord/local/CheckedCommands.java
b/accord-core/src/test/java/accord/local/CheckedCommands.java
index 64b89e0e..7e93ba3c 100644
--- a/accord-core/src/test/java/accord/local/CheckedCommands.java
+++ b/accord-core/src/test/java/accord/local/CheckedCommands.java
@@ -33,34 +33,69 @@ import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Writes;
+import java.util.function.BiConsumer;
+
import static accord.utils.Invariants.illegalState;
public class CheckedCommands
{
public static void preaccept(SafeCommandStore safeStore, TxnId txnId,
PartialTxn partialTxn, FullRoute<?> route, @Nullable RoutingKey progressKey)
+ {
+ preaccept(safeStore, txnId, partialTxn, route, progressKey, (l, r) ->
{});
+ }
+
+ public static void preaccept(SafeCommandStore safeStore, TxnId txnId,
PartialTxn partialTxn, FullRoute<?> route, @Nullable RoutingKey progressKey,
BiConsumer<Command, Command> consumer)
{
SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
+ Command before = safeCommand.current();
Commands.AcceptOutcome result = Commands.preaccept(safeStore,
safeCommand, txnId, txnId.epoch(), partialTxn, route, progressKey);
+ Command after = safeCommand.current();
if (result != Commands.AcceptOutcome.Success) throw
illegalState("Command mutation rejected: " + result);
+ consumer.accept(before, after);
}
public static void accept(SafeCommandStore safeStore, TxnId txnId, Ballot
ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey
progressKey, Timestamp executeAt, PartialDeps partialDeps)
{
+ accept(safeStore, txnId, ballot, route, keys, progressKey, executeAt,
partialDeps, (l, r) -> {});
+ }
+
+ public static void accept(SafeCommandStore safeStore, TxnId txnId, Ballot
ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey
progressKey, Timestamp executeAt, PartialDeps partialDeps, BiConsumer<Command,
Command> consumer)
+ {
+ SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
+ Command before = safeCommand.current();
Commands.AcceptOutcome result = Commands.accept(safeStore, txnId,
ballot, route, keys, progressKey, executeAt, partialDeps);
+ Command after = safeCommand.current();
if (result != Commands.AcceptOutcome.Success) throw
illegalState("Command mutation rejected: " + result);
+ consumer.accept(before, after);
}
public static void commit(SafeCommandStore safeStore, SaveStatus
saveStatus, Ballot ballot, TxnId txnId, Route<?> route, @Nullable RoutingKey
progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps
partialDeps)
+ {
+ commit(safeStore, saveStatus, ballot, txnId, route, progressKey,
partialTxn, executeAt, partialDeps, (l, r) -> {});
+ }
+
+ public static void commit(SafeCommandStore safeStore, SaveStatus
saveStatus, Ballot ballot, TxnId txnId, Route<?> route, @Nullable RoutingKey
progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps
partialDeps, BiConsumer<Command, Command> consumer)
{
SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
+ Command before = safeCommand.current();
Commands.CommitOutcome result = Commands.commit(safeStore,
safeCommand, saveStatus, ballot, txnId, route, progressKey, partialTxn,
executeAt, partialDeps);
+ Command after = safeCommand.current();
if (result != Commands.CommitOutcome.Success) throw
illegalState("Command mutation rejected: " + result);
+ consumer.accept(before, after);
}
public static void apply(SafeCommandStore safeStore, TxnId txnId, Route<?>
route, @Nullable RoutingKey progressKey, Timestamp executeAt, @Nullable
PartialDeps partialDeps, @Nullable PartialTxn partialTxn, Writes writes, Result
result)
+ {
+ apply(safeStore, txnId, route, progressKey, executeAt, partialDeps,
partialTxn, writes, result, (l, r) -> {});
+ }
+
+ public static void apply(SafeCommandStore safeStore, TxnId txnId, Route<?>
route, @Nullable RoutingKey progressKey, Timestamp executeAt, @Nullable
PartialDeps partialDeps, @Nullable PartialTxn partialTxn, Writes writes, Result
result, BiConsumer<Command, Command> consumer)
{
SafeCommand safeCommand = safeStore.get(txnId, txnId, route);
+ Command before = safeCommand.current();
Commands.ApplyOutcome outcome = Commands.apply(safeStore, safeCommand,
txnId, route, progressKey, executeAt, partialDeps, partialTxn, writes, result);
+ Command after = safeCommand.current();
if (outcome != Commands.ApplyOutcome.Success) throw
illegalState("Command mutation rejected: " + outcome);
+ consumer.accept(before, after);
}
}
diff --git a/accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java
b/accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java
index ca5cf4a9..68ae553b 100644
--- a/accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java
+++ b/accord-core/src/test/java/accord/utils/ReducingRangeMapTest.java
@@ -35,8 +35,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import accord.api.RoutingKey;
import accord.impl.IntKey;
@@ -53,7 +51,6 @@ import static java.lang.Integer.MIN_VALUE;
// TODO (desired): test start inclusive ranges
public class ReducingRangeMapTest
{
- static final Logger logger =
LoggerFactory.getLogger(ReducingRangeMapTest.class);
static final ReducingRangeMap<Timestamp> EMPTY = new ReducingRangeMap<>();
static final RoutingKey MINIMUM_EXCL = new IntKey.Routing(MIN_VALUE);
static final RoutingKey MAXIMUM_EXCL = new IntKey.Routing(MAX_VALUE);
@@ -214,7 +211,6 @@ public class ReducingRangeMapTest
try
{
Random random = new Random(seed);
- logger.info(id);
List<RandomWithCanonical> merge = new ArrayList<>();
while (numberOfMerges-- > 0)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]