This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch semi-integrated-burn-test-rebased in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit f0b29213ab32d39e3b030bf217ee8b19836cdfff Author: Alex Petrov <[email protected]> AuthorDate: Fri Nov 15 16:46:07 2024 +0100 Move files --- .../main-test/java/accord/impl/basic/Cluster.java | 2 +- .../java/accord/impl/basic/InMemoryJournal.java} | 340 ++++++------ .../java/accord/impl/basic/LoggingJournal.java | 120 +++++ accord-core/src/main/java/accord/api/Journal.java | 102 ++++ .../src/main/java/accord/impl/CommandChange.java | 598 +++++++++++++++++++++ .../java/accord/impl/DurabilityScheduling.java | 11 +- .../java/accord/impl/InMemoryCommandStore.java | 152 +++--- .../src/main/java/accord/local/CommandStore.java | 3 + .../java/accord/messages/ReadEphemeralTxnData.java | 3 +- .../src/test/java/accord/burn/BurnTest.java | 25 +- .../coordinate/CoordinateTransactionTest.java | 19 +- .../test/java/accord/impl/RemoteListenersTest.java | 7 + .../java/accord/local/cfk/CommandsForKeyTest.java | 6 + 13 files changed, 1155 insertions(+), 233 deletions(-) diff --git a/accord-core/src/main-test/java/accord/impl/basic/Cluster.java b/accord-core/src/main-test/java/accord/impl/basic/Cluster.java index cf0604ad..bd2fb921 100644 --- a/accord-core/src/main-test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/main-test/java/accord/impl/basic/Cluster.java @@ -560,6 +560,7 @@ public class Cluster NavigableMap<RoutableKey, Timestamped<int[]>> prevData = listStore.copyOfCurrentData(); listStore.clear(); listStore.restoreFromSnapshot(); + Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores); for (CommandStore s : stores) { DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s; @@ -567,7 +568,6 @@ public class Cluster } Journal journal = journalMap.get(id); - Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores); journal.replay(nodeMap.get(id).commandStores()); while (sinks.drain(pred)); CommandsForKey.enableLinearizabilityViolationsReporting(); diff --git 
a/accord-core/src/test/java/accord/impl/basic/Journal.java b/accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java similarity index 67% rename from accord-core/src/test/java/accord/impl/basic/Journal.java rename to accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java index 27f9cb19..a74232d4 100644 --- a/accord-core/src/test/java/accord/impl/basic/Journal.java +++ b/accord-core/src/main-test/java/accord/impl/basic/InMemoryJournal.java @@ -20,76 +20,156 @@ package accord.impl.basic; import java.util.AbstractList; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Objects; -import java.util.Set; import java.util.TreeMap; import java.util.function.Function; -import java.util.function.IntFunction; import java.util.stream.Collectors; +import com.google.common.collect.ImmutableSortedMap; + +import accord.api.Journal; import accord.api.Result; import accord.impl.InMemoryCommandStore; import accord.local.Cleanup; import accord.local.Command; import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.Commands; import accord.local.CommonAttributes; +import accord.local.DurableBefore; import accord.local.Node; -import accord.primitives.Known; -import accord.primitives.SaveStatus; -import accord.primitives.Status; +import accord.local.RedundantBefore; import accord.local.StoreParticipants; import accord.primitives.Ballot; +import accord.primitives.Known; import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SaveStatus; +import accord.primitives.Status; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Writes; import accord.utils.Invariants; -import org.agrona.collections.Long2ObjectHashMap; +import 
org.agrona.collections.Int2ObjectHashMap; -import static accord.primitives.SaveStatus.Erased; import static accord.primitives.SaveStatus.NotDefined; import static accord.primitives.SaveStatus.Stable; import static accord.primitives.Status.Invalidated; import static accord.primitives.Status.Truncated; import static accord.utils.Invariants.illegalState; -public class Journal +// TODO: looks like we also have accord via dependency here somehow? +public class InMemoryJournal implements Journal { - private final Long2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> diffsPerCommandStore = new Long2ObjectHashMap<>(); + private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> diffsPerCommandStore = new Int2ObjectHashMap<>(); + private final FieldStates fieldStates; + + private static class FieldStates + { + RedundantBefore redundantBefore = RedundantBefore.EMPTY; + NavigableMap<TxnId, Ranges> bootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); + NavigableMap<Timestamp, Ranges> safeToRead = ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY); + CommandStores.RangesForEpoch.Snapshot rangesForEpoch = null; + } private final Node.Id id; - public Journal(Node.Id id) + public InMemoryJournal(Node.Id id) { this.id = id; + this.fieldStates = new FieldStates(); } - public void purge(IntFunction<CommandStore> storeSupplier) + @Override + public Command loadCommand(int commandStoreId, TxnId txnId, + // TODO: currently unused! 
+ RedundantBefore redundantBefore, DurableBefore durableBefore) { - for (Map.Entry<Long, NavigableMap<TxnId, List<Diff>>> e : diffsPerCommandStore.entrySet()) + List<Diff> diffs = this.diffsPerCommandStore.get(commandStoreId).get(txnId); + return reconstruct(diffs); + } + + @Override + public void saveCommand(int store, CommandUpdate diff, Runnable onFlush) + { + if (diff == null) + return; + + if (diff.after.saveStatus() == SaveStatus.Erased) { - int commandStoreId = e.getKey().intValue(); + diffsPerCommandStore.computeIfAbsent(store, (k) -> new TreeMap<>()) + .remove(diff.after.txnId()); + return; + } + + diffsPerCommandStore.computeIfAbsent(store, (k) -> new TreeMap<>()) + .computeIfAbsent(diff.txnId, (k_) -> new ArrayList<>()) + .add(diff(diff.before, diff.after)); + + if (onFlush!= null) + onFlush.run(); + } + + @Override + public RedundantBefore loadRedundantBefore(int commandStoreId) + { + return fieldStates.redundantBefore; + } + + @Override + public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId) + { + return fieldStates.bootstrapBeganAt; + } + + @Override + public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId) + { + return fieldStates.safeToRead; + } + + @Override + public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId) + { + return fieldStates.rangesForEpoch; + } + + public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush) + { + if (fieldUpdates.newRedundantBefore != null) + fieldStates.redundantBefore = fieldUpdates.newRedundantBefore; + if (fieldUpdates.newSafeToRead != null) + fieldStates.safeToRead = fieldUpdates.newSafeToRead; + if (fieldUpdates.newBootstrapBeganAt != null) + fieldStates.bootstrapBeganAt = fieldUpdates.newBootstrapBeganAt; + if (fieldUpdates.newRangesForEpoch != null) + fieldStates.rangesForEpoch = fieldUpdates.newRangesForEpoch; + + if (onFlush!= null) + onFlush.run(); + } + + @Override + public void purge(CommandStores commandStores) + 
{ + for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e : diffsPerCommandStore.entrySet()) + { + int commandStoreId = e.getKey(); Map<TxnId, List<Diff>> localJournal = e.getValue(); - CommandStore store = storeSupplier.apply(commandStoreId); + CommandStore store = commandStores.forId(commandStoreId); if (store == null) continue; - Map<TxnId, List<Diff>> updates = new HashMap<>(); - List<TxnId> removals = new ArrayList<>(); for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet()) { TxnId txnId = e2.getKey(); List<Diff> diffs = e2.getValue(); - Command command = reconstruct(diffs, Reconstruct.Last).get(0); + Command command = reconstruct(diffs); if (command.status() == Truncated || command.status() == Invalidated) continue; // Already truncated Cleanup cleanup = Cleanup.shouldCleanup(store.agent(), command, store.unsafeGetRedundantBefore(), store.durableBefore()); @@ -117,30 +197,40 @@ public class Journal } } - public void reconstructAll(InMemoryCommandStore.Loader loader, int commandStoreId) + @Override + public void replay(CommandStores commandStores) { - // Nothing to do here, journal is empty for this command store - if (!diffsPerCommandStore.containsKey(commandStoreId)) - return; - - // copy to avoid concurrent modification when appending to journal - Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffsPerCommandStore.get(commandStoreId)); - for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet()) - e.setValue(new ArrayList<>(e.getValue())); + OnDone sync = new OnDone() { + public void success() {} + public void failure(Throwable t) { throw new RuntimeException("Caught an exception during replay", t); } + }; - List<Command> toApply = new ArrayList<>(); - for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet()) + for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry : diffsPerCommandStore.entrySet()) { - if (e.getValue().isEmpty()) continue; - Command command = reconstruct(commandStoreId, e.getKey(), e.getValue()); - if 
(command.saveStatus().compareTo(Stable) >= 0 && !command.hasBeen(Truncated)) - toApply.add(command); - loader.load(command); - } + int commandStoreId = diffEntry.getKey(); + // copy to avoid concurrent modification when appending to journal + Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue()); + + InMemoryCommandStore commandStore = (InMemoryCommandStore) commandStores.forId(commandStoreId); + Loader loader = commandStore.loader(); + + for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet()) + e.setValue(new ArrayList<>(e.getValue())); - toApply.sort(Comparator.comparing(Command::executeAt)); - for (Command command : toApply) - loader.apply(command); + List<Command> toApply = new ArrayList<>(); + for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet()) + { + if (e.getValue().isEmpty()) continue; + Command command = reconstruct(e.getValue()); + if (command.saveStatus().compareTo(Stable) >= 0 && !command.hasBeen(Truncated)) + toApply.add(command); + loader.load(command, sync); + } + + toApply.sort(Comparator.comparing(Command::executeAt)); + for (Command command : toApply) + loader.apply(command, sync); + } } static class ErasedList extends AbstractList<Diff> @@ -211,28 +301,10 @@ public class Journal } } - private enum Reconstruct - { - Each, - Last - } - - public Command reconstruct(int commandStoreId, TxnId txnId) - { - return reconstruct(commandStoreId, txnId, diffsPerCommandStore.get(commandStoreId).get(txnId)); - } - - public Command reconstruct(int commandStoreId, TxnId txnId, List<Diff> diffs) - { - return reconstruct(diffs, Reconstruct.Last).get(0); - } - - private List<Command> reconstruct(List<Diff> diffs, Reconstruct reconstruct) + private Command reconstruct(List<Diff> diffs) { Invariants.checkState(diffs != null && !diffs.isEmpty()); - List<Command> results = new ArrayList<>(); - TxnId txnId = null; Timestamp executeAt = null; Timestamp executesAtLeast = null; @@ -260,13 +332,7 @@ public class Journal if (diff.executesAtLeast != null) 
executesAtLeast = diff.executesAtLeast.get(); if (diff.saveStatus != null) - { - Set<SaveStatus> allowed = new HashSet<>(); - allowed.add(SaveStatus.TruncatedApply); - allowed.add(SaveStatus.TruncatedApplyWithOutcome); - saveStatus = diff.saveStatus.get(); - } if (diff.durability != null) durability = diff.durability.get(); @@ -307,75 +373,68 @@ public class Journal result = null; break; } + } - CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId); - if (partialTxn != null) - attrs.partialTxn(partialTxn); - if (durability != null) - attrs.durability(durability); - if (participants != null) attrs.setParticipants(participants); - else attrs.setParticipants(StoreParticipants.empty(txnId)); - - // TODO (desired): we can simplify this logic if, instead of diffing, we will infer the diff from the status - if (partialDeps != null && - (saveStatus.known.deps != Known.KnownDeps.NoDeps && - saveStatus.known.deps != Known.KnownDeps.DepsErased && - saveStatus.known.deps != Known.KnownDeps.DepsUnknown)) - attrs.partialDeps(partialDeps); - Invariants.checkState(saveStatus != null, - "Save status is null after applying %s", diffs); + CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId); + if (partialTxn != null) + attrs.partialTxn(partialTxn); + if (durability != null) + attrs.durability(durability); + if (participants != null) attrs.setParticipants(participants); + else attrs.setParticipants(StoreParticipants.empty(txnId)); + + // TODO (desired): we can simplify this logic if, instead of diffing, we will infer the diff from the status + if (partialDeps != null && + (saveStatus.known.deps != Known.KnownDeps.NoDeps && + saveStatus.known.deps != Known.KnownDeps.DepsErased && + saveStatus.known.deps != Known.KnownDeps.DepsUnknown)) + attrs.partialDeps(partialDeps); + + try + { - try + Command current; + switch (saveStatus.status) { - if (reconstruct == Reconstruct.Each || - (reconstruct == Reconstruct.Last && i == diffs.size() - 1)) - { - Command 
current; - switch (saveStatus.status) - { - case NotDefined: - current = saveStatus == SaveStatus.Uninitialised ? Command.NotDefined.uninitialised(attrs.txnId()) - : Command.NotDefined.notDefined(attrs, promised); - break; - case PreAccepted: - current = Command.PreAccepted.preAccepted(attrs, executeAt, promised); - break; - case AcceptedInvalidate: - case Accepted: - case PreCommitted: - if (saveStatus == SaveStatus.AcceptedInvalidate) - current = Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, acceptedOrCommitted); - else - current = Command.Accepted.accepted(attrs, saveStatus, executeAt, promised, acceptedOrCommitted); - break; - case Committed: - case Stable: - current = Command.Committed.committed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn); - break; - case PreApplied: - case Applied: - current = Command.Executed.executed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes, result); - break; - case Invalidated: - case Truncated: - current = truncated(attrs, saveStatus, executeAt, executesAtLeast, writes, result); - break; - default: - throw new IllegalStateException("Do not know " + saveStatus.status + " " + saveStatus); - } - - results.add(current); - } + case NotDefined: + current = saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId()) + : Command.NotDefined.notDefined(attrs, promised); + break; + case PreAccepted: + current = Command.PreAccepted.preAccepted(attrs, executeAt, promised); + break; + case AcceptedInvalidate: + case Accepted: + case PreCommitted: + if (saveStatus == SaveStatus.AcceptedInvalidate) + current = Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, acceptedOrCommitted); + else + current = Command.Accepted.accepted(attrs, saveStatus, executeAt, promised, acceptedOrCommitted); + break; + case Committed: + case Stable: + current = Command.Committed.committed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn); + break; + case PreApplied: + case Applied: + current = Command.Executed.executed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes, result); + break; + case Invalidated: + case Truncated: + current = truncated(attrs, saveStatus, executeAt, executesAtLeast, writes, result); + break; + default: + throw new IllegalStateException("Do not know " + saveStatus.status + " " + saveStatus); } - catch (Throwable t) - { - throw new RuntimeException("Can not reconstruct from diff:\n" + diffs.stream().map(o -> o.toString()) - .collect(Collectors.joining("\n")), - t); - } + return current; + } + catch (Throwable t) + { + throw new RuntimeException("Can not reconstruct from diff:\n" + diffs.stream().map(o -> o.toString()) + .collect(Collectors.joining("\n")), + t); } - return results; } private static Command.Truncated truncated(CommonAttributes.Mutable attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes writes, Result result) @@ -397,23 +456,6 @@ public class Journal } } - public void onExecute(int commandStoreId, Command before, Command after, boolean isPrimary) - { - if (before == null && after == null) - return; - - Diff diff = diff(before, after); - if (!isPrimary) - diff = diff.asNonPrimary(); - - if (diff != null) - { - 
diffsPerCommandStore.computeIfAbsent(commandStoreId, (k) -> new TreeMap<>()) - .computeIfAbsent(after.txnId(), (k_) -> new ArrayList<>()) - .add(diff); - } - } - private static class Diff { public final TxnId txnId; @@ -470,12 +512,6 @@ public class Journal this.result = result; } - // We allow only save status, and waitingOn to be updated by non-primary transactions - public Diff asNonPrimary() - { - return new Diff(txnId, null, null, saveStatus, null, null, null, null, null, null, waitingOn, null, null); - } - public boolean allNulls() { if (txnId != null) return false; @@ -546,7 +582,7 @@ public class Journal ifNotEqual(before, after, Command::participants, true), ifNotEqual(before, after, Command::partialTxn, false), ifNotEqual(before, after, Command::partialDeps, false), - ifNotEqual(before, after, Journal::getWaitingOn, true), + ifNotEqual(before, after, InMemoryJournal::getWaitingOn, true), ifNotEqual(before, after, Command::writes, false), ifNotEqual(before, after, Command::result, false)); if (diff.allNulls()) diff --git a/accord-core/src/main-test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/main-test/java/accord/impl/basic/LoggingJournal.java new file mode 100644 index 00000000..012bbff7 --- /dev/null +++ b/accord-core/src/main-test/java/accord/impl/basic/LoggingJournal.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl.basic; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.NavigableMap; + +import accord.api.Journal; +import accord.local.Command; +import accord.local.CommandStores; +import accord.local.DurableBefore; +import accord.local.RedundantBefore; +import accord.primitives.Ranges; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; + +/** + * Logging journal, a wrapper over journal for debugging / inspecting history purposes + */ +public class LoggingJournal implements Journal +{ + private final BufferedWriter log; + private final Journal delegate; + + public LoggingJournal(Journal delegate) + { + this.delegate = delegate; + File f = new File("journal.log"); + try + { + log = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f))); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + } + + private synchronized void log(String format, Object... 
objects) + { + try + { + log.write(String.format(format, objects)); + log.flush(); + } + catch (IOException e) + { + // ignore + } + } + + public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) + { + return delegate.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore); + } + + public void saveCommand(int store, CommandUpdate update, Runnable onFlush) + { + log("%d: %s\n", store, update.after); + delegate.saveCommand(store, update, onFlush); + } + + public void purge(CommandStores commandStores) + { + log("PURGE\n"); + delegate.purge(commandStores); + } + + public void replay(CommandStores commandStores) + { + delegate.replay(commandStores); + } + + public RedundantBefore loadRedundantBefore(int commandStoreId) + { + return delegate.loadRedundantBefore(commandStoreId); + } + + public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId) + { + return delegate.loadBootstrapBeganAt(commandStoreId); + } + + public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId) + { + return delegate.loadSafeToRead(commandStoreId); + } + + public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId) + { + return delegate.loadRangesForEpoch(commandStoreId); + } + + public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush) + { + log("%d: %s", store, fieldUpdates); + delegate.saveStoreState(store, fieldUpdates, onFlush); + } +} \ No newline at end of file diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java new file mode 100644 index 00000000..1747c97f --- /dev/null +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.api; + +import java.util.NavigableMap; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import accord.local.Command; +import accord.local.CommandStores; +import accord.local.DurableBefore; +import accord.local.RedundantBefore; +import accord.primitives.Ranges; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; + +/** + * Persisted journal for transactional recovery. + */ +public interface Journal +{ + Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore); + // TODO: use OnDone instead of Runnable + void saveCommand(int store, CommandUpdate value, Runnable onFlush); + + void purge(CommandStores commandStores); + void replay(CommandStores commandStores); + + RedundantBefore loadRedundantBefore(int commandStoreId); + NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId); + NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId); + CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId); + + void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush); + + // TODO (required): this class can be changed in favour of Writer as if/when we switch from DelayedCommandStore to AccordCommandStore + // TODO: move to a more appropriate spot? 
+ class CommandUpdate + { + public final TxnId txnId; + public final Command before; + public final Command after; + + public CommandUpdate(@Nullable Command before, @Nonnull Command after) + { + this.txnId = after.txnId(); + this.before = before; + this.after = after; + } + } + + class FieldUpdates + { + // TODO: use persisted field logic + public RedundantBefore newRedundantBefore; + public NavigableMap<TxnId, Ranges> newBootstrapBeganAt; + public NavigableMap<Timestamp, Ranges> newSafeToRead; + public CommandStores.RangesForEpoch.Snapshot newRangesForEpoch; + + public String toString() + { + return "FieldUpdates{" + + "newRedundantBefore=" + newRedundantBefore + + ", newBootstrapBeganAt=" + newBootstrapBeganAt + + ", newSafeToRead=" + newSafeToRead + + ", newRangesForEpoch=" + newRangesForEpoch + + '}'; + } + } + + /** + * + */ + interface Loader + { + void load(Command next, OnDone onDone); + void apply(Command next, OnDone onDone); + } + + + interface OnDone + { + void success(); + void failure(Throwable t); + } +} diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java new file mode 100644 index 00000000..60d1a209 --- /dev/null +++ b/accord-core/src/main/java/accord/impl/CommandChange.java @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +//import java.util.function.Function; +// +//import com.google.common.annotations.VisibleForTesting; +// +//import accord.api.Agent; +//import accord.api.Result; +//import accord.local.Cleanup; +//import accord.local.Command; +//import accord.local.CommonAttributes; +//import accord.local.DurableBefore; +//import accord.local.RedundantBefore; +//import accord.local.StoreParticipants; +//import accord.primitives.Ballot; +//import accord.primitives.PartialDeps; +//import accord.primitives.PartialTxn; +//import accord.primitives.SaveStatus; +//import accord.primitives.Status; +//import accord.primitives.Timestamp; +//import accord.primitives.TxnId; +//import accord.primitives.Writes; +//import accord.utils.Invariants; +// +//import static accord.impl.CommandChange.Fields.ACCEPTED; +//import static accord.impl.CommandChange.Fields.CLEANUP; +//import static accord.impl.CommandChange.Fields.DURABILITY; +//import static accord.impl.CommandChange.Fields.EXECUTES_AT_LEAST; +//import static accord.impl.CommandChange.Fields.EXECUTE_AT; +//import static accord.impl.CommandChange.Fields.FIELDS; +//import static accord.impl.CommandChange.Fields.PARTIAL_DEPS; +//import static accord.impl.CommandChange.Fields.PARTIAL_TXN; +//import static accord.impl.CommandChange.Fields.PARTICIPANTS; +//import static accord.impl.CommandChange.Fields.PROMISED; +//import static accord.impl.CommandChange.Fields.SAVE_STATUS; +//import static accord.impl.CommandChange.Fields.WAITING_ON; +//import static accord.impl.CommandChange.Fields.WRITES; +//import 
static accord.impl.CommandChange.Load.ALL; +//import static accord.local.Cleanup.NO; +//import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME; +//import static accord.primitives.Known.KnownDeps.DepsErased; +//import static accord.primitives.Known.KnownDeps.DepsUnknown; +//import static accord.primitives.Known.KnownDeps.NoDeps; +//import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome; +//import static accord.primitives.Status.Durability.NotDurable; +//import static accord.utils.Invariants.illegalState; +//import static org.apache.cassandra.service.accord.SavedCommand.MinimalCommand; +// +///** +// * Class representing a change in the Command after its execution by SafeCommandStore +// */ +//public class CommandChange +//{ +// // This enum is order-dependent +// public enum Fields +// { +// PARTICIPANTS, // stored first so we can index it +// SAVE_STATUS, +// PARTIAL_DEPS, +// EXECUTE_AT, +// EXECUTES_AT_LEAST, +// DURABILITY, +// ACCEPTED, +// PROMISED, +// WAITING_ON, +// PARTIAL_TXN, +// WRITES, +// CLEANUP, +// ; +// +// public static final Fields[] FIELDS = values(); +// } +// +// // TODO (required): calculate flags once +// private static boolean anyFieldChanged(int flags) +// { +// return (flags >>> 16) != 0; +// } +// +// private static int validateFlags(int flags) +// { +// Invariants.checkState(0 == (~(flags >>> 16) & (flags & 0xffff))); +// return flags; +// } +// +// /** +// * Collects flags +// */ +// public static int getFlags(Command before, Command after) +// { +// int flags = 0; +// +// flags = collectFlags(before, after, Command::executeAt, true, EXECUTE_AT, flags); +// flags = collectFlags(before, after, Command::executesAtLeast, true, EXECUTES_AT_LEAST, flags); +// flags = collectFlags(before, after, Command::saveStatus, false, SAVE_STATUS, flags); +// flags = collectFlags(before, after, Command::durability, false, DURABILITY, flags); +// +// flags = collectFlags(before, after, Command::acceptedOrCommitted, false, ACCEPTED, 
flags); +// flags = collectFlags(before, after, Command::promised, false, PROMISED, flags); +// +// flags = collectFlags(before, after, Command::participants, true, PARTICIPANTS, flags); +// flags = collectFlags(before, after, Command::partialTxn, false, PARTIAL_TXN, flags); +// flags = collectFlags(before, after, Command::partialDeps, false, PARTIAL_DEPS, flags); +// +// // TODO: waitingOn vs WaitingOnWithExecutedAt? +// flags = collectFlags(before, after, CommandChange::getWaitingOn, true, WAITING_ON, flags); +// +// flags = collectFlags(before, after, Command::writes, false, WRITES, flags); +// +// return flags; +// } +// +// private static Command.WaitingOn getWaitingOn(Command command) +// { +// if (command instanceof Command.Committed) +// return command.asCommitted().waitingOn(); +// +// return null; +// } +// +// private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ, VAL> convert, boolean allowClassMismatch, CommandChange.Fields field, int flags) +// { +// VAL l = null; +// VAL r = null; +// if (lo != null) l = convert.apply(lo); +// if (ro != null) r = convert.apply(ro); +// +// if (l == r) +// return flags; // no change +// +// if (r == null) +// flags = setFieldIsNull(field, flags); +// +// if (l == null || r == null) +// return setFieldChanged(field, flags); +// +// assert allowClassMismatch || l.getClass() == r.getClass() : String.format("%s != %s", l.getClass(), r.getClass()); +// +// if (l.equals(r)) +// return flags; // no change +// +// return setFieldChanged(field, flags); +// } +// +// private static int setFieldChanged(CommandChange.Fields field, int oldFlags) +// { +// return oldFlags | (0x10000 << field.ordinal()); +// } +// +// @VisibleForTesting +// public static boolean getFieldChanged(CommandChange.Fields field, int oldFlags) +// { +// return (oldFlags & (0x10000 << field.ordinal())) != 0; +// } +// +// static int toIterableSetFields(int flags) +// { +// return flags >>> 16; +// } +// +// static CommandChange.Fields 
nextSetField(int iterable) +// { +// int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable)); +// return i == 32 ? null : FIELDS[i]; +// } +// +// static int unsetIterableFields(CommandChange.Fields field, int iterable) +// { +// return iterable & ~(1 << field.ordinal()); +// } +// +// @VisibleForTesting +// static boolean getFieldIsNull(CommandChange.Fields field, int oldFlags) +// { +// return (oldFlags & (1 << field.ordinal())) != 0; +// } +// +// private static int setFieldIsNull(CommandChange.Fields field, int oldFlags) +// { +// return oldFlags | (1 << field.ordinal()); +// } +// +// public enum Load +// { +// ALL(0), +// PURGEABLE(SAVE_STATUS, PARTICIPANTS, DURABILITY, EXECUTE_AT, WRITES), +// MINIMAL(SAVE_STATUS, PARTICIPANTS, EXECUTE_AT); +// +// final int mask; +// +// Load(int mask) +// { +// this.mask = mask; +// } +// +// Load(Fields ... fields) +// { +// int mask = -1; +// for (Fields field : fields) +// mask &= ~(1<< field.ordinal()); +// this.mask = mask; +// } +// } +// +// /** +// * Helper class for reconstructing Command from a sequence of logged CommandChanges +// */ +// public static class CommandBuilder +// { +// final int mask; +// int flags; +// +// TxnId txnId; +// +// Timestamp executeAt; +// Timestamp executeAtLeast; +// SaveStatus saveStatus; +// Status.Durability durability; +// +// Ballot acceptedOrCommitted; +// Ballot promised; +// +// StoreParticipants participants; +// PartialTxn partialTxn; +// PartialDeps partialDeps; +// +// byte[] waitingOnBytes; +// CommandChange.WaitingOnProvider waitingOn; +// Writes writes; +// Result result; +// Cleanup cleanup; +// +// boolean nextCalled; +// int count; +// +// public CommandBuilder(TxnId txnId, CommandChange.Load load) +// { +// this.mask = load.mask; +// init(txnId); +// } +// +// public CommandBuilder(TxnId txnId) +// { +// this(txnId, ALL); +// } +// +// public CommandBuilder(CommandChange.Load load) +// { +// this.mask = load.mask; +// } +// +// public CommandBuilder() 
+// { +// this(ALL); +// } +// +// public TxnId txnId() +// { +// return txnId; +// } +// +// public Timestamp executeAt() +// { +// return executeAt; +// } +// +// public Timestamp executeAtLeast() +// { +// return executeAtLeast; +// } +// +// public SaveStatus saveStatus() +// { +// return saveStatus; +// } +// +// public Status.Durability durability() +// { +// return durability; +// } +// +// public Ballot acceptedOrCommitted() +// { +// return acceptedOrCommitted; +// } +// +// public Ballot promised() +// { +// return promised; +// } +// +// public StoreParticipants participants() +// { +// return participants; +// } +// +// public PartialTxn partialTxn() +// { +// return partialTxn; +// } +// +// public PartialDeps partialDeps() +// { +// return partialDeps; +// } +// +// public CommandChange.WaitingOnProvider waitingOn() +// { +// return waitingOn; +// } +// +// public Writes writes() +// { +// return writes; +// } +// +// public Result result() +// { +// return result; +// } +// +// public void clear() +// { +// flags = 0; +// txnId = null; +// +// executeAt = null; +// executeAtLeast = null; +// saveStatus = null; +// durability = null; +// +// acceptedOrCommitted = null; +// promised = null; +// +// participants = null; +// partialTxn = null; +// partialDeps = null; +// +// waitingOnBytes = null; +// waitingOn = null; +// writes = null; +// result = null; +// cleanup = null; +// +// nextCalled = false; +// count = 0; +// } +// +// public void reset(TxnId txnId) +// { +// clear(); +// init(txnId); +// } +// +// public void init(TxnId txnId) +// { +// this.txnId = txnId; +// durability = NotDurable; +// acceptedOrCommitted = promised = Ballot.ZERO; +// waitingOn = (txn, deps) -> null; +// // TODO +// //result = CommandSerializers.APPLIED; +// } +// +// public boolean isEmpty() +// { +// return !nextCalled; +// } +// +// public int count() +// { +// return count; +// } +// +// public Cleanup shouldCleanup(Agent agent, RedundantBefore redundantBefore, 
DurableBefore durableBefore) +// { +// if (!nextCalled) +// return NO; +// +// if (saveStatus == null || participants == null) +// return Cleanup.NO; +// +// Cleanup cleanup = Cleanup.shouldCleanupPartial(agent, txnId, saveStatus, durability, participants, redundantBefore, durableBefore); +// if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0) +// cleanup = this.cleanup; +// return cleanup; +// } +// +// // TODO (expected): avoid allocating new builder +// public CommandBuilder maybeCleanup(Cleanup cleanup) +// { +// if (saveStatus() == null) +// return this; +// +// switch (cleanup) +// { +// case EXPUNGE: +// case ERASE: +// return null; +// +// case EXPUNGE_PARTIAL: +// return expungePartial(cleanup, saveStatus, true); +// +// case VESTIGIAL: +// case INVALIDATE: +// return saveStatusOnly(); +// +// case TRUNCATE_WITH_OUTCOME: +// case TRUNCATE: +// return expungePartial(cleanup, cleanup.appliesIfNot, cleanup == TRUNCATE_WITH_OUTCOME); +// +// case NO: +// return this; +// default: +// throw new IllegalStateException("Unknown cleanup: " + cleanup);} +// } +// +// public CommandBuilder expungePartial(Cleanup cleanup, SaveStatus saveStatus, boolean includeOutcome) +// { +// Invariants.checkState(txnId != null); +// CommandBuilder builder = new CommandBuilder(txnId, ALL); +// +// builder.count++; +// builder.nextCalled = true; +// +// Invariants.checkState(saveStatus != null); +// builder.flags = setFieldChanged(SAVE_STATUS, builder.flags); +// builder.saveStatus = saveStatus; +// builder.flags = setFieldChanged(CLEANUP, builder.flags); +// builder.cleanup = cleanup; +// if (executeAt != null) +// { +// builder.flags = setFieldChanged(EXECUTE_AT, builder.flags); +// builder.executeAt = executeAt; +// } +// if (durability != null) +// { +// builder.flags = setFieldChanged(DURABILITY, builder.flags); +// builder.durability = durability; +// } +// if (participants != null) +// { +// builder.flags = setFieldChanged(PARTICIPANTS, builder.flags); +// 
builder.participants = participants; +// } +// if (includeOutcome && builder.writes != null) // NOTE(review): 'builder' is the freshly constructed copy and nothing has assigned its writes field yet, so this test is always false and writes is never carried over; presumably it should test this builder's own field (writes != null) - confirm before re-enabling +// { +// builder.flags = setFieldChanged(WRITES, builder.flags); +// builder.writes = writes; +// } +// +// return builder; +// } +// +// public CommandBuilder saveStatusOnly() +// { +// Invariants.checkState(txnId != null); +// CommandBuilder builder = new CommandBuilder(txnId, ALL); +// +// builder.count++; +// builder.nextCalled = true; +// +// // TODO: these accesses can be abstracted away +// if (saveStatus != null) +// { +// builder.flags = setFieldChanged(SAVE_STATUS, builder.flags); +// builder.saveStatus = saveStatus; +// } +// +// return builder; +// } +// +// public MinimalCommand asMinimal() +// { +// return new MinimalCommand(txnId, saveStatus, participants, durability, executeAt, writes); +// } +// +// @VisibleForTesting +// public void forceResult(Result newValue) +// { +// this.result = newValue; +// } +// +// public Command construct() +// { +// if (!nextCalled) +// return null; +// +// Invariants.checkState(txnId != null); +// CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId); +// if (partialTxn != null) +// attrs.partialTxn(partialTxn); +// if (durability != null) +// attrs.durability(durability); +// if (participants != null) +// attrs.setParticipants(participants); +// if (partialDeps != null && +// (saveStatus.known.deps != NoDeps && +// saveStatus.known.deps != DepsErased && +// saveStatus.known.deps != DepsUnknown)) +// attrs.partialDeps(partialDeps); +// +// Command.WaitingOn waitingOn = null; +// if (this.waitingOn != null) +// waitingOn = this.waitingOn.provide(txnId, partialDeps); +// +// switch (saveStatus.status) +// { +// case NotDefined: +// return saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId()) +// : Command.NotDefined.notDefined(attrs, promised); +// case PreAccepted: +// return Command.PreAccepted.preAccepted(attrs, executeAt, promised); +// case AcceptedInvalidate: +// case Accepted: +// case PreCommitted: +// if (saveStatus == SaveStatus.AcceptedInvalidate) +// return Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, acceptedOrCommitted); +// else +// return Command.Accepted.accepted(attrs, saveStatus, executeAt, promised, acceptedOrCommitted); +// case Committed: +// case Stable: +// return Command.Committed.committed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn); +// case PreApplied: +// case Applied: +// return Command.Executed.executed(attrs, saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes, result); +// case Truncated: +// case Invalidated: +// return truncated(attrs, saveStatus, executeAt, executeAtLeast, writes, result); +// default: +// throw new IllegalStateException(); +// } +// } +// +// private static Command.Truncated truncated(CommonAttributes.Mutable attrs, SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes writes, Result result) +// { +// switch (status) +// { +// default: +// throw illegalState("Unhandled SaveStatus: " + status); +// case TruncatedApplyWithOutcome: +// case TruncatedApplyWithDeps: +// case TruncatedApply: +// if (status != TruncatedApplyWithOutcome) +// result = null; +// if (attrs.txnId().kind().awaitsOnlyDeps()) +// return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result, executesAtLeast); +// return Command.Truncated.truncatedApply(attrs, status, executeAt, writes, result, null); +// case ErasedOrVestigial: +// return Command.Truncated.erasedOrInvalidOrVestigial(attrs.txnId(), attrs.durability(), attrs.participants()); +// case Erased: +// return Command.Truncated.erased(attrs.txnId(), attrs.durability(), attrs.participants()); +// case 
Invalidated: +// return Command.Truncated.invalidated(attrs.txnId()); +// } +// } +// +// public String toString() +// { +// return "Diff {" + +// "txnId=" + txnId + +// ", executeAt=" + executeAt + +// ", saveStatus=" + saveStatus + +// ", durability=" + durability + +// ", acceptedOrCommitted=" + acceptedOrCommitted + +// ", promised=" + promised + +// ", participants=" + participants + +// ", partialTxn=" + partialTxn + +// ", partialDeps=" + partialDeps + +// ", waitingOn=" + waitingOn + +// ", writes=" + writes + +// '}'; +// } +// } +// +// public interface WaitingOnProvider +// { +// Command.WaitingOn provide(TxnId txnId, PartialDeps deps); +// } +//} \ No newline at end of file diff --git a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java index bd370b93..ada18fc9 100644 --- a/accord-core/src/main/java/accord/impl/DurabilityScheduling.java +++ b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java @@ -36,6 +36,7 @@ import accord.api.Scheduler; import accord.coordinate.CoordinateGloballyDurable; import accord.coordinate.CoordinationFailed; import accord.coordinate.ExecuteSyncPoint.SyncPointErased; +import accord.coordinate.Timeout; import accord.local.Node; import accord.local.ShardDistributor; import accord.primitives.FullRoute; @@ -313,7 +314,10 @@ public class DurabilityScheduling implements ConfigurationService.Listener index *= 2; numberOfSplits *= 2; } - logger.warn("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail); + if (fail instanceof Timeout) + logger.trace("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail); + else + logger.debug("{}: Exception coordinating ExclusiveSyncPoint for {} durability. 
Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail); } } else @@ -335,7 +339,10 @@ public class DurabilityScheduling implements ConfigurationService.Listener .addCallback((success, fail) -> { if (fail != null && fail.getClass() != SyncPointErased.class) { - logger.debug("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail); + if (fail instanceof Timeout) + logger.trace("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail); + else + logger.debug("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail); retryCoordinateDurability(node, exclusiveSyncPoint, nextIndex); } else diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index e50998b4..2322186c 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -44,6 +44,7 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; +import accord.api.Journal; import accord.api.LocalListeners; import accord.api.RoutingKey; import accord.impl.progresslog.DefaultProgressLog; @@ -130,10 +131,12 @@ public abstract class InMemoryCommandStore extends CommandStore protected Timestamp maxRedundant = Timestamp.NONE; private InMemorySafeStore current; + private final Journal.Loader loader; public InMemoryCommandStore(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) { super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); + this.loader = new CommandLoader(this); } protected boolean canExposeUnloaded() @@ -1357,88 +1360,107 @@ public abstract class InMemoryCommandStore extends CommandStore 
unsafeSetRejectBefore(new RejectBefore()); } - public interface Loader + public Journal.Loader loader() { - void load(Command next); - void apply(Command next); + return loader; } - public Loader loader() + private static class CommandLoader implements Journal.Loader { - return new Loader() + private final InMemoryCommandStore commandStore; + + private CommandLoader(InMemoryCommandStore commandStore) { - private PreLoadContext context(Command command, KeyHistory keyHistory) - { - TxnId txnId = command.txnId(); - AbstractUnseekableKeys keys = null; + this.commandStore = commandStore; + } - if (CommandsForKey.manages(txnId)) - keys = (AbstractUnseekableKeys) command.participants().hasTouched(); - else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated)) - keys = command.asCommitted().waitingOn.keys; + private PreLoadContext context(Command command, KeyHistory keyHistory) + { + TxnId txnId = command.txnId(); + AbstractUnseekableKeys keys = null; - if (keys != null) - { - return PreLoadContext.contextFor(txnId, keys, keyHistory); - } + if (CommandsForKey.manages(txnId)) + keys = (AbstractUnseekableKeys) command.participants().hasTouched(); + else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated)) + keys = command.asCommitted().waitingOn.keys; - return PreLoadContext.contextFor(txnId); + if (keys != null) + { + return PreLoadContext.contextFor(txnId, keys, keyHistory); } - public void load(Command command) - { - TxnId txnId = command.txnId(); - - executeInContext(InMemoryCommandStore.this, - context(command, ASYNC), - safeStore -> { - Command local = command; - if (local.status() != Truncated && local.status() != Invalidated) - { - Cleanup cleanup = Cleanup.shouldCleanup(safeStore, local, local.participants()); - switch (cleanup) - { - case NO: - break; - case INVALIDATE: - case TRUNCATE_WITH_OUTCOME: - case TRUNCATE: - case ERASE: - local = 
Commands.purge(local, local.participants(), cleanup); - } - } - - local = safeStore.unsafeGet(txnId).update(safeStore, local); - if (local.status() == Truncated) - safeStore.progressLog().clear(local.txnId()); - return local; - }); + return PreLoadContext.contextFor(txnId); + } + @Override + public void load(Command command, Journal.OnDone onDone) + { + TxnId txnId = command.txnId(); + try + { + commandStore.executeInContext(commandStore, + context(command, ASYNC), + safeStore -> { + Command local = command; + if (local.status() != Truncated && local.status() != Invalidated) + { + Cleanup cleanup = Cleanup.shouldCleanup(safeStore, local, local.participants()); + switch (cleanup) + { + case NO: + break; + case INVALIDATE: + case TRUNCATE_WITH_OUTCOME: + case TRUNCATE: + case ERASE: + local = Commands.purge(local, local.participants(), cleanup); + } + } + + local = safeStore.unsafeGet(txnId).update(safeStore, local); + if (local.status() == Truncated) + safeStore.progressLog().clear(local.txnId()); + return local; + }); + onDone.success(); } - - public void apply(Command command) + catch (Throwable t) { - TxnId txnId = command.txnId(); + onDone.failure(t); + } + } + + @Override + public void apply(Command command, Journal.OnDone onDone) + { + TxnId txnId = command.txnId(); + try + { PreLoadContext context = context(command, KeyHistory.TIMESTAMPS); - executeInContext(InMemoryCommandStore.this, - context, - safeStore -> { - SafeCommand safeCommand = safeStore.unsafeGet(txnId); - Command local = safeCommand.current(); - if (local.is(Stable) || local.is(PreApplied)) - { - Commands.maybeExecute(safeStore, safeCommand, local, true, true); - } - else if (local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated)) - { - unsafeApplyWrites(safeStore, safeCommand, local); - } - return null; - }); + commandStore.executeInContext(commandStore, + context, + safeStore -> { + SafeCommand safeCommand = safeStore.unsafeGet(txnId); + Command local = safeCommand.current(); 
+ if (local.is(Stable) || local.is(PreApplied)) + { + Commands.maybeExecute(safeStore, safeCommand, local, true, true); + } + else if (local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated)) + { + unsafeApplyWrites(safeStore, safeCommand, local); + } + return null; + }); + onDone.success(); } - }; + catch (Throwable t) + { + onDone.failure(t); + } + } } public static void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 6fb4ae04..f7eb397b 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -18,6 +18,7 @@ package accord.local; +import accord.api.Journal; import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.DataStore; @@ -207,6 +208,8 @@ public abstract class CommandStore implements AgentExecutor return id; } + public abstract Journal.Loader loader(); + @Override public Agent agent() { diff --git a/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java b/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java index e86a1805..1ee926df 100644 --- a/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java +++ b/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java @@ -75,7 +75,8 @@ public class ReadEphemeralTxnData extends ReadData private ReadEphemeralTxnData(TxnId txnId, Participants<?> readScope, Route<?> scope, long executeAtEpoch, @Nonnull Txn txn, @Nonnull Deps deps, @Nonnull FullRoute<?> route) { super(txnId, readScope.intersecting(scope), executeAtEpoch); - Invariants.checkState(executeAtEpoch == txnId.epoch()); + Invariants.checkState(executeAtEpoch == txnId.epoch(), + "Epoch for transaction %s (%d) did not match expected %d", txn, txnId.epoch(), executeAtEpoch); this.route = route; this.partialTxn = 
txn.intersecting(scope, false); this.partialDeps = deps.intersecting(scope); diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index bfbb7c7b..740b705d 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -52,6 +52,7 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.Journal; import accord.api.Key; import accord.burn.random.FrequentLargeRange; import accord.impl.MessageListener; @@ -59,6 +60,7 @@ import accord.impl.PrefixedIntHashKey; import accord.impl.TopologyFactory; import accord.impl.basic.Cluster; import accord.impl.basic.Cluster.Stats; +import accord.impl.basic.InMemoryJournal; import accord.impl.basic.Packet; import accord.impl.basic.PendingQueue; import accord.impl.basic.PendingRunnable; @@ -295,14 +297,14 @@ public class BurnTest random2.setSeed(seed); ExecutorService exec = Executors.newFixedThreadPool(2); RandomDelayQueue.ReconcilingQueueFactory factory = new RandomDelayQueue.ReconcilingQueueFactory(seed); - Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, clients, nodes, keyCount, operations, concurrency, factory.get(true))); - Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, clients, nodes, keyCount, operations, concurrency, factory.get(false))); + Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, clients, nodes, keyCount, operations, concurrency, factory.get(true), InMemoryJournal::new)); + Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, clients, nodes, keyCount, operations, concurrency, factory.get(false), InMemoryJournal::new)); exec.shutdown(); f1.get(); f2.get(); } - static void burn(RandomSource random, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency, PendingQueue pendingQueue) + static void burn(RandomSource random, 
TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency, PendingQueue pendingQueue, Function<Id, Journal> journalFactory) { List<Throwable> failures = Collections.synchronizedList(new ArrayList<>()); AtomicLong progress = new AtomicLong(); @@ -462,8 +464,8 @@ public class BurnTest responseSink, random::fork, nowSupplier, topologyFactory, initialRequests::poll, onSubmitted::set, - ignore -> {} - ); + ignore -> {}, + journalFactory); for (Verifier verifier : validators.values()) verifier.close(); } @@ -622,12 +624,13 @@ public class BurnTest List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3)); burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START, HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 3))), - clients, - nodes, - 5 + random.nextInt(15), - operations, - 10 + random.nextInt(30), - new Factory(random).get()); + clients, + nodes, + 5 + random.nextInt(15), + operations, + 10 + random.nextInt(30), + new Factory(random).get(), + InMemoryJournal::new); } catch (Throwable t) { diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java index 0688fa5a..dc8b553a 100644 --- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java +++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java @@ -18,6 +18,7 @@ package accord.coordinate; +import java.time.Duration; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -67,7 +68,6 @@ import accord.utils.async.AsyncResult; import static accord.Utils.id; import static accord.Utils.ids; import static accord.Utils.ranges; -import static accord.Utils.spinUntilSuccess; import static accord.Utils.writeTxn; import static accord.impl.IntKey.key; import static accord.impl.IntKey.keys; @@ -87,6 +87,8 @@ import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import org.awaitility.Awaitility; +import org.awaitility.core.ThrowingRunnable; public class CoordinateTransactionTest { @@ -416,4 +418,19 @@ public class CoordinateTransactionTest { return keys.toRoute(keys.get(0).toUnseekable()); } + + public static void spinUntilSuccess(ThrowingRunnable runnable) + { + spinUntilSuccess(runnable, 10); + } + + public static void spinUntilSuccess(ThrowingRunnable runnable, int timeoutInSeconds) + { + Awaitility.await() + .pollInterval(Duration.ofMillis(100)) + .pollDelay(0, TimeUnit.MILLISECONDS) + .atMost(timeoutInSeconds, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(runnable); + } } diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java index 7f1f81b8..9ea735c4 100644 --- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java +++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test; import accord.api.Agent; import accord.api.DataStore; +import accord.api.Journal; import accord.api.ProgressLog; import accord.api.RemoteListeners; import accord.api.RemoteListeners.Registration; @@ -394,6 +395,12 @@ public class RemoteListenersTest this.storeId = id; } + @Override + public Journal.Loader loader() + { + throw new UnsupportedOperationException(); + } + @Override public boolean inStore() { return false; } @Override public AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) { return null; } @Override public <T> AsyncChain<T> submit(PreLoadContext context, Function<? 
super SafeCommandStore, T> apply) { return null; } diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 50b2b97c..1123ab73 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import accord.api.Agent; import accord.api.Data; import accord.api.DataStore; +import accord.api.Journal; import accord.api.ProgressLog; import accord.api.ProgressLog.BlockedUntil; import accord.api.Query; @@ -949,6 +950,11 @@ public class CommandsForKeyTest return true; } + public Journal.Loader loader() + { + throw new UnsupportedOperationException(); + } + @Override public Agent agent() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org
