This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch semi-integrated-burn-test-rebased in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 3dca66b608324ed627aaa6a6111a2563028d219b Author: Alex Petrov <[email protected]> AuthorDate: Fri Nov 15 20:25:52 2024 +0100 Fixes --- .../main-test/java/accord/burn/BurnTestBase.java | 7 +- .../main-test/java/accord/impl/basic/Cluster.java | 2 +- .../java/accord/impl/basic/InMemoryJournal.java | 13 +- .../java/accord/impl/basic/VerifyingJournal.java | 94 ++++ .../java/accord/impl/list/ListResult.java | 2 +- .../main/java/accord/coordinate/PersistTxn.java | 1 - .../src/main/java/accord/impl/CommandChange.java | 598 --------------------- .../java/accord/impl/DurabilityScheduling.java | 11 +- .../main/java/accord/impl/TimestampsForKeys.java | 7 +- .../src/main/java/accord/local/Command.java | 8 +- accord-core/src/main/java/accord/local/Node.java | 4 +- .../src/main/java/accord/messages/CheckStatus.java | 22 +- .../src/main/java/accord/primitives/Timestamp.java | 4 +- 13 files changed, 135 insertions(+), 638 deletions(-) diff --git a/accord-core/src/main-test/java/accord/burn/BurnTestBase.java b/accord-core/src/main-test/java/accord/burn/BurnTestBase.java index 0f5a34c9..8f2389f1 100644 --- a/accord-core/src/main-test/java/accord/burn/BurnTestBase.java +++ b/accord-core/src/main-test/java/accord/burn/BurnTestBase.java @@ -92,7 +92,6 @@ import accord.utils.RandomSource; import accord.utils.Utils; import accord.utils.async.AsyncExecutor; import accord.utils.async.TimeoutUtils; -import accord.verify.CompositeVerifier; import accord.verify.StrictSerializabilityVerifier; import accord.verify.Verifier; import org.agrona.collections.Int2ObjectHashMap; @@ -302,7 +301,7 @@ public class BurnTestBase f2.get(); } - void burn(RandomSource random, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency, PendingQueue pendingQueue, Function<Id, Journal> journalFactory) + protected void burn(RandomSource random, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency, PendingQueue pendingQueue, Function<Id, Journal> journalFactory) { List<Throwable> failures = Collections.synchronizedList(new ArrayList<>()); AtomicLong progress = new AtomicLong(); @@ -520,7 +519,7 @@ public class BurnTestBase protected Verifier createVerifier(String prefix, int keyCount) { - return new StrictSerializabilityVerifier(prefix, keyCount); + return new StrictSerializabilityVerifier(prefix, keyCount); } public static void main(String[] args) @@ -660,7 +659,7 @@ public class BurnTestBase } } - private static List<Id> generateIds(boolean clients, int count) + protected static List<Id> generateIds(boolean clients, int count) { List<Id> ids = new ArrayList<>(); for (int i = 1; i <= count ; ++i) diff --git a/accord-core/src/main-test/java/accord/impl/basic/Cluster.java b/accord-core/src/main-test/java/accord/impl/basic/Cluster.java index bd2fb921..e9c9d9ed 100644 --- a/accord-core/src/main-test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/main-test/java/accord/impl/basic/Cluster.java @@ -716,7 +716,7 @@ public class Cluster if (!store.unsafeCommands().containsKey(txnId)) { Command beforeCommand = before.get(txnId); - if (beforeCommand.saveStatus() == SaveStatus.Erased) + if (beforeCommand.saveStatus() == SaveStatus.Erased || beforeCommand.saveStatus() == SaveStatus.Uninitialised) continue; if (store.unsafeGetRedundantBefore().min(beforeCommand.participants().owns(), RedundantBefore.Entry::shardRedundantBefore).compareTo(txnId) > 0) diff --git a/accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java index a74232d4..27249631 100644 --- a/accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java @@ -63,7 +63,6 @@ import static accord.primitives.Status.Invalidated; import static accord.primitives.Status.Truncated; import static accord.utils.Invariants.illegalState; -// TODO: looks like we also have accord via dependency here somehow? public class InMemoryJournal implements Journal { private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> diffsPerCommandStore = new Int2ObjectHashMap<>(); @@ -90,8 +89,16 @@ public class InMemoryJournal implements Journal // TODO: currently unused! RedundantBefore redundantBefore, DurableBefore durableBefore) { - List<Diff> diffs = this.diffsPerCommandStore.get(commandStoreId).get(txnId); - return reconstruct(diffs); + NavigableMap<TxnId, List<Diff>> commandStore = this.diffsPerCommandStore.get(commandStoreId); + + if (commandStore == null) + return null; + + List<Diff> saved = this.diffsPerCommandStore.get(commandStoreId).get(txnId); + if (saved == null) + return null; + + return reconstruct(saved); } @Override diff --git a/accord-core/src/main-test/java/accord/impl/basic/VerifyingJournal.java b/accord-core/src/main-test/java/accord/impl/basic/VerifyingJournal.java new file mode 100644 index 00000000..0c2f80bd --- /dev/null +++ b/accord-core/src/main-test/java/accord/impl/basic/VerifyingJournal.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl.basic; + +import java.util.NavigableMap; + +import accord.api.Journal; +import accord.local.Command; +import accord.local.CommandStores; +import accord.local.DurableBefore; +import accord.local.RedundantBefore; +import accord.primitives.Ranges; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; + +public class VerifyingJournal implements Journal +{ + private final Journal model; + private final Journal sut; + + public VerifyingJournal(Journal model, Journal sut) + { + this.model = model; + this.sut = sut; + } + + public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) + { + Command res = sut.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore); + Command model = this.model.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore); + Invariants.checkState(res.equals(model)); + return res; + } + + public void saveCommand(int store, CommandUpdate update, Runnable onFlush) + { + model.saveCommand(store, update, null); + sut.saveCommand(store, update, onFlush); + } + + public void purge(CommandStores commandStores) + { + model.purge(commandStores); + sut.purge(commandStores); + } + + public void replay(CommandStores commandStores) + { + sut.replay(commandStores); + } + + // TODO (required): model loading + public RedundantBefore loadRedundantBefore(int commandStoreId) + { + return sut.loadRedundantBefore(commandStoreId); + } + + public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId) + { + return sut.loadBootstrapBeganAt(commandStoreId); + } + + public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId) + { + return sut.loadSafeToRead(commandStoreId); + } + + public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId) + { + return sut.loadRangesForEpoch(commandStoreId); + } + + public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush) + { + sut.saveStoreState(store, fieldUpdates, onFlush); + } +} diff --git a/accord-core/src/main-test/java/accord/impl/list/ListResult.java b/accord-core/src/main-test/java/accord/impl/list/ListResult.java index adfa1fe3..c752bd0b 100644 --- a/accord-core/src/main-test/java/accord/impl/list/ListResult.java +++ b/accord-core/src/main-test/java/accord/impl/list/ListResult.java @@ -42,7 +42,7 @@ public class ListResult implements Result, Reply public final Keys responseKeys; public final int[][] read; // equal in size to keys.size() public final ListUpdate update; - private final Status status; + public final Status status; public ListResult(Status status, Id client, long requestId, TxnId txnId, Seekables<?, ?> readKeys, Keys responseKeys, int[][] read, ListUpdate update) { diff --git a/accord-core/src/main/java/accord/coordinate/PersistTxn.java b/accord-core/src/main/java/accord/coordinate/PersistTxn.java index c9139f01..797e8363 100644 --- a/accord-core/src/main/java/accord/coordinate/PersistTxn.java +++ b/accord-core/src/main/java/accord/coordinate/PersistTxn.java @@ -22,7 +22,6 @@ import accord.api.Result; import accord.local.Node; import accord.primitives.Deps; import accord.primitives.FullRoute; -import accord.primitives.Participants; import accord.primitives.Route; import accord.primitives.Timestamp; import accord.primitives.Txn; diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java deleted file mode 100644 index 60d1a209..00000000 --- a/accord-core/src/main/java/accord/impl/CommandChange.java +++ /dev/null @@ -1,598 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.impl; - -//import java.util.function.Function; -// -//import com.google.common.annotations.VisibleForTesting; -// -//import accord.api.Agent; -//import accord.api.Result; -//import accord.local.Cleanup; -//import accord.local.Command; -//import accord.local.CommonAttributes; -//import accord.local.DurableBefore; -//import accord.local.RedundantBefore; -//import accord.local.StoreParticipants; -//import accord.primitives.Ballot; -//import accord.primitives.PartialDeps; -//import accord.primitives.PartialTxn; -//import accord.primitives.SaveStatus; -//import accord.primitives.Status; -//import accord.primitives.Timestamp; -//import accord.primitives.TxnId; -//import accord.primitives.Writes; -//import accord.utils.Invariants; -// -//import static accord.impl.CommandChange.Fields.ACCEPTED; -//import static accord.impl.CommandChange.Fields.CLEANUP; -//import static accord.impl.CommandChange.Fields.DURABILITY; -//import static accord.impl.CommandChange.Fields.EXECUTES_AT_LEAST; -//import static accord.impl.CommandChange.Fields.EXECUTE_AT; -//import static accord.impl.CommandChange.Fields.FIELDS; -//import static accord.impl.CommandChange.Fields.PARTIAL_DEPS; -//import static accord.impl.CommandChange.Fields.PARTIAL_TXN; -//import static accord.impl.CommandChange.Fields.PARTICIPANTS; -//import static accord.impl.CommandChange.Fields.PROMISED; -//import static accord.impl.CommandChange.Fields.SAVE_STATUS; -//import static accord.impl.CommandChange.Fields.WAITING_ON; -//import static accord.impl.CommandChange.Fields.WRITES; -//import static accord.impl.CommandChange.Load.ALL; -//import static accord.local.Cleanup.NO; -//import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME; -//import static accord.primitives.Known.KnownDeps.DepsErased; -//import static accord.primitives.Known.KnownDeps.DepsUnknown; -//import static accord.primitives.Known.KnownDeps.NoDeps; -//import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome; -//import static accord.primitives.Status.Durability.NotDurable; -//import static accord.utils.Invariants.illegalState; -//import static org.apache.cassandra.service.accord.SavedCommand.MinimalCommand; -// -///** -// * Class representing a change in the Command after its execution by SafeCommandStore -// */ -//public class CommandChange -//{ -// // This enum is order-dependent -// public enum Fields -// { -// PARTICIPANTS, // stored first so we can index it -// SAVE_STATUS, -// PARTIAL_DEPS, -// EXECUTE_AT, -// EXECUTES_AT_LEAST, -// DURABILITY, -// ACCEPTED, -// PROMISED, -// WAITING_ON, -// PARTIAL_TXN, -// WRITES, -// CLEANUP, -// ; -// -// public static final Fields[] FIELDS = values(); -// } -// -// // TODO (required): calculate flags once -// private static boolean anyFieldChanged(int flags) -// { -// return (flags >>> 16) != 0; -// } -// -// private static int validateFlags(int flags) -// { -// Invariants.checkState(0 == (~(flags >>> 16) & (flags & 0xffff))); -// return flags; -// } -// -// /** -// * Collects flags -// */ -// public static int getFlags(Command before, Command after) -// { -// int flags = 0; -// -// flags = collectFlags(before, after, Command::executeAt, true, EXECUTE_AT, flags); -// flags = collectFlags(before, after, Command::executesAtLeast, true, EXECUTES_AT_LEAST, flags); -// flags = collectFlags(before, after, Command::saveStatus, false, SAVE_STATUS, flags); -// flags = collectFlags(before, after, Command::durability, false, DURABILITY, flags); -// -// flags = collectFlags(before, after, Command::acceptedOrCommitted, false, ACCEPTED, flags); -// flags = collectFlags(before, after, Command::promised, false, PROMISED, flags); -// -// flags = collectFlags(before, after, Command::participants, true, PARTICIPANTS, flags); -// flags = collectFlags(before, after, Command::partialTxn, false, PARTIAL_TXN, flags); -// flags = collectFlags(before, after, Command::partialDeps, false, PARTIAL_DEPS, flags); -// -// // TODO: waitingOn vs WaitingOnWithExecutedAt? -// flags = collectFlags(before, after, CommandChange::getWaitingOn, true, WAITING_ON, flags); -// -// flags = collectFlags(before, after, Command::writes, false, WRITES, flags); -// -// return flags; -// } -// -// private static Command.WaitingOn getWaitingOn(Command command) -// { -// if (command instanceof Command.Committed) -// return command.asCommitted().waitingOn(); -// -// return null; -// } -// -// private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, VAL> convert, boolean allowClassMismatch, CommandChange.Fields field, int flags) -// { -// VAL l = null; -// VAL r = null; -// if (lo != null) l = convert.apply(lo); -// if (ro != null) r = convert.apply(ro); -// -// if (l == r) -// return flags; // no change -// -// if (r == null) -// flags = setFieldIsNull(field, flags); -// -// if (l == null || r == null) -// return setFieldChanged(field, flags); -// -// assert allowClassMismatch || l.getClass() == r.getClass() : String.format("%s != %s", l.getClass(), r.getClass()); -// -// if (l.equals(r)) -// return flags; // no change -// -// return setFieldChanged(field, flags); -// } -// -// private static int setFieldChanged(CommandChange.Fields field, int oldFlags) -// { -// return oldFlags | (0x10000 << field.ordinal()); -// } -// -// @VisibleForTesting -// public static boolean getFieldChanged(CommandChange.Fields field, int oldFlags) -// { -// return (oldFlags & (0x10000 << field.ordinal())) != 0; -// } -// -// static int toIterableSetFields(int flags) -// { -// return flags >>> 16; -// } -// -// static CommandChange.Fields nextSetField(int iterable) -// { -// int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable)); -// return i == 32 ? null : FIELDS[i]; -// } -// -// static int unsetIterableFields(CommandChange.Fields field, int iterable) -// { -// return iterable & ~(1 << field.ordinal()); -// } -// -// @VisibleForTesting -// static boolean getFieldIsNull(CommandChange.Fields field, int oldFlags) -// { -// return (oldFlags & (1 << field.ordinal())) != 0; -// } -// -// private static int setFieldIsNull(CommandChange.Fields field, int oldFlags) -// { -// return oldFlags | (1 << field.ordinal()); -// } -// -// public enum Load -// { -// ALL(0), -// PURGEABLE(SAVE_STATUS, PARTICIPANTS, DURABILITY, EXECUTE_AT, WRITES), -// MINIMAL(SAVE_STATUS, PARTICIPANTS, EXECUTE_AT); -// -// final int mask; -// -// Load(int mask) -// { -// this.mask = mask; -// } -// -// Load(Fields ... fields) -// { -// int mask = -1; -// for (Fields field : fields) -// mask &= ~(1<< field.ordinal()); -// this.mask = mask; -// } -// } -// -// /** -// * Helper class for reconstructing Command from a sequence of logged CommandChanges -// */ -// public static class CommandBuilder -// { -// final int mask; -// int flags; -// -// TxnId txnId; -// -// Timestamp executeAt; -// Timestamp executeAtLeast; -// SaveStatus saveStatus; -// Status.Durability durability; -// -// Ballot acceptedOrCommitted; -// Ballot promised; -// -// StoreParticipants participants; -// PartialTxn partialTxn; -// PartialDeps partialDeps; -// -// byte[] waitingOnBytes; -// CommandChange.WaitingOnProvider waitingOn; -// Writes writes; -// Result result; -// Cleanup cleanup; -// -// boolean nextCalled; -// int count; -// -// public CommandBuilder(TxnId txnId, CommandChange.Load load) -// { -// this.mask = load.mask; -// init(txnId); -// } -// -// public CommandBuilder(TxnId txnId) -// { -// this(txnId, ALL); -// } -// -// public CommandBuilder(CommandChange.Load load) -// { -// this.mask = load.mask; -// } -// -// public CommandBuilder() -// { -// this(ALL); -// } -// -// public TxnId txnId() -// { -// return txnId; -// } -// -// public Timestamp executeAt() -// { -// return executeAt; -// } -// -// public Timestamp executeAtLeast() -// { -// return executeAtLeast; -// } -// -// public SaveStatus saveStatus() -// { -// return saveStatus; -// } -// -// public Status.Durability durability() -// { -// return durability; -// } -// -// public Ballot acceptedOrCommitted() -// { -// return acceptedOrCommitted; -// } -// -// public Ballot promised() -// { -// return promised; -// } -// -// public StoreParticipants participants() -// { -// return participants; -// } -// -// public PartialTxn partialTxn() -// { -// return partialTxn; -// } -// -// public PartialDeps partialDeps() -// { -// return partialDeps; -// } -// -// public CommandChange.WaitingOnProvider waitingOn() -// { -// return waitingOn; -// } -// -// public Writes writes() -// { -// return writes; -// } -// -// public Result result() -// { -// return result; -// } -// -// public void clear() -// { -// flags = 0; -// txnId = null; -// -// executeAt = null; -// executeAtLeast = null; -// saveStatus = null; -// durability = null; -// -// acceptedOrCommitted = null; -// promised = null; -// -// participants = null; -// partialTxn = null; -// partialDeps = null; -// -// waitingOnBytes = null; -// waitingOn = null; -// writes = null; -// result = null; -// cleanup = null; -// -// nextCalled = false; -// count = 0; -// } -// -// public void reset(TxnId txnId) -// { -// clear(); -// init(txnId); -// } -// -// public void init(TxnId txnId) -// { -// this.txnId = txnId; -// durability = NotDurable; -// acceptedOrCommitted = promised = Ballot.ZERO; -// waitingOn = (txn, deps) -> null; -// // TODO -// //result = CommandSerializers.APPLIED; -// } -// -// public boolean isEmpty() -// { -// return !nextCalled; -// } -// -// public int count() -// { -// return count; -// } -// -// public Cleanup shouldCleanup(Agent agent, RedundantBefore redundantBefore, DurableBefore durableBefore) -// { -// if (!nextCalled) -// return NO; -// -// if (saveStatus == null || participants == null) -// return Cleanup.NO; -// -// Cleanup cleanup = Cleanup.shouldCleanupPartial(agent, txnId, saveStatus, durability, participants, redundantBefore, durableBefore); -// if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0) -// cleanup = this.cleanup; -// return cleanup; -// } -// -// // TODO (expected): avoid allocating new builder -// public CommandBuilder maybeCleanup(Cleanup cleanup) -// { -// if (saveStatus() == null) -// return this; -// -// switch (cleanup) -// { -// case EXPUNGE: -// case ERASE: -// return null; -// -// case EXPUNGE_PARTIAL: -// return expungePartial(cleanup, saveStatus, true); -// -// case VESTIGIAL: -// case INVALIDATE: -// return saveStatusOnly(); -// -// case TRUNCATE_WITH_OUTCOME: -// case TRUNCATE: -// return expungePartial(cleanup, cleanup.appliesIfNot, cleanup == TRUNCATE_WITH_OUTCOME); -// -// case NO: -// return this; -// default: -// throw new IllegalStateException("Unknown cleanup: " + cleanup);} -// } -// -// public CommandBuilder expungePartial(Cleanup cleanup, SaveStatus saveStatus, boolean includeOutcome) -// { -// Invariants.checkState(txnId != null); -// CommandBuilder builder = new CommandBuilder(txnId, ALL); -// -// builder.count++; -// builder.nextCalled = true; -// -// Invariants.checkState(saveStatus != null); -// builder.flags = setFieldChanged(SAVE_STATUS, builder.flags); -// builder.saveStatus = saveStatus; -// builder.flags = setFieldChanged(CLEANUP, builder.flags); -// builder.cleanup = cleanup; -// if (executeAt != null) -// { -// builder.flags = setFieldChanged(EXECUTE_AT, builder.flags); -// builder.executeAt = executeAt; -// } -// if (durability != null) -// { -// builder.flags = setFieldChanged(DURABILITY, builder.flags); -// builder.durability = durability; -// } -// if (participants != null) -// { -// builder.flags = setFieldChanged(PARTICIPANTS, builder.flags); -// builder.participants = participants; -// } -// if (includeOutcome && builder.writes != null) -// { -// builder.flags = setFieldChanged(WRITES, builder.flags); -// builder.writes = writes; -// } -// -// return builder; -// } -// -// public CommandBuilder saveStatusOnly() -// { -// Invariants.checkState(txnId != null); -// CommandBuilder builder = new CommandBuilder(txnId, ALL); -// -// builder.count++; -// builder.nextCalled = true; -// -// // TODO: these accesses can be abstracted away -// if (saveStatus != null) -// { -// builder.flags = setFieldChanged(SAVE_STATUS, builder.flags); -// builder.saveStatus = saveStatus; -// } -// -// return builder; -// } -// -// public MinimalCommand asMinimal() -// { -// return new MinimalCommand(txnId, saveStatus, participants, durability, executeAt, writes); -// } -// -// @VisibleForTesting -// public void forceResult(Result newValue) -// { -// this.result = newValue; -// } -// -// public Command construct() -// { -// if (!nextCalled) -// return null; -// -// Invariants.checkState(txnId != null); -// CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId); -// if (partialTxn != null) -// attrs.partialTxn(partialTxn); -// if (durability != null) -// attrs.durability(durability); -// if (participants != null) -// attrs.setParticipants(participants); -// if (partialDeps != null && -// (saveStatus.known.deps != NoDeps && -// saveStatus.known.deps != DepsErased && -// saveStatus.known.deps != DepsUnknown)) -// attrs.partialDeps(partialDeps); -// -// Command.WaitingOn waitingOn = null; -// if (this.waitingOn != null) -// waitingOn = this.waitingOn.provide(txnId, partialDeps); -// -// switch (saveStatus.status) -// { -// case NotDefined: -// return saveStatus == SaveStatus.Uninitialised ? Command.NotDefined.uninitialised(attrs.txnId()) -// : Command.NotDefined.notDefined(attrs, promised); -// case PreAccepted: -// return Command.PreAccepted.preAccepted(attrs, executeAt, promised); -// case AcceptedInvalidate: -// case Accepted: -// case PreCommitted: -// if (saveStatus == SaveStatus.AcceptedInvalidate) -// return Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, acceptedOrCommitted); -// else -// return Command.Accepted.accepted(attrs, saveStatus, executeAt, promised, acceptedOrCommitted); -// case Committed: -// case Stable: -// return Command.Committed.committed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn); -// case PreApplied: -// case Applied: -// return Command.Executed.executed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes, result); -// case Truncated: -// case Invalidated: -// return truncated(attrs, saveStatus, executeAt, executeAtLeast, writes, result); -// default: -// throw new IllegalStateException(); -// } -// } -// -// private static Command.Truncated truncated(CommonAttributes.Mutable attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes writes, Result result) -// { -// switch (status) -// { -// default: -// throw illegalState("Unhandled SaveStatus: " + status); -// case TruncatedApplyWithOutcome: -// case TruncatedApplyWithDeps: -// case TruncatedApply: -// if (status != TruncatedApplyWithOutcome) -// result = null; -// if (attrs.txnId().kind().awaitsOnlyDeps()) -// return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result, executesAtLeast); -// return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result, null); -// case ErasedOrVestigial: -// return Command.Truncated.erasedOrInvalidOrVestigial(attrs.txnId(), attrs.durability(), attrs.participants()); -// case Erased: -// return Command.Truncated.erased(attrs.txnId(), attrs.durability(), attrs.participants()); -// case Invalidated: -// return Command.Truncated.invalidated(attrs.txnId()); -// } -// } -// -// public String toString() -// { -// return "Diff {" + -// "txnId=" + txnId + -// ", executeAt=" + executeAt + -// ", saveStatus=" + saveStatus + -// ", durability=" + durability + -// ", acceptedOrCommitted=" + acceptedOrCommitted + -// ", promised=" + promised + -// ", participants=" + participants + -// ", partialTxn=" + partialTxn + -// ", partialDeps=" + partialDeps + -// ", waitingOn=" + waitingOn + -// ", writes=" + writes + -// '}'; -// } -// } -// -// public interface WaitingOnProvider -// { -// Command.WaitingOn provide(TxnId txnId, PartialDeps deps); -// } -//} \ No newline at end of file diff --git a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java index ada18fc9..329ba009 100644 --- a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java +++ b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java @@ -314,10 +314,8 @@ public class DurabilityScheduling implements ConfigurationService.Listener index *= 2; numberOfSplits *= 2; } - if (fail instanceof Timeout) - logger.trace("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail); - else - logger.debug("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail); + + logger.trace("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail); } } else @@ -339,10 +337,7 @@ public class DurabilityScheduling implements ConfigurationService.Listener .addCallback((success, fail) -> { if (fail != null && fail.getClass() != SyncPointErased.class) { - if (fail instanceof Timeout) - logger.trace("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail); - else - logger.debug("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail); + logger.trace("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail); retryCoordinateDurability(node, exclusiveSyncPoint, nextIndex); } else diff --git a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java index d54080a6..f954f648 100644 --- a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java +++ b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java @@ -21,6 +21,7 @@ package accord.impl; import accord.api.RoutingKey; import accord.api.VisibleForImplementation; import accord.local.SafeCommandStore; +import accord.local.cfk.CommandsForKey; import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -48,7 +49,8 @@ public class TimestampsForKeys { if (safeStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId, current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY) return current; - throw illegalState("%s is less than the most recent write timestamp %s", executeAt, lastWrite); + if (CommandsForKey.reportLinearizabilityViolations()) + throw illegalState("%s is less than the most recent write timestamp %s", executeAt, lastWrite); } Timestamp lastExecuted = current.lastExecutedTimestamp(); @@ -61,7 +63,8 @@ public class TimestampsForKeys { if (!safeStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable())) return current; - throw illegalState("%s is less than the most recent executed timestamp %s", executeAt, lastExecuted); + if (CommandsForKey.reportLinearizabilityViolations()) + throw illegalState("%s is less than the most recent executed timestamp %s", executeAt, lastExecuted); } long micros = executeAt.hlc(); diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index cf64e6cd..a089036a 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -328,8 +328,8 @@ public abstract class Command implements CommonAttributes { default: throw new AssertionError("Unhandled Outcome: " + known.outcome); case Apply: - Invariants.checkState(writes != null, "Writes is null"); - Invariants.checkState(result != null, "Result is null"); + Invariants.checkState(writes != null, "Writes is null %s", validate); + Invariants.checkState(result != null, "Result is null %s", validate); break; case Invalidated: Invariants.checkState(validate.durability().isMaybeInvalidated(), "%s is not invalidated", validate.durability()); @@ -337,8 +337,8 @@ public abstract class Command implements CommonAttributes Invariants.checkState(validate.durability() != Local); case Erased: case WasApply: - Invariants.checkState(writes == null, "Writes exist"); - Invariants.checkState(result == null, "Results exist"); + Invariants.checkState(writes == null, "Writes exist for %s", validate); + Invariants.checkState(result == null, "Results exist %s", validate); break; } } diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index a213c112..38939974 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -671,7 +671,9 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ */ public TxnId nextTxnId(Txn.Kind rw, Domain domain) { - return new TxnId(uniqueNow(), rw, domain); + TxnId txnId = new TxnId(uniqueNow(), rw, domain); + Invariants.checkState((txnId.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS)) == 0); + return txnId; } public AsyncResult<Result> coordinate(Txn txn) diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java index e4e5a0a8..ca2ea321 100644 --- a/accord-core/src/main/java/accord/messages/CheckStatus.java +++ b/accord-core/src/main/java/accord/messages/CheckStatus.java @@ -18,6 +18,7 @@ package accord.messages; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import accord.api.Result; @@ -30,48 +31,43 @@ import accord.local.Node.Id; import accord.local.PreLoadContext; import accord.local.SafeCommand; import accord.local.SafeCommandStore; -import accord.primitives.SaveStatus; -import accord.primitives.Status; import accord.local.StoreParticipants; import accord.primitives.Ballot; +import accord.primitives.Known; import accord.primitives.KnownMap; -import accord.primitives.WithQuorum; import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.Participants; import accord.primitives.ProgressToken; import accord.primitives.Ranges; import accord.primitives.Route; +import accord.primitives.SaveStatus; +import accord.primitives.Status; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Unseekables; +import accord.primitives.WithQuorum; import accord.primitives.Writes; import accord.topology.Topologies; import accord.utils.Invariants; import accord.utils.MapReduceConsume; - -import javax.annotation.Nonnull; +import accord.utils.async.Cancellable; import static accord.coordinate.Infer.InvalidIf.IfUncommitted; import static accord.coordinate.Infer.InvalidIf.IsNotInvalid; import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid; +import static accord.messages.TxnRequest.computeScope; +import static accord.primitives.Route.castToRoute; +import static accord.primitives.Route.isRoute; import static accord.primitives.Status.Durability; import static accord.primitives.Status.Durability.Local; import static accord.primitives.Status.Durability.Majority; import static accord.primitives.Status.Durability.ShardUniversal; import static accord.primitives.Status.Durability.Universal; - -import accord.primitives.Known; -import accord.utils.async.Cancellable; - -import static accord.primitives.Status.Invalidated; import static accord.primitives.Status.NotDefined; import static accord.primitives.Status.Stable; import static accord.primitives.Status.Truncated; -import static accord.messages.TxnRequest.computeScope; import static accord.primitives.WithQuorum.HasQuorum; -import static accord.primitives.Route.castToRoute; -import static accord.primitives.Route.isRoute; public class CheckStatus extends AbstractRequest<CheckStatus.CheckStatusReply> implements Request, PreLoadContext, MapReduceConsume<SafeCommandStore, CheckStatus.CheckStatusReply> diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java index 0a9de552..f488ab8b 100644 --- a/accord-core/src/main/java/accord/primitives/Timestamp.java +++ b/accord-core/src/main/java/accord/primitives/Timestamp.java @@ -38,8 +38,8 @@ public class Timestamp implements Comparable<Timestamp>, EpochSupplier */ private static final int MERGE_FLAGS = 0x8000; // TODO (testing): is this the correct set of identity bits? - private static final long IDENTITY_LSB = 0xFFFFFFFFFFFF001EL; - public static final int IDENTITY_FLAGS = 0x001E; + private static final long IDENTITY_LSB = 0xFFFFFFFF_FFFF001FL; + public static final int IDENTITY_FLAGS = 0x00000000_0000001F; public static final long MAX_EPOCH = (1L << 48) - 1; private static final long HLC_INCR = 1L << 16; static final long MAX_FLAGS = HLC_INCR - 1; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
