This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2b9e5400 Accord fixes: - Bad ArrayBuffers recycling logic - RX must
ensure dependencies TRANSITIVE_VISIBLE - Permit constructing "antiRange" that
spans multiple prefixes - Not computing range CommandSummary IsDep correctly
- Truncated commands that aren't shard durable could not repopulate CFK on
replay, permitting recovery of another command to make an incorrect decision -
NPE on async persist of RX (i.e. supplying no callback) - NPE in
Builder.shouldCleanup when durability is null
2b9e5400 is described below
commit 2b9e54004f702c7b626e94391af21fa080292975
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sun Feb 23 22:14:31 2025 +0000
Accord fixes:
- Bad ArrayBuffers recycling logic
- RX must ensure dependencies TRANSITIVE_VISIBLE
- Permit constructing "antiRange" that spans multiple prefixes
- Not computing range CommandSummary IsDep correctly
- Truncated commands that aren't shard durable could not repopulate CFK on
replay, permitting recovery of another command to make an incorrect decision
- NPE on async persist of RX (i.e. supplying no callback)
- NPE in Builder.shouldCleanup when durability is null
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20370
---
.../accord/coordinate/CoordinationAdapter.java | 6 +-
.../java/accord/coordinate/RecoverWithRoute.java | 14 +-
.../src/main/java/accord/impl/AbstractLoader.java | 1 +
.../src/main/java/accord/impl/CommandChange.java | 10 +-
.../main/java/accord/impl/RetiredSafeCommand.java | 72 -------
.../src/main/java/accord/local/Cleanup.java | 18 +-
.../src/main/java/accord/local/Command.java | 86 ++++----
.../main/java/accord/local/CommandSummaries.java | 14 +-
.../src/main/java/accord/local/Commands.java | 63 +++---
.../main/java/accord/local/RedundantBefore.java | 2 +-
.../main/java/accord/local/cfk/CommandsForKey.java | 2 +-
.../src/main/java/accord/local/cfk/Updating.java | 25 ++-
.../src/main/java/accord/messages/Propagate.java | 4 +-
.../src/main/java/accord/primitives/Deps.java | 111 +++++++----
.../src/main/java/accord/primitives/KeyDeps.java | 2 +-
.../java/accord/primitives/KeyOrRangeDeps.java | 33 ++++
.../src/main/java/accord/primitives/Range.java | 22 +++
.../src/main/java/accord/primitives/RangeDeps.java | 7 +-
.../main/java/accord/primitives/SaveStatus.java | 19 +-
.../src/main/java/accord/utils/ArrayBuffers.java | 13 +-
.../main/java/accord/utils/RelationMultiMap.java | 164 ++++++++--------
.../test/java/accord/impl/PrefixedIntHashKey.java | 8 +-
.../test/java/accord/utils/ArrayBuffersTest.java | 217 +++++++++++++++++++++
23 files changed, 607 insertions(+), 306 deletions(-)
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index a38d112c..6981a076 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -336,7 +336,8 @@ public interface CoordinationAdapter<R>
@Override
void invokeSuccess(Node node, FullRoute<?> route, TxnId txnId,
Timestamp executeAt, Txn txn, Deps deps, BiConsumer<? super Result, Throwable>
callback)
{
- callback.accept(txn.result(txnId, executeAt, null), null);
+ if (callback != null)
+ callback.accept(txn.result(txnId, executeAt, null), null);
}
}
@@ -347,7 +348,8 @@ public interface CoordinationAdapter<R>
@Override
void invokeSuccess(Node node, FullRoute<?> route, TxnId txnId,
Timestamp executeAt, Txn txn, Deps deps, BiConsumer<? super SyncPoint<U>,
Throwable> callback)
{
- callback.accept(new SyncPoint<>(txnId, executeAt, deps,
(FullRoute<U>)route), null);
+ if (callback != null)
+ callback.accept(new SyncPoint<>(txnId, executeAt, deps,
(FullRoute<U>)route), null);
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 57e36815..078b3fcb 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -256,9 +256,17 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
{
Participants<?> hasDeps =
full.map.knownFor(Known.DepsOnly, route);
missingDeps = route.without(hasDeps);
- // convert to plain Deps as when we merge with latest
deps we may erroneously keep the
- // PartialDeps if e.g. an empty range of deps is found
- deps = new
Deps(full.stableDeps.reconstitutePartial(hasDeps));
+ if (full.stableDeps == null)
+ {
+ Invariants.require(hasDeps.isEmpty());
+ deps = Deps.NONE;
+ }
+ else
+ {
+ // convert to plain Deps as when we merge with
latest deps we may erroneously keep the
+ // PartialDeps if e.g. an empty range of deps is
found
+ deps = new
Deps(full.stableDeps.reconstitutePartial(hasDeps));
+ }
}
LatestDeps.withStable(node.coordinationAdapter(txnId,
Recovery), node, txnId, full.executeAt, full.partialTxn, deps, missingDeps,
route, SHARE, route, callback, mergedDeps -> {
node.withEpoch(full.executeAt.epoch(), node.agent(), t
-> WrappableException.wrap(t), () -> {
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
index c55ac6c8..566b07d1 100644
--- a/accord-core/src/main/java/accord/impl/AbstractLoader.java
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -49,6 +49,7 @@ public abstract class AbstractLoader implements Journal.Loader
case NO:
break;
case INVALIDATE:
+ case TRUNCATE_WITH_OUTCOME_AND_DEPS:
case TRUNCATE_WITH_OUTCOME:
case TRUNCATE:
case ERASE:
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java
b/accord-core/src/main/java/accord/impl/CommandChange.java
index e38ff635..021ebd60 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -268,6 +268,8 @@ public class CommandChange
if (saveStatus == null || participants == null)
return Cleanup.NO;
+ Status.Durability durability = this.durability;
+ if (durability == null) durability = NotDurable;
Cleanup cleanup = Cleanup.shouldCleanup(input, agent, txnId,
executeAt, saveStatus, durability, participants, redundantBefore,
durableBefore);
if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
cleanup = this.cleanup;
@@ -340,6 +342,7 @@ public class CommandChange
this.result = newValue;
}
+ // TODO (expected): we shouldn't need to filter participants here, we
will do it anyway before using in SafeCommandStore
public Command construct(RedundantBefore redundantBefore)
{
if (!nextCalled)
@@ -378,21 +381,22 @@ public class CommandChange
return executed(txnId, saveStatus, durability,
participants, promised, executeAt, partialTxn, partialDeps,
acceptedOrCommitted, waitingOn, writes, result);
case Truncated:
case Invalidated:
- return truncated(txnId, saveStatus, durability,
participants, executeAt, executesAtLeast, writes, result);
+ return truncated(txnId, saveStatus, durability,
participants, executeAt, partialDeps, executesAtLeast, writes, result);
default:
throw new UnhandledEnum(saveStatus.status);
}
}
- private static Command.Truncated truncated(TxnId txnId, SaveStatus
status, Status.Durability durability, StoreParticipants participants, Timestamp
executeAt, Timestamp executesAtLeast, Writes writes, Result result)
+ private static Command.Truncated truncated(TxnId txnId, SaveStatus
status, Status.Durability durability, StoreParticipants participants, Timestamp
executeAt, PartialDeps partialDeps, Timestamp executesAtLeast, Writes writes,
Result result)
{
switch (status)
{
default: throw new UnhandledEnum(status);
+ case TruncatedApplyWithOutcomeAndDeps:
case TruncatedApplyWithOutcome:
case TruncatedApply:
case TruncatedUnapplied:
- return Command.Truncated.truncated(txnId, status,
durability, participants, executeAt, writes, result, executesAtLeast);
+ return Command.Truncated.truncated(txnId, status,
durability, participants, executeAt, partialDeps, writes, result,
executesAtLeast);
case Vestigial:
return vestigial(txnId, participants);
case Erased:
diff --git a/accord-core/src/main/java/accord/impl/RetiredSafeCommand.java
b/accord-core/src/main/java/accord/impl/RetiredSafeCommand.java
deleted file mode 100644
index b92bcaac..00000000
--- a/accord-core/src/main/java/accord/impl/RetiredSafeCommand.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.impl;
-
-import accord.local.Command;
-import accord.local.SafeCommand;
-import accord.local.StoreParticipants;
-import accord.primitives.SaveStatus;
-import accord.primitives.TxnId;
-import accord.utils.Invariants;
-
-import static accord.primitives.SaveStatus.Erased;
-import static accord.primitives.SaveStatus.Vestigial;
-import static accord.primitives.Status.Durability.NotDurable;
-import static accord.primitives.Status.Durability.UniversalOrInvalidated;
-
-public class RetiredSafeCommand extends SafeCommand
-{
- final Command erased;
-
- public RetiredSafeCommand(TxnId txnId, SaveStatus saveStatus)
- {
- super(txnId);
- this.erased = erased(txnId, saveStatus);
- }
-
- public static Command erased(TxnId txnId, SaveStatus saveStatus)
- {
- Invariants.requireArgument(saveStatus.compareTo(Vestigial) >= 0);
- return new Command.Truncated(txnId, saveStatus, saveStatus == Erased ?
UniversalOrInvalidated : NotDurable, StoreParticipants.empty(txnId), null,
null, null);
- }
-
- @Override
- public Command current()
- {
- return erased;
- }
-
- @Override
- public void invalidate()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean invalidated()
- {
- return false;
- }
-
- @Override
- protected void set(Command command)
- {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java
b/accord-core/src/main/java/accord/local/Cleanup.java
index 4bcee716..ea12b16c 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -51,6 +51,7 @@ import static accord.primitives.SaveStatus.Erased;
import static accord.primitives.SaveStatus.Invalidated;
import static accord.primitives.SaveStatus.TruncatedApply;
import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
+import static accord.primitives.SaveStatus.TruncatedApplyWithOutcomeAndDeps;
import static accord.primitives.SaveStatus.Uninitialised;
import static accord.primitives.SaveStatus.Vestigial;
import static accord.primitives.Status.Applied;
@@ -69,8 +70,7 @@ import static accord.primitives.TxnId.Cardinality.Any;
public enum Cleanup
{
NO(Uninitialised),
- // we don't know if the command has been applied or invalidated as we have
incomplete information
- // so erase what information we don't need in future to decide this
+ TRUNCATE_WITH_OUTCOME_AND_DEPS(TruncatedApplyWithOutcomeAndDeps),
TRUNCATE_WITH_OUTCOME(TruncatedApplyWithOutcome),
TRUNCATE(TruncatedApply),
VESTIGIAL(Vestigial),
@@ -177,6 +177,9 @@ public enum Cleanup
if (ifUndecided != null)
return ifUndecided;
+ if (!redundant.all(SHARD_ONLY_APPLIED))
+ return truncateWithOutcomeAndDeps(txnId);
+
if (input == FULL)
{
Participants<?> waitsOn = participants.waitsOn();
@@ -198,7 +201,7 @@ public enum Cleanup
case Local:
case NotDurable:
case ShardUniversal:
- return truncateWithOutcome();
+ return truncateWithOutcome(txnId);
case MajorityOrInvalidated:
case Majority:
@@ -290,7 +293,14 @@ public enum Cleanup
{
return INVALIDATE;
}
- private static Cleanup truncateWithOutcome() { return
TRUNCATE_WITH_OUTCOME; }
+ private static Cleanup truncateWithOutcomeAndDeps(TxnId txnId)
+ {
+ return TRUNCATE_WITH_OUTCOME_AND_DEPS;
+ }
+ private static Cleanup truncateWithOutcome(TxnId txnId)
+ {
+ return TRUNCATE_WITH_OUTCOME;
+ }
private static Cleanup truncate(TxnId txnId)
{
return TRUNCATE;
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index dfd522fb..8d84b14b 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -32,6 +32,7 @@ import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.KeyDeps;
import accord.primitives.Known;
+import accord.primitives.Known.Outcome;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
@@ -61,6 +62,7 @@ import com.google.common.annotations.VisibleForTesting;
import static accord.local.Command.Committed.committed;
import static accord.local.Command.Executed.executed;
import static accord.local.Command.NotAcceptedWithoutDefinition.notAccepted;
+import static accord.primitives.Known.KnownDeps.DepsKnown;
import static accord.primitives.Known.KnownDeps.DepsUnknown;
import static accord.primitives.Known.KnownExecuteAt.ApplyAtKnown;
import static accord.primitives.Routable.Domain.Key;
@@ -68,6 +70,8 @@ import static accord.primitives.Routable.Domain.Range;
import static accord.primitives.Routables.Slice;
import static accord.primitives.SaveStatus.AcceptedInvalidate;
import static accord.primitives.SaveStatus.TruncatedApply;
+import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
+import static accord.primitives.SaveStatus.TruncatedApplyWithOutcomeAndDeps;
import static accord.primitives.SaveStatus.TruncatedUnapplied;
import static accord.primitives.SaveStatus.Vestigial;
import static accord.primitives.SaveStatus.ReadyToExecute;
@@ -76,7 +80,6 @@ import static accord.primitives.Status.Durability.Local;
import static accord.primitives.Status.Durability.NotDurable;
import static accord.primitives.Status.Durability.ShardUniversal;
import static accord.primitives.Status.Durability.UniversalOrInvalidated;
-import static accord.primitives.Status.Stable;
import static accord.primitives.Txn.Kind.Write;
import static accord.utils.Invariants.Paranoia.LINEAR;
import static accord.utils.Invariants.Paranoia.NONE;
@@ -414,7 +417,7 @@ public abstract class Command implements ICommand
public static Truncated erased(TxnId txnId, Status.Durability
durability, StoreParticipants participants)
{
- return validate(new Truncated(txnId, SaveStatus.Erased,
durability, participants, null, null, null));
+ return validate(new Truncated(txnId, SaveStatus.Erased,
durability, participants, null, null, null, null));
}
public static Truncated vestigial(Command command)
@@ -429,7 +432,7 @@ public abstract class Command implements ICommand
public static Truncated vestigial(TxnId txnId, StoreParticipants
participants)
{
- return validate(new Truncated(txnId, SaveStatus.Vestigial,
NotDurable, participants, null, null, null));
+ return validate(new Truncated(txnId, SaveStatus.Vestigial,
NotDurable, participants, null, null, null, null));
}
public static Truncated truncated(Command command)
@@ -442,13 +445,17 @@ public abstract class Command implements ICommand
Invariants.requireArgument(command.known().isExecuteAtKnown());
// TODO (expected): centralise this translation so applied
consistently
SaveStatus newSaveStatus = command.known().is(ApplyAtKnown) ?
TruncatedApply : TruncatedUnapplied;
- Durability durability =
Durability.mergeAtLeast(command.durability(), ShardUniversal);
- if (command.txnId().awaitsOnlyDeps())
- {
- Timestamp executesAtLeast = command.hasBeen(Stable) ?
command.executesAtLeast() : null;
- return validate(new TruncatedAwaitsOnlyDeps(command.txnId(),
newSaveStatus, durability, participants, command.executeAt(), null, null,
executesAtLeast));
- }
- return validate(new Truncated(command.txnId(), newSaveStatus,
durability, participants, command.executeAt(), null, null));
+ return truncated(command, participants, newSaveStatus);
+ }
+
+ public static Truncated truncatedApplyWithOutcomeAndDeps(Executed
command)
+ {
+ return truncatedApplyWithOutcomeAndDeps(command,
command.participants());
+ }
+
+ public static Truncated truncatedApplyWithOutcomeAndDeps(Executed
command, StoreParticipants participants)
+ {
+ return truncated(command, participants,
TruncatedApplyWithOutcomeAndDeps);
}
public static Truncated truncatedApplyWithOutcome(Executed command)
@@ -458,44 +465,50 @@ public abstract class Command implements ICommand
public static Truncated truncatedApplyWithOutcome(Executed command,
StoreParticipants participants)
{
- Durability durability =
Durability.mergeAtLeast(command.durability(), ShardUniversal);
- if (command.txnId().awaitsOnlyDeps())
- return validate(new TruncatedAwaitsOnlyDeps(command.txnId(),
SaveStatus.TruncatedApplyWithOutcome, durability, command.participants(),
command.executeAt(), command.writes, command.result,
command.executesAtLeast()));
- return validate(new Truncated(command.txnId(),
SaveStatus.TruncatedApplyWithOutcome, durability, participants,
command.executeAt(), command.writes, command.result));
+ return truncated(command, participants, TruncatedApplyWithOutcome);
}
- public static Truncated truncated(ICommand common, SaveStatus
saveStatus, Timestamp executeAt, Writes writes, Result result)
+ public static Truncated truncated(TxnId txnId, SaveStatus saveStatus,
Durability durability, StoreParticipants participants, @Nullable Timestamp
executeAt, @Nullable PartialDeps partialDeps, @Nullable Writes writes, Result
result)
{
- return truncated(common.txnId(), saveStatus, common.durability(),
common.participants(), executeAt, writes, result);
+ Invariants.requireArgument(!txnId.awaitsOnlyDeps());
+ durability = checkTruncatedApplyInvariants(durability, saveStatus,
executeAt);
+ return validate(new Truncated(txnId, saveStatus, durability,
participants, executeAt, partialDeps, writes, result));
}
- public static Truncated truncated(TxnId txnId, SaveStatus saveStatus,
Durability durability, StoreParticipants participants, Timestamp executeAt,
Writes writes, Result result)
+ public static Truncated truncated(Command command, StoreParticipants
participants, SaveStatus newSaveStatus)
{
- Invariants.requireArgument(!txnId.awaitsOnlyDeps());
- durability = checkTruncatedApplyInvariants(durability, saveStatus,
executeAt);
- return validate(new Truncated(txnId, saveStatus, durability,
participants, executeAt, writes, result));
+ Timestamp executesAtLeast = command.executesAtLeast();
+ PartialDeps partialDeps = newSaveStatus.known.is(DepsKnown) ?
command.partialDeps : null;
+ Writes writes = null;
+ Result result = null;
+ if (newSaveStatus.known.is(Outcome.Apply))
+ {
+ writes = command.writes();
+ result = command.result();
+ }
+ return truncated(command.txnId(), newSaveStatus,
command.durability(), participants, command.executeAt, partialDeps, writes,
result, executesAtLeast);
}
- public static Truncated truncated(ICommand common, SaveStatus
saveStatus, Timestamp executeAt, Writes writes, Result result, @Nullable
Timestamp dependencyExecutesAt)
+ public static Truncated truncated(ICommand common, SaveStatus
saveStatus, @Nullable Timestamp executeAt, @Nullable PartialDeps partialDeps,
@Nullable Writes writes, @Nullable Result result, @Nullable Timestamp
executesAtLeast)
{
- return truncated(common.txnId(), saveStatus, common.durability(),
common.participants(), executeAt, writes, result, dependencyExecutesAt);
+ return truncated(common.txnId(), saveStatus, common.durability(),
common.participants(), executeAt, partialDeps, writes, result, executesAtLeast);
}
- public static Truncated truncated(TxnId txnId, SaveStatus saveStatus,
Durability durability, StoreParticipants participants, Timestamp executeAt,
Writes writes, Result result, @Nullable Timestamp dependencyExecutesAt)
+ public static Truncated truncated(TxnId txnId, SaveStatus saveStatus,
Durability durability, StoreParticipants participants, Timestamp executeAt,
PartialDeps partialDeps, Writes writes, Result result, @Nullable Timestamp
executesAtLeast)
{
+ durability = checkTruncatedApplyInvariants(durability, saveStatus,
executeAt);
if (!txnId.awaitsOnlyDeps())
{
- Invariants.require(dependencyExecutesAt == null);
- return truncated(txnId, saveStatus, durability, participants,
executeAt, writes, result);
+ Invariants.require(executesAtLeast == null);
+ return truncated(txnId, saveStatus, durability, participants,
executeAt, partialDeps, writes, result);
}
- durability = checkTruncatedApplyInvariants(durability, saveStatus,
executeAt);
- return validate(new TruncatedAwaitsOnlyDeps(txnId, saveStatus,
durability, participants, executeAt, writes, result, dependencyExecutesAt));
+ return validate(new TruncatedAwaitsOnlyDeps(txnId, saveStatus,
durability, participants, executeAt, partialDeps, writes, result,
executesAtLeast));
}
private static Durability checkTruncatedApplyInvariants(Durability
durability, SaveStatus saveStatus, Timestamp executeAt)
{
Invariants.requireArgument(executeAt != null);
- Invariants.requireArgument(saveStatus == SaveStatus.TruncatedApply
|| saveStatus == SaveStatus.TruncatedUnapplied || saveStatus ==
SaveStatus.TruncatedApplyWithOutcome);
+ Invariants.requireArgument(saveStatus.status == Status.Truncated);
return Durability.mergeAtLeast(durability, ShardUniversal);
}
@@ -514,7 +527,7 @@ public abstract class Command implements ICommand
{
// NOTE: we *must* save participants here so that on replay we
properly repopulate CommandsForKey
// (otherwise we may see this as a transitive transaction, but
never mark it invalidated)
- return validate(new Truncated(txnId, SaveStatus.Invalidated,
UniversalOrInvalidated, participants, Timestamp.NONE, null, null));
+ return validate(new Truncated(txnId, SaveStatus.Invalidated,
UniversalOrInvalidated, participants, Timestamp.NONE, null, null, null));
}
@Nullable final Writes writes;
@@ -527,9 +540,9 @@ public abstract class Command implements ICommand
this.result = copy.result();
}
- public Truncated(TxnId txnId, SaveStatus saveStatus, Durability
durability, @Nonnull StoreParticipants participants, @Nullable Timestamp
executeAt, @Nullable Writes writes, @Nullable Result result)
+ public Truncated(TxnId txnId, SaveStatus saveStatus, Durability
durability, @Nonnull StoreParticipants participants, @Nullable Timestamp
executeAt, @Nullable PartialDeps partialDeps, @Nullable Writes writes,
@Nullable Result result)
{
- super(txnId, saveStatus, durability, participants, Ballot.MAX,
executeAt, null, null, Ballot.MAX);
+ super(txnId, saveStatus, durability, participants, Ballot.MAX,
executeAt, null, partialDeps, Ballot.MAX);
this.writes = writes;
this.result = result;
}
@@ -560,7 +573,7 @@ public abstract class Command implements ICommand
@Override
public Command updateAttributes(StoreParticipants participants, Ballot
promised, Durability durability)
{
- return validate(new Truncated(txnId(), saveStatus(), durability,
participants, executeAt(), writes, result));
+ return validate(new Truncated(txnId(), saveStatus(), durability,
participants, executeAt(), partialDeps(), writes, result));
}
}
@@ -578,9 +591,9 @@ public abstract class Command implements ICommand
this.executesAtLeast = executesAtLeast;
}
- public TruncatedAwaitsOnlyDeps(TxnId txnId, SaveStatus saveStatus,
Durability durability, StoreParticipants participants, @Nullable Timestamp
executeAt, @Nullable Writes writes, @Nullable Result result, @Nullable
Timestamp executesAtLeast)
+ public TruncatedAwaitsOnlyDeps(TxnId txnId, SaveStatus saveStatus,
Durability durability, StoreParticipants participants, @Nullable Timestamp
executeAt, @Nullable PartialDeps partialDeps, @Nullable Writes writes,
@Nullable Result result, @Nullable Timestamp executesAtLeast)
{
- super(txnId, saveStatus, durability, participants, executeAt,
writes, result);
+ super(txnId, saveStatus, durability, participants, executeAt,
partialDeps, writes, result);
this.executesAtLeast = executesAtLeast;
}
@@ -599,7 +612,7 @@ public abstract class Command implements ICommand
@Override
public Command updateAttributes(StoreParticipants participants, Ballot
promised, Durability durability)
{
- return validate(new TruncatedAwaitsOnlyDeps(txnId(), saveStatus(),
durability, participants, executeAt(), writes, result, executesAtLeast));
+ return validate(new TruncatedAwaitsOnlyDeps(txnId(), saveStatus(),
durability, participants, executeAt(), partialDeps(), writes, result,
executesAtLeast));
}
}
@@ -1652,9 +1665,10 @@ public abstract class Command implements ICommand
case Applying:
case Applied:
return validateCommandClass(status, Executed.class, klass);
- case TruncatedApply:
case TruncatedUnapplied:
+ case TruncatedApply:
case TruncatedApplyWithOutcome:
+ case TruncatedApplyWithOutcomeAndDeps:
if (txnId.awaitsOnlyDeps())
return validateCommandClass(status,
TruncatedAwaitsOnlyDeps.class, klass);
case Erased:
diff --git a/accord-core/src/main/java/accord/local/CommandSummaries.java
b/accord-core/src/main/java/accord/local/CommandSummaries.java
index 7e0ef190..96cadd62 100644
--- a/accord-core/src/main/java/accord/local/CommandSummaries.java
+++ b/accord-core/src/main/java/accord/local/CommandSummaries.java
@@ -219,12 +219,14 @@ public interface CommandSummaries
}
else
{
- boolean isStable = summaryStatus.compareTo(ACCEPTED)
>= 0;
- // this should be in same domain as intersecting, as
will be taken from relevant Deps entry
- Participants<?> participants =
partialDeps.participants(findAsDep);
- isDep = participants != null &&
participants.containsAll(intersecting)
- ? (isStable ? IsDep.IS_STABLE_DEP :
IsDep.IS_COORD_DEP)
- : (isStable ? IsDep.IS_NOT_STABLE_DEP :
IsDep.IS_NOT_COORD_DEP);
+ boolean isCoordDeps =
summaryStatus.compareTo(ACCEPTED) < 0;
+ int index = partialDeps.indexOf(findAsDep);
+ // TODO (desired): don't construct participants, pass
intersecting as parameter
+ boolean isAnyDep = index >= 0 &&
partialDeps.isStable(index)
+ &&
partialDeps.participants(index).containsAll(intersecting);
+
+ isDep = isAnyDep ? (isCoordDeps ? IsDep.IS_COORD_DEP
: IsDep.IS_STABLE_DEP)
+ : (isCoordDeps ?
IsDep.IS_NOT_COORD_DEP : IsDep.IS_NOT_STABLE_DEP);
}
}
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index 477b2757..c3d1b90e 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -69,7 +69,6 @@ import static accord.local.Cleanup.shouldCleanup;
import static accord.local.Command.Truncated.erased;
import static accord.local.Command.Truncated.invalidated;
import static accord.local.Command.Truncated.truncated;
-import static accord.local.Command.Truncated.truncatedApplyWithOutcome;
import static accord.local.Command.Truncated.vestigial;
import static accord.local.Commands.Validated.INSUFFICIENT;
import static accord.local.Commands.Validated.UPDATE_TXN_IGNORE_DEPS;
@@ -120,6 +119,7 @@ import static accord.primitives.Txn.Kind.EphemeralRead;
import static accord.primitives.Txn.Kind.Write;
import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps;
import static accord.utils.Invariants.illegalState;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
public class Commands
{
@@ -554,15 +554,17 @@ public class Commands
}
}
- protected static void postApply(SafeCommandStore safeStore, Command
command)
+ protected static void postApply(SafeCommandStore safeStore, TxnId txnId,
long t0)
{
+ SafeCommand safeCommand = safeStore.get(txnId);
+ Command command = safeCommand.current();
logger.trace("{} applied, setting status to Applied and notifying
listeners", command);
- if (!command.hasBeen(Applied))
- {
- SafeCommand safeCommand = safeStore.get(command.txnId());
- safeCommand.applied(safeStore);
- safeStore.notifyListeners(safeCommand, command);
- }
+ if (command.hasBeen(Applied))
+ return;
+
+ safeCommand.applied(safeStore);
+ safeStore.notifyListeners(safeCommand, command);
+ if (t0 >= 0)
safeStore.agent().metricsEventsListener().onApplied(command, t0);
}
/**
@@ -584,16 +586,13 @@ public class Commands
// TODO (required, API): do we care about tracking the write
persistence latency, when this is just a memtable write?
// the only reason it will be slow is because Memtable flushes are
backed-up (which will be reported elsewhere)
// TODO (required): this is anyway non-monotonic and milliseconds
granularity
- long t0 = safeStore.node().now();
+ long t0 = safeStore.node().elapsed(MICROSECONDS);
TxnId txnId = command.txnId();
Participants<?> executes = command.participants().stillExecutes(); //
including any keys we aren't writing
return command.writes().apply(safeStore, executes,
command.partialTxn())
// TODO (expected): once we guarantee execution order
KeyHistory can be ASYNC
.flatMap(unused -> unsafeStore.build(contextFor(txnId,
executes, SYNC), ss -> {
- Command cmd = ss.get(txnId).current();
- if (!cmd.hasBeen(Applied))
- ss.agent().metricsEventsListener().onApplied(cmd, t0);
- postApply(ss, command);
+ postApply(ss, txnId, t0);
return null;
}));
}
@@ -604,14 +603,15 @@ public class Commands
CommandStore unsafeStore = safeStore.commandStore();
Command.Executed executed = command.asExecuted();
Participants<?> executes = executed.participants().stillExecutes();
- if (!executes.isEmpty())
- return command.writes().apply(safeStore, executes,
command.partialTxn())
- .flatMap(unused -> unsafeStore.build(context, ss -> {
- postApply(ss, command);
- return null;
- }));
- else
+ if (executes.isEmpty())
return AsyncChains.success(null);
+
+ TxnId txnId = command.txnId();
+ return command.writes().apply(safeStore, executes,
command.partialTxn())
+ .flatMap(unused -> unsafeStore.build(context, ss -> {
+ postApply(ss, txnId, -1);
+ return null;
+ }));
}
public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand
safeCommand, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
@@ -722,6 +722,7 @@ public class Commands
switch (dependency.saveStatus())
{
default: throw new AssertionError("Unhandled saveStatus: " +
dependency.saveStatus());
+ case TruncatedApplyWithOutcomeAndDeps:
case TruncatedApplyWithOutcome:
case TruncatedApply:
case TruncatedUnapplied:
@@ -826,32 +827,22 @@ public class Commands
maybeExecute(safeStore, safeCommand, false, true);
}
- public static void setTruncatedApplyOrErasedVestigial(SafeCommandStore
safeStore, SafeCommand safeCommand, StoreParticipants participants, @Nullable
Timestamp executeAt)
+ public static void setTruncatedOrVestigial(SafeCommandStore safeStore,
SafeCommand safeCommand, StoreParticipants participants)
{
Command command = safeCommand.current();
SaveStatus saveStatus = command.saveStatus();
if (saveStatus.compareTo(TruncatedApply) >= 0) return;
participants = command.participants().supplementOrMerge(saveStatus,
participants);
- if (executeAt == null) executeAt = command.executeAtIfKnown();
+ Timestamp executeAt = command.executeAtIfKnown();
if (participants.route() == null || executeAt == null)
{
- safeCommand.update(safeStore,
Command.Truncated.vestigial(command));
+ safeCommand.update(safeStore, vestigial(command));
if (participants.route() != null &&
!safeStore.coordinateRanges(command.txnId()).contains(participants.route().homeKey()))
safeStore.progressLog().clear(command.txnId());
}
else
{
- command = command.updateParticipants(participants);
- if (!safeCommand.txnId().awaitsOnlyDeps())
- {
- safeCommand.update(safeStore,
Command.Truncated.truncated(command, TruncatedApply, executeAt, null, null));
- }
- else if (safeCommand.current().saveStatus().hasBeen(Applied))
- {
- Timestamp executesAtLeast =
safeCommand.current().executesAtLeast();
- if (executesAtLeast == null) safeCommand.update(safeStore,
erased(command));
- else safeCommand.update(safeStore,
Command.Truncated.truncated(command, TruncatedApply, executeAt, null, null,
executesAtLeast));
- }
+ safeCommand.update(safeStore, truncated(command, participants));
safeStore.progressLog().clear(command.txnId());
}
}
@@ -929,11 +920,11 @@ public class Commands
result = invalidated(command, newParticipants);
break;
+ case TRUNCATE_WITH_OUTCOME_AND_DEPS:
case TRUNCATE_WITH_OUTCOME:
- Invariants.requireArgument(!command.hasBeen(Truncated), "%s",
command);
Invariants.requireArgument(command.known().is(Apply));
Invariants.requireArgument(command.known().is(ApplyAtKnown));
- result = truncatedApplyWithOutcome(command.asExecuted());
+ result = truncated(command, newParticipants,
cleanup.appliesIfNot);
break;
case TRUNCATE:
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 50966703..9b23cae3 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -994,7 +994,7 @@ public class RedundantBefore extends
ReducingRangeMap<RedundantBefore.Bounds>
private boolean mayFilterStaleOrPreBootstrapOrRetiredOrNotOwned(TxnId
txnId, @Nullable Timestamp executeAt, Participants<?> participants)
{
- return (minLocallyRetiredEpoch < txnId.epoch() &&
locallyRetiredRanges.intersects(participants))
+ return (minLocallyRetiredEpoch <= txnId.epoch() &&
locallyRetiredRanges.intersects(participants))
|| (executeAt != null && executeAt.epoch() < maxStartEpoch)
|| mayFilterStaleOrPreBootstrap(txnId, participants);
}
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index 1f551ae3..f2fab5ca 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -831,7 +831,6 @@ public class CommandsForKey extends CommandsForKeyUpdate
public enum InternalStatus
{
- // TODO (expected): use TRANSITIVE instead of TRANSITIVE_VISIBLE when
we don't need to sync dependencies
TRANSITIVE
(SummaryStatus.NOT_DIRECTLY_WITNESSED, false, false, false, false),
TRANSITIVE_VISIBLE
(SummaryStatus.NOT_DIRECTLY_WITNESSED, false, false, false, false),
PREACCEPTED_WITHOUT_DEPS (SummaryStatus.PREACCEPTED,
false, false, true, false),
@@ -883,6 +882,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
FROM_SAVE_STATUS.put(SaveStatus.PreApplied, STABLE);
FROM_SAVE_STATUS.put(SaveStatus.Applying, STABLE);
FROM_SAVE_STATUS.put(SaveStatus.Applied, APPLIED_NOT_DURABLE);
+ FROM_SAVE_STATUS.put(SaveStatus.TruncatedApplyWithOutcomeAndDeps,
APPLIED_DURABLE);
// We don't map TruncatedApplyX or Erased as we want to retain
them as APPLIED
// esp. to support pruning where we expect the prunedBefore
entr*ies* to be APPLIED
// Note importantly that we have multiple logical pruned befores -
the last APPLIED
diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java
b/accord-core/src/main/java/accord/local/cfk/Updating.java
index 1becfcad..47a92b92 100644
--- a/accord-core/src/main/java/accord/local/cfk/Updating.java
+++ b/accord-core/src/main/java/accord/local/cfk/Updating.java
@@ -52,11 +52,13 @@ import accord.utils.RelationMultiMap;
import accord.utils.SortedArrays;
import accord.utils.SortedList.MergeCursor;
+import static
accord.api.ProtocolModifiers.Toggles.isTransitiveDependencyVisible;
import static accord.local.CommandSummaries.SummaryStatus.APPLIED;
import static accord.local.KeyHistory.SYNC;
import static accord.local.cfk.CommandsForKey.InternalStatus.COMMITTED;
import static accord.local.cfk.CommandsForKey.InternalStatus.INVALIDATED;
import static accord.local.cfk.CommandsForKey.InternalStatus.PRUNED;
+import static accord.local.cfk.CommandsForKey.InternalStatus.TRANSITIVE;
import static
accord.local.cfk.CommandsForKey.InternalStatus.TRANSITIVE_VISIBLE;
import static accord.local.cfk.CommandsForKey.NOT_LOADING_PRUNED;
import static accord.local.cfk.CommandsForKey.NO_INFOS;
@@ -329,6 +331,10 @@ class Updating
int c = t.compareTo(d);
if (c == 0)
{
+ // TODO (expected): if plainTxnId implies TRANSITIVE_VISIBLE
dependencies,
+ // we should ensure any existing TRANSITIVE entries are
upgraded.
+ // OR we should remove TRANSITIVE for simplicity,
+ // OR document/enforce that TRANSITIVE_VISIBLE can only be
applied to dependencies of unmanaged transactions
if (d.is(UNSTABLE) && t.compareTo(COMMITTED) < 0 &&
t.witnesses(d))
{
if (missingCount == missing.length)
@@ -872,7 +878,7 @@ class Updating
// used only to decide if an executeAt is included _on the assumption
the TxnId is_. For ?[EX] this is all timestamps
Timestamp compareExecuteAt = waitingTxnId.awaitsOnlyDeps() ?
Timestamp.MAX : command.executeAt();
- TxnInfo[] byId = cfk.byId;
+ TxnInfo[] byId = cfk.byId, newById = null;
RelationMultiMap.SortedRelationList<TxnId> txnIds =
command.partialDeps().keyDeps.txnIdsWithFlags(cfk.key());
TxnId[] missing = NO_TXNIDS;
int missingCount = 0;
@@ -968,7 +974,16 @@ class Updating
effectiveExecutesAt =
Timestamp.nonNullOrMax(effectiveExecutesAt, txn.executeAt);
}
}
- if (c == 0) ++i;
+ if (c == 0)
+ {
+ if (txn.is(TRANSITIVE) &&
isTransitiveDependencyVisible(waitingTxnId))
+ {
+ if (newById == null)
+ newById = byId.clone();
+ newById[j] = TxnInfo.create(txn,
TRANSITIVE_VISIBLE, txn.mayExecute(), txn.statusOverrides(), txn.executeAt,
txn.missing(), txn.ballot());
+ }
+ ++i;
+ }
}
++j;
}
@@ -1022,7 +1037,8 @@ class Updating
waitingToExecuteAt = updateExecuteAtLeast(waitingToExecuteAt,
effectiveExecutesAt, safeCommand);
if (!readyToApply || missingCount > 0)
{
- TxnInfo[] newById = byId, newCommittedByExecuteAt =
cfk.committedByExecuteAt;
+ if (newById == null) newById = byId;
+ TxnInfo[] newCommittedByExecuteAt = cfk.committedByExecuteAt;
int newMinUndecidedById = cfk.minUndecidedById;
int newMaxAppliedPreBootstrapWriteById =
cfk.maxAppliedPreBootstrapWriteById;
Object[] newLoadingPruned = cfk.loadingPruned;
@@ -1050,8 +1066,9 @@ class Updating
++minUndecidedMissingIndex;
TxnId minUndecidedMissing = minUndecidedMissingIndex
== missingCount ? null : missing[minUndecidedMissingIndex];
TxnId minUndecided =
TxnId.nonNullOrMin(minUndecidedMissing, cfk.minUndecided());
+ TxnInfo[] copyById = newById;
newById = new TxnInfo[byId.length + missingCount];
- newCommittedByExecuteAt = insertAdditionsOnly(byId,
cfk.committedByExecuteAt, newById, missing, missingCount, cfk.bounds,
waitingTxnId);
+ newCommittedByExecuteAt =
insertAdditionsOnly(copyById, cfk.committedByExecuteAt, newById, missing,
missingCount, cfk.bounds, waitingTxnId);
// we can safely use missing[prunedIndex] here because
we only fill missing with transactions for which we manage execution
if (minUndecided != null)
newMinUndecidedById = Arrays.binarySearch(newById,
minUndecided);
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java
b/accord-core/src/main/java/accord/messages/Propagate.java
index 93b2c7cc..7a24b028 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -391,7 +391,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
if (stillOwnsOrMayExecute.isEmpty() &&
known.hasFullyTruncated(staleTouches))
{
- Commands.setTruncatedApplyOrErasedVestigial(safeStore,
safeCommand, participants, executeAtIfKnown);
+ Commands.setTruncatedOrVestigial(safeStore, safeCommand,
participants);
return null;
}
}
@@ -416,7 +416,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
}
// TODO (expected): we might prefer to adopt Redundant status, and
permit ourselves to later accept the result of the execution and/or definition
- Commands.setTruncatedApplyOrErasedVestigial(safeStore, safeCommand,
participants, executeAtIfKnown);
+ Commands.setTruncatedOrVestigial(safeStore, safeCommand, participants);
return null;
}
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java
b/accord-core/src/main/java/accord/primitives/Deps.java
index a1ff4158..ae3699ad 100644
--- a/accord-core/src/main/java/accord/primitives/Deps.java
+++ b/accord-core/src/main/java/accord/primitives/Deps.java
@@ -20,6 +20,7 @@ package accord.primitives;
import accord.api.RoutingKey;
import accord.local.cfk.CommandsForKey;
+import accord.primitives.Routable.Domain;
import accord.utils.IndexedFunction;
import accord.utils.Invariants;
import accord.utils.MergeFewDisjointSortedListsCursor;
@@ -27,15 +28,20 @@ import accord.utils.RelationMultiMap;
import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.SortedList;
import accord.utils.SortedList.MergeCursor;
+import accord.utils.TriFunction;
+import accord.utils.UnhandledEnum;
import java.util.AbstractList;
import java.util.List;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;
+import static accord.local.cfk.CommandsForKey.managesExecution;
import static accord.primitives.Routables.Slice.Minimal;
+import static accord.primitives.Timestamp.Flag.UNSTABLE;
import static accord.utils.Invariants.illegalState;
/**
@@ -101,7 +107,7 @@ public class Deps
public AbstractBuilder<T> addNormalise(Unseekable keyOrRange, TxnId
txnId)
{
if (keyOrRange.domain() == txnId.domain()) add(keyOrRange, txnId);
- else if (keyOrRange.domain() == Routable.Domain.Key)
add(keyOrRange.asRange(), txnId);
+ else if (keyOrRange.domain() == Domain.Key)
add(keyOrRange.asRange(), txnId);
else throw illegalState();
return this;
}
@@ -113,7 +119,7 @@ public class Deps
{
default: throw new AssertionError();
case Key:
- if (CommandsForKey.managesExecution(txnId))
+ if (managesExecution(txnId))
{
keyBuilder.add(keyOrRange.asRoutingKey(), txnId);
}
@@ -195,26 +201,12 @@ public class Deps
public boolean contains(TxnId txnId)
{
- Routable.Domain domain = txnId.domain();
- if (domain.isRange())
- return rangeDeps.contains(txnId);
-
- if (CommandsForKey.managesExecution(txnId))
- return keyDeps.contains(txnId);
-
- return directKeyDeps.contains(txnId);
+ return applyToId(txnId, KeyOrRangeDeps::contains);
}
public boolean intersects(TxnId txnId, Ranges ranges)
{
- Routable.Domain domain = txnId.domain();
- if (domain.isRange())
- return rangeDeps.intersects(txnId, ranges);
-
- if (CommandsForKey.managesExecution(txnId))
- return keyDeps.intersects(txnId, ranges);
-
- return directKeyDeps.intersects(txnId, ranges);
+ return applyToId(txnId, ranges, KeyOrRangeDeps::intersects);
}
public MergeCursor<TxnId, DepList> txnIds(RoutingKey key)
@@ -293,23 +285,9 @@ public class Deps
return keyDeps.txnIdCount() + rangeDeps.txnIdCount() +
directKeyDeps.txnIdCount();
}
- public TxnId txnId(int i)
+ public TxnId txnId(int index)
{
- {
- int keyDepsLimit = keyDeps.txnIdCount();
- if (i < keyDepsLimit)
- return keyDeps.txnId(i);
- i -= keyDepsLimit;
- }
-
- {
- int directKeyDepsLimit = directKeyDeps.txnIdCount();
- if (i < directKeyDepsLimit)
- return directKeyDeps.txnId(i);
- i -= directKeyDepsLimit;
- }
-
- return rangeDeps.txnId(i);
+ return applyToIndex(index, KeyOrRangeDeps::txnId);
}
public List<TxnId> txnIds()
@@ -326,13 +304,72 @@ public class Deps
};
}
+ public int indexOf(TxnId txnId)
+ {
+ switch (txnId.domain())
+ {
+ default: throw new UnhandledEnum(txnId.domain());
+ case Key:
+ {
+ if (managesExecution(txnId))
+ return keyDeps.indexOf(txnId);
+ int index = directKeyDeps.indexOf(txnId);
+ return index < 0 ? -1 : keyDeps.txnIdCount() + index;
+ }
+ case Range:
+ {
+ int index = rangeDeps.indexOf(txnId);
+ return index < 0 ? -1 : keyDeps.txnIdCount() +
directKeyDeps.txnIdCount() + index;
+ }
+ }
+ }
+
public Participants<?> participants(TxnId txnId)
+ {
+ return applyToId(txnId, KeyOrRangeDeps::participants);
+ }
+
+ public Participants<?> participants(int index)
+ {
+ return applyToIndex(index, KeyOrRangeDeps::participants);
+ }
+
+ public boolean isStable(int index)
+ {
+ return !applyToIndex(index,
KeyOrRangeDeps::txnIdWithFlags).is(UNSTABLE);
+ }
+
+ private <T> T applyToIndex(int index, IndexedFunction<KeyOrRangeDeps, T>
apply)
+ {
+ int keyDepsLimit = keyDeps.txnIdCount();
+ if (index < keyDepsLimit)
+ return apply.apply(keyDeps, index);
+ index -= keyDepsLimit;
+
+ int directKeyDepsLimit = directKeyDeps.txnIdCount();
+ if (index < directKeyDepsLimit)
+ return apply.apply(directKeyDeps, index);
+ index -= directKeyDepsLimit;
+
+ return apply.apply(rangeDeps, index);
+ }
+
+ private <T> T applyToId(TxnId txnId, BiFunction<KeyOrRangeDeps, TxnId, T>
apply)
+ {
+ return applyToId(txnId, apply, (d, id, f) -> f.apply(d, id));
+ }
+
+ private <P, T> T applyToId(TxnId txnId, P p, TriFunction<KeyOrRangeDeps,
TxnId, P, T> apply)
{
switch (txnId.domain())
{
- default: throw new AssertionError();
- case Key: return CommandsForKey.managesExecution(txnId) ?
keyDeps.participants(txnId) : directKeyDeps.participants(txnId);
- case Range: return rangeDeps.participants(txnId);
+ default: throw new UnhandledEnum(txnId.domain());
+ case Key:
+ if (managesExecution(txnId))
+ return apply.apply(keyDeps, txnId, p);
+ return apply.apply(directKeyDeps, txnId, p);
+ case Range:
+ return apply.apply(rangeDeps, txnId, p);
}
}
diff --git a/accord-core/src/main/java/accord/primitives/KeyDeps.java
b/accord-core/src/main/java/accord/primitives/KeyDeps.java
index b02b9030..fea955cd 100644
--- a/accord-core/src/main/java/accord/primitives/KeyDeps.java
+++ b/accord-core/src/main/java/accord/primitives/KeyDeps.java
@@ -54,7 +54,7 @@ import static accord.utils.SortedArrays.Search.FAST;
* A collection of dependencies for a transaction, organised by the key the
dependency is adopted via.
* An inverse map from TxnId to Key may also be constructed and stored in this
collection.
*/
-public class KeyDeps implements Iterable<Map.Entry<RoutingKey, TxnId>>
+public class KeyDeps implements Iterable<Map.Entry<RoutingKey, TxnId>>,
KeyOrRangeDeps
{
public static final KeyDeps NONE = new KeyDeps(RoutingKeys.EMPTY,
NO_TXNIDS, NO_INTS);
diff --git a/accord-core/src/main/java/accord/primitives/KeyOrRangeDeps.java
b/accord-core/src/main/java/accord/primitives/KeyOrRangeDeps.java
new file mode 100644
index 00000000..74a9d4f5
--- /dev/null
+++ b/accord-core/src/main/java/accord/primitives/KeyOrRangeDeps.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.primitives;
+
+import accord.utils.SortedArrays;
+
+public interface KeyOrRangeDeps
+{
+ Participants<?> participants(TxnId txnId);
+ Participants<?> participants(int indexOf);
+ TxnId txnId(int indexOf);
+ TxnId txnIdWithFlags(int indexOf);
+ int txnIdCount();
+ SortedArrays.SortedArrayList<TxnId> txnIdsWithFlags();
+ boolean intersects(TxnId txnId, Ranges ranges);
+ boolean contains(TxnId txnId);
+}
diff --git a/accord-core/src/main/java/accord/primitives/Range.java
b/accord-core/src/main/java/accord/primitives/Range.java
index 81911584..853e9c62 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -43,6 +43,11 @@ public abstract class Range implements
Comparable<RoutableKey>, Unseekable, Seek
super(start, end);
}
+ public EndInclusive(RoutingKey start, RoutingKey end, AntiRangeMarker
antiRange)
+ {
+ super(start, end, antiRange);
+ }
+
@Override
public int compareTo(RoutableKey key)
{
@@ -93,6 +98,11 @@ public abstract class Range implements
Comparable<RoutableKey>, Unseekable, Seek
super(start, end);
}
+ public StartInclusive(RoutingKey start, RoutingKey end,
AntiRangeMarker antiRange)
+ {
+ super(start, end, antiRange);
+ }
+
@Override
public int compareTo(RoutableKey key)
{
@@ -202,6 +212,9 @@ public abstract class Range implements
Comparable<RoutableKey>, Unseekable, Seek
};
}
+ // used to construct an unsafe Range used only for representing an absence
of information. Imposes weaker invariants.
+ public enum AntiRangeMarker { ANTI_RANGE };
+
private final RoutingKey start;
private final RoutingKey end;
@@ -215,6 +228,15 @@ public abstract class Range implements
Comparable<RoutableKey>, Unseekable, Seek
this.end = end;
}
+ private Range(RoutingKey start, RoutingKey end, AntiRangeMarker antiRange)
+ {
+ // TODO (expected): should we at least relax to permit an empty Range?
+ Invariants.requireArgument(start.compareTo(end) < 0, "%s >= %s",
start, end);
+ Invariants.require(startInclusive() != endInclusive(), "Range must
have one side inclusive, and the other exclusive. Range of different types
should not be mixed.");
+ this.start = start;
+ this.end = end;
+ }
+
public RoutingKey start()
{
return start;
diff --git a/accord-core/src/main/java/accord/primitives/RangeDeps.java
b/accord-core/src/main/java/accord/primitives/RangeDeps.java
index e6b6be66..5ab90c16 100644
--- a/accord-core/src/main/java/accord/primitives/RangeDeps.java
+++ b/accord-core/src/main/java/accord/primitives/RangeDeps.java
@@ -88,7 +88,7 @@ import static accord.utils.SortedArrays.Search.FAST;
* TODO (testing): confirm we are de-overlapping ranges per txnId
* TODO (testing): randomised testing of all iteration methods
*/
-public class RangeDeps implements Iterable<Map.Entry<Range, TxnId>>
+public class RangeDeps implements Iterable<Map.Entry<Range, TxnId>>,
KeyOrRangeDeps
{
public static class SerializerSupport
{
@@ -471,6 +471,11 @@ public class RangeDeps implements
Iterable<Map.Entry<Range, TxnId>>
return ranges(txnId);
}
+ public Ranges participants(int indexOf)
+ {
+ return ranges(indexOf);
+ }
+
public Ranges ranges(TxnId txnId)
{
int txnIdx = Arrays.binarySearch(txnIds, txnId);
diff --git a/accord-core/src/main/java/accord/primitives/SaveStatus.java
b/accord-core/src/main/java/accord/primitives/SaveStatus.java
index e77a7f38..fb6091f8 100644
--- a/accord-core/src/main/java/accord/primitives/SaveStatus.java
+++ b/accord-core/src/main/java/accord/primitives/SaveStatus.java
@@ -104,7 +104,9 @@ public enum SaveStatus
Applying (Status.PreApplied,
LocalExecution.Applying),
// similar to Truncated, but doesn't imply we have any global knowledge
about application
Applied (Status.Applied,
LocalExecution.Applied),
- // TruncatedApplyWithDeps is a state never adopted within a single
replica; it is however a useful state we may enter by combining state from
multiple replicas
+ // TruncatedApplyWithOutcomeAndDeps exists to support re-populating
CommandsForKey on replay with any dependencies needed for computing recovery
superseding-rejects decisions
+ // TODO (expected): test replay
+ TruncatedApplyWithOutcomeAndDeps(Status.Truncated, FullRoute,
DefinitionErased, ApplyAtKnown, DepsKnown, Apply,
CleaningUp),
TruncatedApplyWithOutcome (Status.Truncated, FullRoute,
DefinitionErased, ApplyAtKnown, DepsErased, Apply,
CleaningUp),
TruncatedApply (Status.Truncated,
MaybeRoute, DefinitionErased, ApplyAtKnown, DepsErased,
WasApply,CleaningUp),
TruncatedUnapplied (Status.Truncated,
MaybeRoute, DefinitionErased, ExecuteAtKnown, DepsErased,
WasApply,CleaningUp),
@@ -364,27 +366,28 @@ public enum SaveStatus
switch (status)
{
default: throw new AssertionError("Unexpected status: " +
status);
+ case Erased:
case Vestigial:
if (known.outcome().isInvalidated())
return Invalidated;
- if (!known.outcome().isOrWasApply() ||
known.is(ExecuteAtKnown))
- return Vestigial;
-
- case Erased:
if (!known.outcome().isOrWasApply() ||
!known.is(ExecuteAtKnown))
- return Erased;
+ return status;
case TruncatedUnapplied:
if (!known.is(ApplyAtKnown))
return TruncatedUnapplied;
case TruncatedApply:
- if (known.outcome() != Apply)
+ if (!known.is(Outcome.Apply))
return TruncatedApply;
case TruncatedApplyWithOutcome:
- if (known.deps() != DepsKnown)
+ if (!known.is(DepsKnown))
+ return TruncatedApplyWithOutcome;
+
+ case TruncatedApplyWithOutcomeAndDeps:
+ if (!known.is(DefinitionKnown))
return TruncatedApplyWithOutcome;
return Applied;
diff --git a/accord-core/src/main/java/accord/utils/ArrayBuffers.java
b/accord-core/src/main/java/accord/utils/ArrayBuffers.java
index 5e5328ca..92446978 100644
--- a/accord-core/src/main/java/accord/utils/ArrayBuffers.java
+++ b/accord-core/src/main/java/accord/utils/ArrayBuffers.java
@@ -30,6 +30,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.function.IntFunction;
+import com.google.common.annotations.VisibleForTesting;
+
import static accord.utils.Invariants.illegalState;
/**
@@ -428,13 +430,16 @@ public class ArrayBuffers
* A very simple cache that simply stores the largest {@code maxCount}
arrays smaller than {@code maxSize}.
* Works on both primitive and Object arrays.
*/
- private static abstract class AbstractBufferCache<B>
+ @VisibleForTesting
+ static abstract class AbstractBufferCache<B>
{
interface Clear<B>
{
void clear(B array, int usedSize);
}
+ static final int MIN_SIZE = 8;
+
final IntFunction<B> allocator;
final Clear<B> clear;
final B empty;
@@ -444,7 +449,7 @@ public class ArrayBuffers
AbstractBufferCache(IntFunction<B> allocator, Clear<B> clear, int
maxCount, int maxSize)
{
this.allocator = allocator;
- this.maxSize = maxSize;
+ this.maxSize = Math.max(MIN_SIZE, maxSize);
this.cached = (B[])new Object[maxCount];
this.empty = allocator.apply(0);
this.clear = clear;
@@ -458,6 +463,7 @@ public class ArrayBuffers
if (minSize > maxSize)
return allocator.apply(minSize);
+ minSize = Math.max(minSize, MIN_SIZE);
for (int i = 0 ; i < cached.length ; ++i)
{
if (cached[i] != null && Array.getLength(cached[i]) >= minSize)
@@ -476,6 +482,9 @@ public class ArrayBuffers
if (bufferSize == 0 || bufferSize > maxSize)
return true;
+ Invariants.require(bufferSize >= MIN_SIZE,
+ "Trying to return an array of size %d we cannot
have allocated, suggesting a logic bug that could lead to overwriting an in-use
array.",
+ bufferSize);
if (bufferSize == usedSize && !force)
return false;
diff --git a/accord-core/src/main/java/accord/utils/RelationMultiMap.java
b/accord-core/src/main/java/accord/utils/RelationMultiMap.java
index a7b24315..5e09ee1a 100644
--- a/accord-core/src/main/java/accord/utils/RelationMultiMap.java
+++ b/accord-core/src/main/java/accord/utils/RelationMultiMap.java
@@ -326,11 +326,11 @@ public class RelationMultiMap
{
final MergeAdapter<K, V> adapter;
final PassThroughObjectBuffers<K> keyBuffers;
- K[] bufKeys;
- V[] bufValues;
- int[] buf = null;
- int bufKeysLength, bufValuesLength = 0, bufLength = 0;
- T from = null;
+ K[] prevKeys, nextKeys;
+ V[] prevValues, nextValues;
+ int[] prevKeysToValues, nextKeysToValues;
+ int prevKeysLength, prevValuesLength, prevKeysToValuesLength;
+ T prev, next;
public LinearMerger(MergeAdapter<K, V> adapter)
{
@@ -340,110 +340,108 @@ public class RelationMultiMap
}
@Override
- public Object construct(K[] keys, int keysLength, V[] txnIds, int
txnIdsLength, int[] out, int outLength)
+ public Object construct(K[] keys, int keysLength, V[] values, int
valuesLength, int[] out, int outLength)
{
- if (from == null)
+ boolean isNext = true, isPrev = true;
+ if (keys != prevKeys)
{
- // if our input buffers were themselves buffers, we want to
discard them unless they have been returned back to us
- discard(keys, txnIds, out);
+ Invariants.require(len(prevKeys, prevKeysLength) !=
keysLength);
+ if (prevKeysLength >= 0)
+ keyBuffers.realDiscard(prevKeys, prevKeysLength);
+ prevKeys = keys;
+ prevKeysLength = keys == nextKeys ? -1 : keysLength;
+ isNext = keysLength == nextKeys.length;
+ isPrev = false;
}
- else if (buf != out)
+ if (values != prevValues)
{
- // the output is not equal to a prior input
- from = null;
+ if (prevValuesLength >= 0)
+ realDiscard(prevValues, prevValuesLength);
+ prevValues = values;
+ prevValuesLength = values == nextValues ? -1 : valuesLength;
+ isNext &= valuesLength == nextValues.length;
+ isPrev = false;
}
-
- if (from == null)
- {
- bufKeys = keys;
- bufKeysLength = keysLength;
- bufValues = txnIds;
- bufValuesLength = txnIdsLength;
- buf = out;
- bufLength = outLength;
- }
- else
+ if (out != prevKeysToValues)
{
- Invariants.require(keys == bufKeys && keysLength ==
bufKeysLength);
- Invariants.require(txnIds == bufValues && txnIdsLength ==
bufValuesLength);
- Invariants.require(outLength == bufLength);
+ if (prevKeysToValuesLength >= 0)
+ realDiscard(prevKeysToValues, prevKeysToValuesLength);
+ prevKeysToValues = out;
+ prevKeysToValuesLength = out == nextKeysToValues ? -1 :
outLength;
+ isNext &= out == nextKeysToValues;
+ isPrev = false;
}
+
+ if (!isPrev)
+ prev = isNext ? next : null;
+
return null;
}
public void update(T merge, K[] keys, V[] values, int[] keysToValues)
{
- if (buf == null)
+ if (prevKeysToValues == null)
{
- bufKeys = keys;
- bufKeysLength = keys.length;
- bufValues = values;
- bufValuesLength = values.length;
- buf = keysToValues;
- bufLength = keysToValues.length;
- from = merge;
+ prevKeys = keys;
+ prevKeysLength = -1;
+ prevValues = values;
+ prevValuesLength = -1;
+ prevKeysToValues = keysToValues;
+ prevKeysToValuesLength = -1;
+ prev = merge;
return;
}
+ nextKeys = keys;
+ nextValues = values;
+ nextKeysToValues = keysToValues;
+ next = merge;
linearUnion(
- bufKeys, bufKeysLength, bufValues, bufValuesLength, buf,
bufLength,
- keys, keys.length, values, values.length, keysToValues,
keysToValues.length,
- adapter.keyComparator(), adapter.valueComparator(),
- adapter.keyMerger(), adapter.valueMerger(),
- keyBuffers, this, this, this
+ prevKeys, len(prevKeys, prevKeysLength), prevValues,
len(prevValues, prevValuesLength), prevKeysToValues, len(prevKeysToValues,
prevKeysToValuesLength),
+ keys, keys.length, values, values.length, keysToValues,
keysToValues.length,
+ adapter.keyComparator(), adapter.valueComparator(),
+ adapter.keyMerger(), adapter.valueMerger(),
+ keyBuffers, this, this, this
);
- if (buf == keysToValues)
- {
- Invariants.require(keys == bufKeys && keys.length ==
bufKeysLength);
- Invariants.require(values == bufValues && values.length ==
bufValuesLength);
- Invariants.require(keysToValues.length == bufLength);
- from = merge;
- }
}
- public T get(SimpleConstructor<K[], V[], T> constructor, T none)
+ private static int len(Object[] prev, int prevLength)
{
- if (buf == null)
- return none;
-
- if (from != null)
- return from;
+ return prevLength < 0 ? prev.length : prevLength;
+ }
- return constructor.construct(keyBuffers.realComplete(bufKeys,
bufKeysLength),
- realComplete(bufValues, bufValuesLength),
- realComplete(buf, bufLength));
+ private static int len(int[] prev, int prevLength)
+ {
+ return prevLength < 0 ? prev.length : prevLength;
}
- /**
- * Free buffers unless they are equal to the corresponding parameter
- */
- void discard(K[] freeKeysIfNot, V[] freeValuesIfNot, int[]
freeBufIfNot)
+ public T get(SimpleConstructor<K[], V[], T> constructor, T none)
{
- if (from != null)
- return;
+ if (prevKeysToValues == null)
+ return none;
- if (bufKeys != freeKeysIfNot)
- {
- keyBuffers.realDiscard(bufKeys, bufKeysLength);
- bufKeys = null;
- }
- if (bufValues != freeValuesIfNot)
- {
- realDiscard(bufValues, bufValuesLength);
- bufValues = null;
- }
- if (buf != freeBufIfNot)
- {
- realDiscard(buf, bufLength);
- buf = null;
- }
+ if (prev != null)
+ return prev;
+
+ K[] keys = prevKeysLength < 0 ? prevKeys :
keyBuffers.realComplete(prevKeys, prevKeysLength);
+ V[] values = prevValuesLength < 0 ? prevValues :
realComplete(prevValues, prevValuesLength);
+ int[] keysToValues = prevKeysToValuesLength < 0 ? prevKeysToValues
: realComplete(prevKeysToValues, prevKeysToValuesLength);
+ return constructor.construct(keys, values, keysToValues);
}
@Override
public void close()
{
- if (from == null)
- discard(null, null, null);
+ if (prevKeysLength >= 0 && prevKeys != null)
+ keyBuffers.realDiscard(prevKeys, prevKeysLength);
+ if (prevValuesLength >= 0 && prevValues != null)
+ realDiscard(prevValues, prevValuesLength);
+ if (prevKeysToValuesLength >= 0 && prevKeysToValues != null)
+ realDiscard(prevKeysToValues, prevKeysToValuesLength);
+ prevKeys = nextKeys = null;
+ prevValues = nextValues = null;
+ prevKeysToValues = nextKeysToValues = null;
+ next = prev = null;
}
}
@@ -624,17 +622,15 @@ public class RelationMultiMap
@Nullable BiFunction<? super K, ? super K, ? extends K>
keyMerger, @Nullable BiFunction<? super V, ? super V, ? extends V> valueMerger,
ObjectBuffers<K> keyBuffers, ObjectBuffers<V> valueBuffers,
IntBuffers intBuffers, Constructor<K, V, T> constructor)
{
- K[] outKeys = null;
- V[] outValues = null;
int[] remapLeft = null, remapRight = null, out = null;
int outLength = 0, outKeysLength = 0, outValuesLength = 0;
try
{
- outKeys = SortedArrays.linearUnion(leftKeys, 0, leftKeysLength,
rightKeys, 0, rightKeysLength, keyComparator, keyMerger, keyBuffers);
+ K[] outKeys = SortedArrays.linearUnion(leftKeys, 0,
leftKeysLength, rightKeys, 0, rightKeysLength, keyComparator, keyMerger,
keyBuffers);
outKeysLength = keyBuffers.sizeOfLast(outKeys);
- outValues = SortedArrays.linearUnion(leftValues, 0,
leftValuesLength, rightValues, 0, rightValuesLength, valueComparator,
valueMerger, valueBuffers);
+ V[] outValues = SortedArrays.linearUnion(leftValues, 0,
leftValuesLength, rightValues, 0, rightValuesLength, valueComparator,
valueMerger, valueBuffers);
outValuesLength = valueBuffers.sizeOfLast(outValues);
remapLeft = remapToSuperset(leftValues, leftValuesLength,
outValues, outValuesLength, valueComparator, intBuffers);
@@ -862,10 +858,6 @@ public class RelationMultiMap
}
finally
{
- if (outKeys != null)
- keyBuffers.discard(outKeys, outKeysLength);
- if (outValues != null)
- valueBuffers.discard(outValues, outValuesLength);
if (out != null)
intBuffers.discard(out, outLength);
if (remapLeft != null)
diff --git a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
b/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
index 721bc207..e6404ef9 100644
--- a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
@@ -31,6 +31,7 @@ import accord.primitives.RoutableKey;
import accord.utils.CRCUtils;
import accord.utils.Invariants;
+import static accord.primitives.Range.AntiRangeMarker.ANTI_RANGE;
import static accord.utils.Utils.toArray;
public abstract class PrefixedIntHashKey implements RoutableKey
@@ -147,7 +148,7 @@ public abstract class PrefixedIntHashKey implements
RoutableKey
@Override
public accord.primitives.Range newAntiRange(RoutingKey s,
RoutingKey e)
{
- return new Range((PrefixedIntRoutingKey) s,
(PrefixedIntRoutingKey) e);
+ return new Range((PrefixedIntRoutingKey) s,
(PrefixedIntRoutingKey) e, ANTI_RANGE);
}
};
}
@@ -224,6 +225,11 @@ public abstract class PrefixedIntHashKey implements
RoutableKey
super(start, end);
}
+ private Range(PrefixedIntRoutingKey start, PrefixedIntRoutingKey end,
AntiRangeMarker antiRange)
+ {
+ super(start, end, antiRange);
+ }
+
@Override
public accord.primitives.Range newRange(RoutingKey s, RoutingKey e)
{
diff --git a/accord-core/src/test/java/accord/utils/ArrayBuffersTest.java
b/accord-core/src/test/java/accord/utils/ArrayBuffersTest.java
new file mode 100644
index 00000000..0f826d97
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/ArrayBuffersTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.ArrayBuffers.*;
+import static accord.utils.Property.qt;
+
+public class ArrayBuffersTest
+{
+ public interface TestCase<T>
+ {
+ void initialize(int maxCount, int maxSize);
+ T get(RandomSource rng);
+ void check(T v);
+ T complete(T v, RandomSource rng);
+ T resize(T v, RandomSource rng);
+ void discard(T v);
+ }
+
+ @Test
+ public void simpleIntsTest()
+ {
+ simpleTest(new IntBufTestCase());
+ }
+
+ public static class IntBufTestCase implements TestCase<int[]>
+ {
+ private IntBufferCache cache;
+
+ @Override
+ public void initialize(int maxCount, int maxSize)
+ {
+ this.cache = new IntBufferCache(maxCount, maxSize);
+ }
+
+ @Override
+ public int[] get(RandomSource rng)
+ {
+ int[] arr = cache.getInts(rng.nextInt(1, 100));
+ Arrays.fill(arr, 255);
+ return arr;
+ }
+
+ @Override
+ public void check(int[] arr)
+ {
+ for (int v : arr)
+ Assertions.assertThat(v).isEqualTo(255);
+ }
+
+ @Override
+ public int[] complete(int[] v, RandomSource rng)
+ {
+ return new int[0];
+ }
+
+ @Override
+ public int[] resize(int[] arr, RandomSource rng)
+ {
+ int usedSize = arr.length == 1 ? 1 : rng.nextInt(1, arr.length);
+ arr = cache.resize(arr,
+ usedSize,
+ rng.nextInt(usedSize, 100));
+ Arrays.fill(arr, 255);
+ return arr;
+ }
+
+ @Override
+ public void discard(int[] v)
+ {
+ Arrays.fill(v, 0);
+ cache.discard(v, 0);
+ }
+ }
+
+ @Test
+ public void simpleLongsTest()
+ {
+ simpleTest(new LongBufTestCase());
+ }
+
+ public static class LongBufTestCase implements TestCase<long[]>
+ {
+ private LongBufferCache cache;
+
+ @Override
+ public void initialize(int maxCount, int maxSize)
+ {
+ this.cache = new LongBufferCache(maxCount, maxSize);
+ }
+
+ @Override
+ public long[] get(RandomSource rng)
+ {
+ long[] arr = cache.getLongs(rng.nextInt(1, 100));
+ Arrays.fill(arr, 255);
+ return arr;
+ }
+
+ @Override
+ public void check(long[] arr)
+ {
+ for (long v : arr)
+ Assertions.assertThat(v).isEqualTo(255);
+ }
+
+ @Override
+ public long[] complete(long[] v, RandomSource rng)
+ {
+ return new long[0];
+ }
+
+ @Override
+ public long[] resize(long[] arr, RandomSource rng)
+ {
+ int usedSize = arr.length == 1 ? 1 : rng.nextInt(1, arr.length);
+ arr = cache.resize(arr,
+ usedSize,
+ rng.nextInt(usedSize, 100));
+ Arrays.fill(arr, 255);
+ return arr;
+ }
+
+ @Override
+ public void discard(long[] v)
+ {
+ Arrays.fill(v, 0);
+ cache.discard(v, 0);
+ }
+ }
+
+ private <T> void simpleTest(TestCase<T> testCase)
+ {
+ Gen<Operation> opGen = Gens.enums().all(Operation.class);
+ qt().forAll(Gens.random())
+ .check(rng -> {
+ for (int maxCount = 1; maxCount < 10; maxCount += 2)
+ {
+ for (int maxSize = 0; maxSize < 14; maxSize += 2)
+ {
+ testCase.initialize(maxCount, 1 << maxSize);
+
+ List<T> taken = new ArrayList<>();
+ for (int i = 0; i < 1000; i++)
+ {
+ switch (opGen.next(rng))
+ {
+ case GetInts:
+ {
+ T arr = testCase.get(rng);
+ taken.add(arr);
+ break;
+ }
+ case Resize:
+ {
+ if (taken.isEmpty())
+ continue;
+ T v = taken.remove(rng.nextInt(0, taken.size()));
+ testCase.check(v);
+ v = testCase.resize(v, rng);
+ taken.add(v);
+ break;
+ }
+ case Complete:
+ {
+ if (taken.isEmpty())
+ continue;
+ T v = taken.remove(rng.nextInt(0, taken.size()));
+ testCase.check(v);
+ testCase.check(testCase.complete(v, rng));
+ taken.add(v);
+ break;
+ }
+ case Discard:
+ {
+ if (taken.isEmpty())
+ continue;
+ T v = taken.remove(rng.nextInt(0, taken.size()));
+ testCase.discard(v);
+ break;
+ }
+ }
+ }
+ }
+ }
+ });
+ }
+
+ public enum Operation
+ {
+ GetInts, Complete, Resize, Discard
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]