This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new bd0761c5 Expose Journal-related files to allow external Journal
implementation integration.
bd0761c5 is described below
commit bd0761c567d153995a3db8da686ffdc940247200
Author: Alex Petrov <[email protected]>
AuthorDate: Mon Nov 11 18:39:02 2024 +0100
Expose Journal-related files to allow external Journal implementation
integration.
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for
CASSANDRA-20112.
---
accord-core/src/main/java/accord/api/Journal.java | 100 ++++
.../src/main/java/accord/impl/AbstractLoader.java | 74 +++
.../java/accord/impl/InMemoryCommandStore.java | 158 +++---
.../main/java/accord/impl/TimestampsForKeys.java | 7 +-
.../src/main/java/accord/local/Command.java | 8 +-
.../src/main/java/accord/local/CommandStore.java | 3 +
accord-core/src/main/java/accord/local/Node.java | 4 +-
.../java/accord/messages/ReadEphemeralTxnData.java | 3 +-
.../src/main/java/accord/primitives/Timestamp.java | 4 +-
accord-core/src/test/java/accord/Utils.java | 26 +-
.../src/test/java/accord/burn/BurnTest.java | 615 +--------------------
.../burn/{BurnTest.java => BurnTestBase.java} | 131 ++---
.../src/test/java/accord/impl/IntHashKey.java | 2 +-
.../test/java/accord/impl/PrefixedIntHashKey.java | 2 +-
.../test/java/accord/impl/RemoteListenersTest.java | 7 +
.../src/test/java/accord/impl/basic/Cluster.java | 20 +-
.../accord/impl/basic/DelayedCommandStores.java | 7 +-
.../basic/{Journal.java => InMemoryJournal.java} | 335 ++++++-----
.../java/accord/impl/basic/LoggingJournal.java | 120 ++++
.../java/accord/impl/basic/VerifyingJournal.java | 97 ++++
.../src/test/java/accord/impl/list/ListQuery.java | 6 +-
.../src/test/java/accord/impl/list/ListRead.java | 2 +-
.../src/test/java/accord/impl/list/ListResult.java | 2 +-
.../java/accord/local/cfk/CommandsForKeyTest.java | 6 +
.../java/accord/topology/TopologyRandomizer.java | 4 +-
25 files changed, 759 insertions(+), 984 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Journal.java
b/accord-core/src/main/java/accord/api/Journal.java
new file mode 100644
index 00000000..6bd18bd2
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.api;
+
+import java.util.NavigableMap;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+/**
+ * Persisted journal for transactional recovery.
+ */
+public interface Journal
+{
+ Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore
redundantBefore, DurableBefore durableBefore);
+ // TODO (required): use OnDone instead of Runnable
+ void saveCommand(int store, CommandUpdate value, Runnable onFlush);
+
+ void purge(CommandStores commandStores);
+ void replay(CommandStores commandStores);
+
+ RedundantBefore loadRedundantBefore(int commandStoreId);
+ NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId);
+ NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId);
+ CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int
commandStoreId);
+
+ void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush);
+
+ class CommandUpdate
+ {
+ public final TxnId txnId;
+ public final Command before;
+ public final Command after;
+
+ public CommandUpdate(@Nullable Command before, @Nonnull Command after)
+ {
+ this.txnId = after.txnId();
+ this.before = before;
+ this.after = after;
+ }
+ }
+
+ class FieldUpdates
+ {
+ // TODO (required): use persisted field logic
+ public RedundantBefore newRedundantBefore;
+ public NavigableMap<TxnId, Ranges> newBootstrapBeganAt;
+ public NavigableMap<Timestamp, Ranges> newSafeToRead;
+ public CommandStores.RangesForEpoch.Snapshot newRangesForEpoch;
+
+ public String toString()
+ {
+ return "FieldUpdates{" +
+ "newRedundantBefore=" + newRedundantBefore +
+ ", newBootstrapBeganAt=" + newBootstrapBeganAt +
+ ", newSafeToRead=" + newSafeToRead +
+ ", newRangesForEpoch=" + newRangesForEpoch +
+ '}';
+ }
+ }
+
+ /**
+ * Helper for CommandStore to restore Command states.
+ */
+ interface Loader
+ {
+ void load(Command next, OnDone onDone);
+ void apply(Command next, OnDone onDone);
+ }
+
+
+ interface OnDone
+ {
+ void success();
+ void failure(Throwable t);
+ }
+}
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
new file mode 100644
index 00000000..66745a6d
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.function.BiConsumer;
+
+import accord.api.Journal;
+import accord.local.Cleanup;
+import accord.local.Command;
+import accord.local.Commands;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.TxnId;
+
+import static accord.primitives.SaveStatus.Applying;
+import static accord.primitives.Status.Invalidated;
+import static accord.primitives.Status.PreApplied;
+import static accord.primitives.Status.Stable;
+import static accord.primitives.Status.Truncated;
+
+public abstract class AbstractLoader implements Journal.Loader
+{
+ protected Command loadInternal(Command command, SafeCommandStore safeStore)
+ {
+ TxnId txnId = command.txnId();
+ if (command.status() != Truncated && command.status() != Invalidated)
+ {
+ Cleanup cleanup = Cleanup.shouldCleanup(safeStore, command,
command.participants());
+ switch (cleanup)
+ {
+ case NO:
+ break;
+ case INVALIDATE:
+ case TRUNCATE_WITH_OUTCOME:
+ case TRUNCATE:
+ case ERASE:
+ command = Commands.purge(command, command.participants(),
cleanup);
+ }
+ }
+
+ return safeStore.unsafeGet(txnId).update(safeStore, command);
+ }
+
+ protected void applyWrites(Command command, SafeCommandStore safeStore,
BiConsumer<SafeCommand, Command> apply)
+ {
+ TxnId txnId = command.txnId();
+ SafeCommand safeCommand = safeStore.unsafeGet(txnId);
+ Command local = safeCommand.current();
+ if (local.is(Stable) || local.is(PreApplied))
+ {
+ Commands.maybeExecute(safeStore, safeCommand, local, true, true);
+ }
+ else if (local.saveStatus().compareTo(Applying) >= 0 &&
!local.hasBeen(Truncated))
+ {
+ apply.accept(safeCommand, local);
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index cae30a16..9bf05302 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -43,18 +43,16 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
-
-import accord.api.LocalListeners;
-import accord.api.RoutingKey;
-import accord.impl.progresslog.DefaultProgressLog;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Agent;
import accord.api.DataStore;
+import accord.api.Journal;
+import accord.api.LocalListeners;
import accord.api.ProgressLog;
-import accord.local.Cleanup;
+import accord.api.RoutingKey;
+import accord.impl.progresslog.DefaultProgressLog;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores.RangesForEpoch;
@@ -97,19 +95,17 @@ import static
accord.local.SafeCommandStore.TestDep.ANY_DEPS;
import static accord.local.SafeCommandStore.TestDep.WITH_OR_INVALIDATED;
import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE;
import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS;
+import static accord.primitives.Routables.Slice.Minimal;
import static accord.primitives.SaveStatus.Applying;
import static accord.primitives.SaveStatus.Erased;
import static accord.primitives.SaveStatus.ErasedOrVestigial;
import static accord.primitives.SaveStatus.ReadyToExecute;
import static accord.primitives.Status.Applied;
import static accord.primitives.Status.Durability.Local;
-import static accord.primitives.Status.Invalidated;
-import static accord.primitives.Status.PreApplied;
+import static accord.primitives.Status.NotDefined;
import static accord.primitives.Status.PreCommitted;
import static accord.primitives.Status.Stable;
import static accord.primitives.Status.Truncated;
-import static accord.primitives.Status.NotDefined;
-import static accord.primitives.Routables.Slice.Minimal;
import static accord.utils.Invariants.illegalState;
import static java.lang.String.format;
@@ -130,10 +126,12 @@ public abstract class InMemoryCommandStore extends
CommandStore
protected Timestamp maxRedundant = Timestamp.NONE;
private InMemorySafeStore current;
+ private final Journal.Loader loader;
public InMemoryCommandStore(int id, NodeCommandStoreService time, Agent
agent, DataStore store, ProgressLog.Factory progressLogFactory,
LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
{
super(id, time, agent, store, progressLogFactory, listenersFactory,
epochUpdateHolder);
+ this.loader = new CommandLoader(this);
}
protected boolean canExposeUnloaded()
@@ -1377,99 +1375,89 @@ public abstract class InMemoryCommandStore extends
CommandStore
unsafeSetRejectBefore(new RejectBefore());
}
- public interface Loader
+ public Journal.Loader loader()
{
- void load(Command next);
- void apply(Command next);
+ return loader;
}
- public Loader loader()
+ private static class CommandLoader extends AbstractLoader
{
- return new Loader()
- {
- private PreLoadContext context(Command command, KeyHistory
keyHistory)
- {
- TxnId txnId = command.txnId();
- AbstractUnseekableKeys keys = null;
+ private final InMemoryCommandStore commandStore;
- if (CommandsForKey.manages(txnId))
- keys = (AbstractUnseekableKeys)
command.participants().hasTouched();
- else if (!CommandsForKey.managesExecution(txnId) &&
command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated))
- keys = command.asCommitted().waitingOn.keys;
+ private CommandLoader(InMemoryCommandStore commandStore)
+ {
+ this.commandStore = commandStore;
+ }
- if (keys != null)
- {
- return PreLoadContext.contextFor(txnId, keys, keyHistory);
- }
+ private PreLoadContext context(Command command, KeyHistory keyHistory)
+ {
+ TxnId txnId = command.txnId();
+ AbstractUnseekableKeys keys = null;
- return PreLoadContext.contextFor(txnId);
- }
+ if (CommandsForKey.manages(txnId))
+ keys = (AbstractUnseekableKeys)
command.participants().hasTouched();
+ else if (!CommandsForKey.managesExecution(txnId) &&
command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated))
+ keys = command.asCommitted().waitingOn.keys;
- public void load(Command command)
+ if (keys != null)
{
- TxnId txnId = command.txnId();
-
- executeInContext(InMemoryCommandStore.this,
- context(command, ASYNC),
- safeStore -> {
- Command local = command;
- if (local.status() != Truncated &&
local.status() != Invalidated)
- {
- Cleanup cleanup =
Cleanup.shouldCleanup(safeStore, local, local.participants());
- switch (cleanup)
- {
- case NO:
- break;
- case INVALIDATE:
- case TRUNCATE_WITH_OUTCOME:
- case TRUNCATE:
- case ERASE:
- local = Commands.purge(local,
local.participants(), cleanup);
- }
- }
-
- local =
safeStore.unsafeGet(txnId).update(safeStore, local);
- if (local.status() == Truncated)
-
safeStore.progressLog().clear(local.txnId());
- return local;
- });
+ return PreLoadContext.contextFor(txnId, keys, keyHistory);
+ }
+ return PreLoadContext.contextFor(txnId);
+ }
+ @Override
+ public void load(Command command, Journal.OnDone onDone)
+ {
+ try
+ {
+ commandStore.executeInContext(commandStore,
+ context(command, ASYNC),
+ safeStore ->
loadInternal(command, safeStore));
}
-
- public void apply(Command command)
+ catch (Throwable t)
{
- TxnId txnId = command.txnId();
+ onDone.failure(t);
+ }
+
+ onDone.success();
+ }
+ @Override
+ public void apply(Command command, Journal.OnDone onDone)
+ {
+ try
+ {
PreLoadContext context = context(command,
KeyHistory.TIMESTAMPS);
- executeInContext(InMemoryCommandStore.this,
- context,
- safeStore -> {
- SafeCommand safeCommand =
safeStore.unsafeGet(txnId);
- Command local = safeCommand.current();
- if (local.is(Stable) ||
local.is(PreApplied))
- {
- Commands.maybeExecute(safeStore,
safeCommand, local, true, true);
- }
- else if
(local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated))
- {
- unsafeApplyWrites(safeStore,
safeCommand, local);
- }
- return null;
- });
+ commandStore.executeInContext(commandStore,
+ context,
+ safeStore -> {
+ applyWrites(command,
safeStore, (safeCommand, cmd) -> {
+
unsafeApplyWrites(safeStore, safeCommand, cmd);
+ });
+ return null;
+ });
+ }
+ catch (Throwable t)
+ {
+ onDone.failure(t);
+ return;
}
- };
- }
- public static void unsafeApplyWrites(SafeCommandStore safeStore,
SafeCommand safeCommand, Command command)
- {
- Command.Executed executed = command.asExecuted();
- Participants<?> executes = executed.participants().executes(safeStore,
command.txnId(), command.executeAt());
- if (!executes.isEmpty())
+ onDone.success();
+ }
+
+ protected void unsafeApplyWrites(SafeCommandStore safeStore,
SafeCommand safeCommand, Command command)
{
- command.writes().applyUnsafe(safeStore,
Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn());
- safeCommand.applied(safeStore);
- safeStore.notifyListeners(safeCommand, command);
+ Command.Executed executed = command.asExecuted();
+ Participants<?> executes =
executed.participants().executes(safeStore, command.txnId(),
command.executeAt());
+ if (!executes.isEmpty())
+ {
+ command.writes().applyUnsafe(safeStore,
Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn());
+ safeCommand.applied(safeStore);
+ safeStore.notifyListeners(safeCommand, command);
+ }
}
}
diff --git a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
index d54080a6..f954f648 100644
--- a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
+++ b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
@@ -21,6 +21,7 @@ package accord.impl;
import accord.api.RoutingKey;
import accord.api.VisibleForImplementation;
import accord.local.SafeCommandStore;
+import accord.local.cfk.CommandsForKey;
import accord.primitives.RoutingKeys;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -48,7 +49,8 @@ public class TimestampsForKeys
{
if
(safeStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId,
current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY)
return current;
- throw illegalState("%s is less than the most recent write
timestamp %s", executeAt, lastWrite);
+ if (CommandsForKey.reportLinearizabilityViolations())
+ throw illegalState("%s is less than the most recent write
timestamp %s", executeAt, lastWrite);
}
Timestamp lastExecuted = current.lastExecutedTimestamp();
@@ -61,7 +63,8 @@ public class TimestampsForKeys
{
if
(!safeStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable()))
return current;
- throw illegalState("%s is less than the most recent executed
timestamp %s", executeAt, lastExecuted);
+ if (CommandsForKey.reportLinearizabilityViolations())
+ throw illegalState("%s is less than the most recent executed
timestamp %s", executeAt, lastExecuted);
}
long micros = executeAt.hlc();
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index cf64e6cd..a089036a 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -328,8 +328,8 @@ public abstract class Command implements CommonAttributes
{
default: throw new AssertionError("Unhandled Outcome: " +
known.outcome);
case Apply:
- Invariants.checkState(writes != null, "Writes is
null");
- Invariants.checkState(result != null, "Result is
null");
+ Invariants.checkState(writes != null, "Writes is null
%s", validate);
+ Invariants.checkState(result != null, "Result is null
%s", validate);
break;
case Invalidated:
Invariants.checkState(validate.durability().isMaybeInvalidated(), "%s is not
invalidated", validate.durability());
@@ -337,8 +337,8 @@ public abstract class Command implements CommonAttributes
Invariants.checkState(validate.durability() != Local);
case Erased:
case WasApply:
- Invariants.checkState(writes == null, "Writes exist");
- Invariants.checkState(result == null, "Results exist");
+ Invariants.checkState(writes == null, "Writes exist
for %s", validate);
+ Invariants.checkState(result == null, "Results exist
%s", validate);
break;
}
}
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index 6fb4ae04..f7eb397b 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -18,6 +18,7 @@
package accord.local;
+import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.DataStore;
@@ -207,6 +208,8 @@ public abstract class CommandStore implements AgentExecutor
return id;
}
+ public abstract Journal.Loader loader();
+
@Override
public Agent agent()
{
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index a213c112..38939974 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -671,7 +671,9 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
*/
public TxnId nextTxnId(Txn.Kind rw, Domain domain)
{
- return new TxnId(uniqueNow(), rw, domain);
+ TxnId txnId = new TxnId(uniqueNow(), rw, domain);
+ Invariants.checkState((txnId.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS))
== 0);
+ return txnId;
}
public AsyncResult<Result> coordinate(Txn txn)
diff --git
a/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
b/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
index e86a1805..1ee926df 100644
--- a/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
@@ -75,7 +75,8 @@ public class ReadEphemeralTxnData extends ReadData
private ReadEphemeralTxnData(TxnId txnId, Participants<?> readScope,
Route<?> scope, long executeAtEpoch, @Nonnull Txn txn, @Nonnull Deps deps,
@Nonnull FullRoute<?> route)
{
super(txnId, readScope.intersecting(scope), executeAtEpoch);
- Invariants.checkState(executeAtEpoch == txnId.epoch());
+ Invariants.checkState(executeAtEpoch == txnId.epoch(),
+ "Epoch for transaction %s (%d) did not match
expected %d", txn, txnId.epoch(), executeAtEpoch);
this.route = route;
this.partialTxn = txn.intersecting(scope, false);
this.partialDeps = deps.intersecting(scope);
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 0a9de552..f488ab8b 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -38,8 +38,8 @@ public class Timestamp implements Comparable<Timestamp>,
EpochSupplier
*/
private static final int MERGE_FLAGS = 0x8000;
// TODO (testing): is this the correct set of identity bits?
- private static final long IDENTITY_LSB = 0xFFFFFFFFFFFF001EL;
- public static final int IDENTITY_FLAGS = 0x001E;
+ private static final long IDENTITY_LSB = 0xFFFFFFFF_FFFF001FL;
+ public static final int IDENTITY_FLAGS = 0x00000000_0000001F;
public static final long MAX_EPOCH = (1L << 48) - 1;
private static final long HLC_INCR = 1L << 16;
static final long MAX_FLAGS = HLC_INCR - 1;
diff --git a/accord-core/src/test/java/accord/Utils.java
b/accord-core/src/test/java/accord/Utils.java
index 9ea7d858..b2b04cae 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -23,25 +23,25 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import accord.api.Agent;
import com.google.common.collect.Sets;
+import accord.api.Agent;
import accord.api.Key;
+import accord.api.LocalConfig;
import accord.api.MessageSink;
import accord.api.Scheduler;
import accord.api.TopologySorter;
-import accord.api.LocalConfig;
import accord.coordinate.CoordinationAdapter;
+import accord.impl.DefaultLocalListeners;
+import accord.impl.DefaultRemoteListeners;
import accord.impl.DefaultTimeouts;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
-import accord.impl.DefaultLocalListeners;
-import accord.impl.progresslog.DefaultProgressLogs;
-import accord.impl.DefaultRemoteListeners;
import accord.impl.SizeOfIntersectionSorter;
import accord.impl.TestAgent;
import accord.impl.list.ListQuery;
@@ -50,6 +50,7 @@ import accord.impl.list.ListUpdate;
import accord.impl.mock.MockCluster;
import accord.impl.mock.MockConfigurationService;
import accord.impl.mock.MockStore;
+import accord.impl.progresslog.DefaultProgressLogs;
import accord.local.DurableBefore;
import accord.local.Node;
import accord.local.Node.Id;
@@ -132,9 +133,10 @@ public class Utils
public static Txn listWriteTxn(Id client, Keys keys)
{
- ListUpdate update = new ListUpdate(Function.identity());
+ TreeMap<Key, Integer> map = new TreeMap<>();
for (Key k : keys)
- update.put(k, 1);
+ map.put(k, 1);
+ ListUpdate update = new ListUpdate(Function.identity());
ListRead read = new ListRead(Function.identity(), false, keys, keys);
ListQuery query = new ListQuery(client, keys.size(), false);
return new Txn.InMemory(keys, read, query, update);
@@ -202,6 +204,11 @@ public class Utils
return node;
}
+ public static TopologyManager testTopologyManager(TopologySorter.Supplier
sorter, Id node)
+ {
+ return new TopologyManager(sorter, new TestAgent.RethrowAgent(), node,
Scheduler.NEVER_RUN_SCHEDULED, new MockCluster.Clock(0), LocalConfig.DEFAULT);
+ }
+
public static void spinUntilSuccess(ThrowingRunnable runnable)
{
spinUntilSuccess(runnable, 10);
@@ -216,9 +223,4 @@ public class Utils
.ignoreExceptions()
.untilAsserted(runnable);
}
-
- public static TopologyManager testTopologyManager(TopologySorter.Supplier
sorter, Id node)
- {
- return new TopologyManager(sorter, new TestAgent.RethrowAgent(), node,
Scheduler.NEVER_RUN_SCHEDULED, new MockCluster.Clock(0), LocalConfig.DEFAULT);
- }
}
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 69a7b683..1d3b2646 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -18,514 +18,21 @@
package accord.burn;
-import java.time.Duration;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-import java.util.zip.CRC32;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import accord.api.Key;
-import accord.burn.random.FrequentLargeRange;
-import accord.impl.MessageListener;
-import accord.impl.PrefixedIntHashKey;
-import accord.impl.TopologyFactory;
-import accord.impl.basic.Cluster;
-import accord.impl.basic.Cluster.Stats;
-import accord.impl.basic.Packet;
-import accord.impl.basic.PendingQueue;
-import accord.impl.basic.PendingRunnable;
-import accord.impl.basic.MonitoredPendingQueue;
-import accord.impl.basic.RandomDelayQueue;
-import accord.impl.basic.RandomDelayQueue.Factory;
-import accord.impl.basic.SimulatedDelayedExecutorService;
-import accord.impl.list.ListAgent;
-import accord.impl.list.ListQuery;
-import accord.impl.list.ListRead;
-import accord.impl.list.ListRequest;
-import accord.impl.list.ListResult;
-import accord.impl.list.ListUpdate;
-import accord.local.CommandStore;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.messages.MessageType;
-import accord.messages.Reply;
-import accord.primitives.Keys;
-import accord.primitives.Range;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.topology.Shard;
-import accord.topology.Topology;
import accord.utils.DefaultRandom;
-import accord.utils.Gen;
-import accord.utils.Gens;
-import accord.utils.RandomSource;
-import accord.utils.Utils;
-import accord.utils.async.AsyncExecutor;
-import accord.utils.async.TimeoutUtils;
-import accord.verify.CompositeVerifier;
-import accord.verify.ElleVerifier;
-import accord.verify.StrictSerializabilityVerifier;
-import accord.verify.Verifier;
-import org.agrona.collections.Int2ObjectHashMap;
-import org.agrona.collections.IntHashSet;
-import static accord.impl.PrefixedIntHashKey.forHash;
-import static accord.impl.PrefixedIntHashKey.range;
-import static accord.impl.PrefixedIntHashKey.ranges;
-import static accord.primitives.Txn.Kind.EphemeralRead;
import static accord.utils.Invariants.illegalArgument;
-import static accord.utils.Utils.toArray;
-public class BurnTest
+public class BurnTest extends BurnTestBase
{
- private static final Logger logger =
LoggerFactory.getLogger(BurnTest.class);
-
- /**
- * Min hash value for the test domain, this value must be respected by the
hash function
- * @see {@link BurnTest#hash(int)}
- */
- public static final int HASH_RANGE_START = 0;
- /**
- * Max hash value for the test domain, this value must be respected by the
hash function
- * @see {@link BurnTest#hash(int)}
- */
- public static final int HASH_RANGE_END = 1 << 16;
- private static final Range[] EMPTY_RANGES = new Range[0];
-
- static List<Packet> generate(RandomSource random, MessageListener
listener, Function<? super CommandStore, AsyncExecutor> executor, List<Id>
clients, List<Id> nodes, int[] keys, int operations)
- {
- List<Packet> packets = new ArrayList<>();
- Int2ObjectHashMap<int[]> prefixKeyUpdates = new Int2ObjectHashMap<>();
- double readInCommandStore = random.nextDouble();
- Function<int[], Range> nextRange = randomRanges(random);
-
- for (int count = 0 ; count < operations ; ++count)
- {
- int finalCount = count;
- Id client = clients.get(random.nextInt(clients.size()));
- Id node = nodes.get(random.nextInt(nodes.size()));
-
- boolean isRangeQuery = random.nextBoolean();
- String description;
- Function<Node, Txn> txnGenerator;
- if (isRangeQuery)
- {
- description = "range";
- txnGenerator = n -> {
- int[] prefixes = prefixes(n.topology().current());
-
- int rangeCount = 1 + random.nextInt(2);
- List<Range> requestRanges = new ArrayList<>();
- while (--rangeCount >= 0)
- requestRanges.add(nextRange.apply(prefixes));
- Ranges ranges =
Ranges.of(requestRanges.toArray(EMPTY_RANGES));
- ListRead read = new
ListRead(random.decide(readInCommandStore) ? Function.identity() : executor,
false, ranges, ranges);
- ListQuery query = new ListQuery(client, finalCount, false);
- return new Txn.InMemory(ranges, read, query);
- };
- }
- else
- {
- description = "key";
- txnGenerator = n -> {
- int[] prefixes = prefixes(n.topology().current());
-
- boolean isWrite = random.nextBoolean();
- int readCount = 1 + random.nextInt(2);
- int writeCount = isWrite ? 1 + random.nextInt(2) : 0;
- Txn.Kind kind = isWrite ? Txn.Kind.Write : readCount == 1
? EphemeralRead : Txn.Kind.Read;
-
- TreeSet<Key> requestKeys = new TreeSet<>();
- IntHashSet readValues = new IntHashSet();
- while (readCount-- > 0)
- requestKeys.add(randomKey(random, prefixes, keys,
readValues));
-
- ListUpdate update = isWrite ? new ListUpdate(executor) :
null;
- IntHashSet writeValues = isWrite ? new IntHashSet() : null;
- while (writeCount-- > 0)
- {
- PrefixedIntHashKey.Key key = randomKey(random,
prefixes, keys, writeValues);
- int i = Arrays.binarySearch(keys, key.key);
- int[] keyUpdateCounter =
prefixKeyUpdates.computeIfAbsent(key.prefix, ignore -> new int[keys.length]);
- update.put(key, ++keyUpdateCounter[i]);
- }
-
- Keys readKeys = new Keys(requestKeys);
- if (isWrite)
- requestKeys.addAll(update.keySet());
- ListRead read = new
ListRead(random.decide(readInCommandStore) ? Function.identity() : executor,
kind == EphemeralRead, readKeys, new Keys(requestKeys));
- ListQuery query = new ListQuery(client, finalCount, kind
== EphemeralRead);
- return new Txn.InMemory(kind, new Keys(requestKeys), read,
query, update);
- };
- }
- packets.add(new Packet(client, node, Long.MAX_VALUE, count, new
ListRequest(description, txnGenerator, listener)));
- }
-
- return packets;
- }
-
- private static int[] prefixes(Topology topology)
- {
- IntHashSet uniq = new IntHashSet();
- for (Shard shard : topology.shards())
- uniq.add(((PrefixedIntHashKey) shard.range.start()).prefix);
- int[] prefixes = new int[uniq.size()];
- IntHashSet.IntIterator it = uniq.iterator();
- for (int i = 0; it.hasNext(); i++)
- prefixes[i] = it.nextValue();
- Arrays.sort(prefixes);
- return prefixes;
- }
-
- private static Function<int[], Range> randomRanges(RandomSource rs)
- {
- int selection = rs.nextInt(0, 2);
- switch (selection)
- {
- case 0: // uniform
- return (prefixes) -> randomRange(rs, prefixes, () ->
rs.nextInt(0, 1 << 13) + 1);
- case 1: // zipf
- int domain = HASH_RANGE_END - HASH_RANGE_START + 1;
- int splitSize = 100;
- int interval = domain / splitSize;
- int[] splits = new int[6];
- for (int i = 0; i < splits.length; i++)
- splits[i] = i == 0 ? interval : splits[i - 1] * 2;
- int[] splitsToPick = splits;
- int bias = rs.nextInt(0, 3); // small, large, random
- if (bias != 0)
- splitsToPick = Arrays.copyOf(splits, splits.length);
- if (bias == 1)
- Utils.reverse(splitsToPick);
- else if (bias == 2)
- Utils.shuffle(splitsToPick, rs);
- Gen.IntGen zipf = Gens.pickZipf(splitsToPick);
- return (prefixes) -> randomRange(rs, prefixes, () -> {
- int value = zipf.nextInt(rs);
- int idx = Arrays.binarySearch(splits, value);
- int min = idx == 0 ? 0 : splits[idx - 1];
- return rs.nextInt(min, value) + 1;
- });
- default:
- throw new AssertionError("Unexpected value: " + selection);
- }
- }
-
- private static Range randomRange(RandomSource random, int[] prefixes,
IntSupplier rangeSizeFn)
- {
- int prefix = random.pickInt(prefixes);
- int i = random.nextInt(HASH_RANGE_START, HASH_RANGE_END);
- int rangeSize = rangeSizeFn.getAsInt();
- int j = i + rangeSize;
- if (j > HASH_RANGE_END)
- {
- int delta = j - HASH_RANGE_END;
- j = HASH_RANGE_END;
- i -= delta;
- // saftey check, this shouldn't happen unless the configs were
changed in an unsafe way
- if (i < HASH_RANGE_START)
- i = HASH_RANGE_START;
- }
- return range(forHash(prefix, i), forHash(prefix, j));
- }
-
- private static PrefixedIntHashKey.Key randomKey(RandomSource random, int[]
prefixes, int[] keys, Set<Integer> seen)
- {
- int prefix = random.pickInt(prefixes);
- int key;
- do
- {
- key = random.pickInt(keys);
- }
- while (!seen.add(key));
- return PrefixedIntHashKey.key(prefix, key, hash(key));
- }
-
- /**
- * This class uses a limited range than the default for the following
reasons:
- *
- * 1) easier to debug smaller numbers
- * 2) adds hash collisions (multiple keys map to the same hash)
- */
- private static int hash(int key)
- {
- CRC32 crc32c = new CRC32();
- crc32c.update(key);
- crc32c.update(key >> 8);
- crc32c.update(key >> 16);
- crc32c.update(key >> 24);
- return (int) crc32c.getValue() & 0xffff;
- }
-
- @SuppressWarnings("unused")
- static void reconcile(long seed, TopologyFactory topologyFactory, List<Id>
clients, List<Id> nodes, int keyCount, int prefixCount, int operations, int
concurrency) throws ExecutionException, InterruptedException
- {
- RandomSource random1 = new DefaultRandom(), random2 = new
DefaultRandom();
-
- random1.setSeed(seed);
- random2.setSeed(seed);
- ExecutorService exec = Executors.newFixedThreadPool(2);
- RandomDelayQueue.ReconcilingQueueFactory factory = new
RandomDelayQueue.ReconcilingQueueFactory(seed);
- Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory,
clients, nodes, keyCount, prefixCount, operations, concurrency,
factory.get(true)));
- Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory,
clients, nodes, keyCount, prefixCount, operations, concurrency,
factory.get(false)));
- exec.shutdown();
- f1.get();
- f2.get();
- }
-
- static void burn(RandomSource random, TopologyFactory topologyFactory,
List<Id> clients, List<Id> nodes, int keyCount, int prefixCount, int
operations, int concurrency, PendingQueue pendingQueue)
- {
- List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
- AtomicLong progress = new AtomicLong();
- MonitoredPendingQueue queue = new MonitoredPendingQueue(failures,
progress, 5L, TimeUnit.MINUTES, pendingQueue);
- long startNanos = System.nanoTime();
- long startLogicalMillis = queue.nowInMillis();
- Consumer<Runnable> retryBootstrap;
- {
- RandomSource retryRandom = random.fork();
- retryBootstrap = retry -> {
- long delay = retryRandom.nextInt(1, 15);
- queue.add(PendingRunnable.create(retry::run), delay,
TimeUnit.SECONDS);
- };
- }
- IntSupplier coordinationDelays, progressDelays, timeoutDelays;
- {
- RandomSource rnd = random.fork();
- coordinationDelays = delayGenerator(rnd, 1, 100, 100, 1000);
- progressDelays = delayGenerator(rnd, 1, 100, 100, 1000);
- timeoutDelays = delayGenerator(rnd, 500, 800, 1000, 10000);
- }
- Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier =
onStale -> new ListAgent(random.fork(), 1000L, failures::add, retryBootstrap,
onStale, coordinationDelays, progressDelays, timeoutDelays,
pendingQueue::nowInMillis);
-
- Supplier<LongSupplier> nowSupplier = () -> {
- RandomSource forked = random.fork();
- // TODO (expected): meta-randomise scale of clock drift
- return FrequentLargeRange.builder(forked)
- .ratio(1, 5)
- .small(50, 5000,
TimeUnit.MICROSECONDS)
- .large(1, 10,
TimeUnit.MILLISECONDS)
- .build()
- .mapAsLong(j -> Math.max(0,
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
- .asLongSupplier(forked);
- };
-
- SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, new ListAgent(random.fork(), 1000L,
failures::add, retryBootstrap, (i1, i2) -> {
- throw new IllegalAccessError("Global executor should enver get a
stale event");
- }, coordinationDelays, progressDelays, timeoutDelays,
queue::nowInMillis));
- Verifier verifier = createVerifier(keyCount * prefixCount);
- Function<CommandStore, AsyncExecutor> executor = ignore ->
globalExecutor;
-
- MessageListener listener = MessageListener.get();
-
- int[] prefixes = IntStream.concat(IntStream.of(0),
IntStream.generate(() ->
random.nextInt(1024))).distinct().limit(prefixCount).toArray();
- int[] newPrefixes = Arrays.copyOfRange(prefixes, 1, prefixes.length);
- Arrays.sort(prefixes);
-
- int[] keys = IntStream.range(0, keyCount).toArray();
- Packet[] requests = toArray(generate(random, listener, executor,
clients, nodes, keys, operations), Packet[]::new);
- int[] starts = new int[requests.length];
- Packet[] replies = new Packet[requests.length];
-
- AtomicInteger acks = new AtomicInteger();
- AtomicInteger nacks = new AtomicInteger();
- AtomicInteger lost = new AtomicInteger();
- AtomicInteger truncated = new AtomicInteger();
- AtomicInteger recovered = new AtomicInteger();
- AtomicInteger failedToCheck = new AtomicInteger();
- AtomicInteger clock = new AtomicInteger();
- AtomicInteger requestIndex = new AtomicInteger();
- Queue<Packet> initialRequests = new ArrayDeque<>();
- for (int max = Math.min(concurrency, requests.length) ;
requestIndex.get() < max ; )
- {
- int i = requestIndex.getAndIncrement();
- starts[i] = clock.incrementAndGet();
- initialRequests.add(requests[i]);
- }
-
- // not used for atomicity, just for encapsulation
- AtomicReference<Runnable> onSubmitted = new AtomicReference<>();
- Consumer<Packet> responseSink = packet -> {
- if (replies[(int)packet.replyId] != null)
- return;
-
- if (requestIndex.get() < requests.length)
- {
- int i = requestIndex.getAndIncrement();
- starts[i] = clock.incrementAndGet();
- queue.addNoDelay(requests[i]);
- if (i == requests.length - 1)
- onSubmitted.get().run();
- }
- if (packet.message instanceof Reply.FailureReply)
- {
- failures.add(new AssertionError("Unexpected failure in list
reply", ((Reply.FailureReply) packet.message).failure));
- return;
- }
- ListResult reply = (ListResult) packet.message;
-
- try
- {
- if (!reply.isSuccess() && reply.status() ==
ListResult.Status.HeartBeat)
- return; // interrupted; will fetch our actual reply once
rest of simulation is finished (but wanted to send another request to keep
correct number in flight)
-
- int start = starts[(int)packet.replyId];
- int end = clock.incrementAndGet();
- logger.debug("{} at [{}, {}]", reply, start, end);
- replies[(int)packet.replyId] = packet;
-
- if (!reply.isSuccess())
- {
- switch (reply.status())
- {
- case Lost: lost.incrementAndGet();
break;
- case Invalidated: nacks.incrementAndGet();
break;
- case Failure: failedToCheck.incrementAndGet();
break;
- case Truncated: truncated.incrementAndGet();
break;
- // txn was applied?, but client saw a timeout, so
response isn't known
- case Other:
break;
- default: throw new
AssertionError("Unexpected fault: " + reply.status());
- }
- return;
- }
-
- progress.incrementAndGet();
- switch (reply.status())
- {
- default: throw new AssertionError("Unhandled status: " +
reply.status());
- case Applied: acks.incrementAndGet(); break;
- case RecoveryApplied: recovered.incrementAndGet(); //
NOTE: technically this might have been applied by the coordinator and it simply
timed out
- }
- // TODO (correctness): when a keyspace is removed, the
history/validator isn't cleaned up...
- // the current logic for add keyspace only knows what is
there, so a ABA problem exists where keyspaces
- // may come back... logically this is a problem as the history
doesn't get reset, but practically that
- // is fine as the backing map and the validator are consistent
- Verifier.Checker check = verifier.witness(start, end);
- for (int i = 0 ; i < reply.read.length ; ++i)
- {
- Key key = reply.responseKeys.get(i);
- int prefix = prefix(key);
- int keyValue = key(key);
- int k = Arrays.binarySearch(keys, keyValue);
-
- int[] read = reply.read[i];
- int write = reply.update == null ? -1 :
reply.update.getOrDefault(key, -1);
-
- int prefixIndex = Arrays.binarySearch(prefixes, prefix);
- int index = keyCount * prefixIndex + k;
- if (read != null)
- check.read(index, read);
- if (write >= 0)
- check.write(index, write);
- }
- check.close();
- }
- catch (Throwable t)
- {
- failures.add(t);
- }
- };
-
- Map<MessageType, Stats> messageStatsMap;
- try
- {
- messageStatsMap = Cluster.run(toArray(nodes, Id[]::new),
newPrefixes, listener, () -> queue,
- (id, onStale) ->
globalExecutor.withAgent(agentSupplier.apply(onStale)),
- queue::checkFailures,
- responseSink, random::fork,
nowSupplier,
- topologyFactory,
initialRequests::poll,
- onSubmitted::set,
- ignore -> {}
- );
- verifier.close();
- }
- catch (Throwable t)
- {
- logger.info("Keys: " + Arrays.toString(keys));
- logger.info("Prefixes: " + Arrays.toString(prefixes));
- for (int i = 0 ; i < requests.length ; ++i)
- {
- logger.info("{}", requests[i]);
- logger.info("\t\t" + replies[i]);
- }
- throw t;
- }
-
- int observedOperations = acks.get() + recovered.get() + nacks.get() +
lost.get() + truncated.get();
- logger.info("nodes: {}, rf: {}. Received {} acks, {} recovered, {}
nacks, {} lost, {} truncated ({} total) to {} operations", nodes.size(),
topologyFactory.rf, acks.get(), recovered.get(), nacks.get(), lost.get(),
truncated.get(), observedOperations, operations);
- logger.info("Message counts: {}", statsInDescOrder(messageStatsMap));
- logger.info("Took {} and in logical time of {}",
Duration.ofNanos(System.nanoTime() - startNanos),
Duration.ofMillis(queue.nowInMillis() - startLogicalMillis));
- if (clock.get() != operations * 2 || observedOperations != operations)
- {
- StringBuilder sb = new StringBuilder();
- for (int i = 0 ; i < requests.length ; ++i)
- {
- // since this only happens when operations are lost, only log
the ones without a reply to lower the amount of noise
- if (replies[i] == null)
- {
- sb.setLength(0);
- sb.append(requests[i]).append("\n\t\t").append(replies[i]);
- logger.info(sb.toString());
- }
- }
- if (clock.get() != operations * 2) throw new
AssertionError("Incomplete set of responses; clock=" + clock.get() + ",
expected operations=" + (operations * 2));
- else throw new AssertionError("Incomplete set of responses;
ack+recovered+other+nacks+lost+truncated=" + observedOperations + ", expected
operations=" + (operations * 2));
- }
- }
-
- private static IntSupplier delayGenerator(RandomSource rnd, int
absoluteMin, int absoluteMaxMin, int absoluteMinMax, int asoluteMax)
- {
- int minDelay = rnd.nextInt(absoluteMin, absoluteMaxMin);
- int maxDelay = rnd.nextInt(Math.max(absoluteMinMax, minDelay),
asoluteMax);
- if (rnd.nextBoolean())
- {
- int medianDelay = rnd.nextInt(minDelay, maxDelay);
- return () -> rnd.nextBiasedInt(minDelay, medianDelay, maxDelay);
- }
- return () -> rnd.nextInt(minDelay, maxDelay);
- }
-
- private static String statsInDescOrder(Map<MessageType, Stats> statsMap)
- {
- List<Stats> stats = new ArrayList<>(statsMap.values());
- stats.sort(Comparator.comparingInt(s -> -s.count()));
- return stats.toString();
- }
-
- private static Verifier createVerifier(int keyCount)
+ @Test
+ public void testOne()
{
- if (!ElleVerifier.Support.allowed())
- return new StrictSerializabilityVerifier("", keyCount);
- return CompositeVerifier.create(new StrictSerializabilityVerifier("",
keyCount),
- new ElleVerifier());
+ run(System.nanoTime());
}
public static void main(String[] args)
@@ -575,118 +82,4 @@ public class BurnTest
}
}
}
-
- @Test
- public void testOne()
- {
- run(System.nanoTime());
- }
-
- private static void run(long seed)
- {
- Duration timeout = Duration.ofMinutes(3);
- try
- {
- TimeoutUtils.runBlocking(timeout, "BurnTest with timeout", () ->
run(seed, 1000));
- }
- catch (Throwable thrown)
- {
- Throwable cause = thrown;
- if (cause instanceof ExecutionException)
- cause = cause.getCause();
- if (cause instanceof TimeoutException)
- {
- TimeoutException override = new TimeoutException("test did not
complete within " + timeout);
- override.setStackTrace(new StackTraceElement[0]);
- cause = override;
- }
- logger.error("Exception running burn test for seed {}:", seed,
cause);
- throw SimulationException.wrap(seed, cause);
- }
- }
-
- private static void run(long seed, int operations)
- {
- logger.info("Seed: {}", seed);
- Cluster.trace.trace("Seed: {}", seed);
- RandomSource random = new DefaultRandom(seed);
- try
- {
- List<Id> clients = generateIds(true, 1 + random.nextInt(4));
- int rf;
- float chance = random.nextFloat();
- if (chance < 0.2f) { rf = random.nextInt(2, 9); }
- else if (chance < 0.4f) { rf = 3; }
- else if (chance < 0.7f) { rf = 5; }
- else if (chance < 0.8f) { rf = 7; }
- else { rf = 9; }
-
- List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
-
- burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START,
HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() *
3))),
- clients,
- nodes,
- 5 + random.nextInt(15),
- 5 + random.nextInt(15),
- operations,
- 10 + random.nextInt(30),
- new Factory(random).get());
- }
- catch (Throwable t)
- {
- logger.error("Exception running burn test for seed {}:", seed, t);
- throw SimulationException.wrap(seed, t);
- }
- }
-
- private static void reconcile(long seed, int operations)
- {
- logger.info("Seed: {}", seed);
- Cluster.trace.trace("Seed: {}", seed);
- RandomSource random = new DefaultRandom(seed);
- try
- {
- List<Id> clients = generateIds(true, 1 + random.nextInt(4));
- int rf;
- float chance = random.nextFloat();
- if (chance < 0.2f) { rf = random.nextInt(2, 9); }
- else if (chance < 0.4f) { rf = 3; }
- else if (chance < 0.7f) { rf = 5; }
- else if (chance < 0.8f) { rf = 7; }
- else { rf = 9; }
-
- List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
-
- reconcile(seed, new TopologyFactory(rf, ranges(0,
HASH_RANGE_START, HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1,
rf), nodes.size() * 3))),
- clients,
- nodes,
- 5 + random.nextInt(15),
- 5 + random.nextInt(15),
- operations,
- 10 + random.nextInt(30));
- }
- catch (Throwable t)
- {
- logger.error("Exception running burn test for seed {}:", seed, t);
- throw SimulationException.wrap(seed, t);
- }
- }
-
- private static List<Id> generateIds(boolean clients, int count)
- {
- List<Id> ids = new ArrayList<>();
- for (int i = 1; i <= count ; ++i)
- ids.add(new Id(clients ? -i : i));
- return ids;
- }
-
- private static int key(Key key)
- {
- return ((PrefixedIntHashKey) key).key;
- }
-
- private static int prefix(Key key)
- {
- return ((PrefixedIntHashKey) key).prefix;
- }
}
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java
b/accord-core/src/test/java/accord/burn/BurnTestBase.java
similarity index 87%
copy from accord-core/src/test/java/accord/burn/BurnTest.java
copy to accord-core/src/test/java/accord/burn/BurnTestBase.java
index 69a7b683..dd6c14bf 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestBase.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,10 +47,10 @@ import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.zip.CRC32;
-import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.Journal;
import accord.api.Key;
import accord.burn.random.FrequentLargeRange;
import accord.impl.MessageListener;
@@ -59,10 +58,11 @@ import accord.impl.PrefixedIntHashKey;
import accord.impl.TopologyFactory;
import accord.impl.basic.Cluster;
import accord.impl.basic.Cluster.Stats;
+import accord.impl.basic.InMemoryJournal;
+import accord.impl.basic.MonitoredPendingQueue;
import accord.impl.basic.Packet;
import accord.impl.basic.PendingQueue;
import accord.impl.basic.PendingRunnable;
-import accord.impl.basic.MonitoredPendingQueue;
import accord.impl.basic.RandomDelayQueue;
import accord.impl.basic.RandomDelayQueue.Factory;
import accord.impl.basic.SimulatedDelayedExecutorService;
@@ -91,8 +91,6 @@ import accord.utils.RandomSource;
import accord.utils.Utils;
import accord.utils.async.AsyncExecutor;
import accord.utils.async.TimeoutUtils;
-import accord.verify.CompositeVerifier;
-import accord.verify.ElleVerifier;
import accord.verify.StrictSerializabilityVerifier;
import accord.verify.Verifier;
import org.agrona.collections.Int2ObjectHashMap;
@@ -102,12 +100,11 @@ import static accord.impl.PrefixedIntHashKey.forHash;
import static accord.impl.PrefixedIntHashKey.range;
import static accord.impl.PrefixedIntHashKey.ranges;
import static accord.primitives.Txn.Kind.EphemeralRead;
-import static accord.utils.Invariants.illegalArgument;
import static accord.utils.Utils.toArray;
-public class BurnTest
+public class BurnTestBase
{
- private static final Logger logger =
LoggerFactory.getLogger(BurnTest.class);
+ private static final Logger logger =
LoggerFactory.getLogger(BurnTestBase.class);
/**
* Min hash value for the test domain, this value must be respected by the
hash function
@@ -295,14 +292,14 @@ public class BurnTest
random2.setSeed(seed);
ExecutorService exec = Executors.newFixedThreadPool(2);
RandomDelayQueue.ReconcilingQueueFactory factory = new
RandomDelayQueue.ReconcilingQueueFactory(seed);
- Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory,
clients, nodes, keyCount, prefixCount, operations, concurrency,
factory.get(true)));
- Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory,
clients, nodes, keyCount, prefixCount, operations, concurrency,
factory.get(false)));
+ Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory,
clients, nodes, keyCount, prefixCount, operations, concurrency,
factory.get(true), InMemoryJournal::new));
+ Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory,
clients, nodes, keyCount, prefixCount, operations, concurrency,
factory.get(false), InMemoryJournal::new));
exec.shutdown();
f1.get();
f2.get();
}
- static void burn(RandomSource random, TopologyFactory topologyFactory,
List<Id> clients, List<Id> nodes, int keyCount, int prefixCount, int
operations, int concurrency, PendingQueue pendingQueue)
+ public static void burn(RandomSource random, TopologyFactory
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int
prefixCount, int operations, int concurrency, PendingQueue pendingQueue,
Function<Id, Journal> journalFactory)
{
List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
AtomicLong progress = new AtomicLong();
@@ -330,12 +327,12 @@ public class BurnTest
RandomSource forked = random.fork();
// TODO (expected): meta-randomise scale of clock drift
return FrequentLargeRange.builder(forked)
- .ratio(1, 5)
- .small(50, 5000,
TimeUnit.MICROSECONDS)
- .large(1, 10,
TimeUnit.MILLISECONDS)
- .build()
- .mapAsLong(j -> Math.max(0,
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
- .asLongSupplier(forked);
+ .ratio(1, 5)
+ .small(50, 5000, TimeUnit.MICROSECONDS)
+ .large(1, 10, TimeUnit.MILLISECONDS)
+ .build()
+ .mapAsLong(j -> Math.max(0,
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
+ .asLongSupplier(forked);
};
SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, new ListAgent(random.fork(), 1000L,
failures::add, retryBootstrap, (i1, i2) -> {
@@ -463,8 +460,8 @@ public class BurnTest
responseSink, random::fork,
nowSupplier,
topologyFactory,
initialRequests::poll,
onSubmitted::set,
- ignore -> {}
- );
+ ignore -> {},
+ journalFactory);
verifier.close();
}
catch (Throwable t)
@@ -520,69 +517,12 @@ public class BurnTest
return stats.toString();
}
- private static Verifier createVerifier(int keyCount)
- {
- if (!ElleVerifier.Support.allowed())
- return new StrictSerializabilityVerifier("", keyCount);
- return CompositeVerifier.create(new StrictSerializabilityVerifier("",
keyCount),
- new ElleVerifier());
- }
-
- public static void main(String[] args)
- {
- int count = 1;
- int operations = 1000;
- Long overrideSeed = null;
- boolean reconcile = false;
- LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong;
- boolean hasOverriddenSeed = false;
- for (int i = 0 ; i < args.length ; i += 2)
- {
- switch (args[i])
- {
- default: throw illegalArgument("Invalid option: " + args[i]);
- case "-c":
- count = Integer.parseInt(args[i + 1]);
- if (hasOverriddenSeed)
- throw illegalArgument("Cannot override both seed (-s)
and number of seeds to run (-c)");
- overrideSeed = null;
- break;
- case "-s":
- overrideSeed = Long.parseLong(args[i + 1]);
- hasOverriddenSeed = true;
- count = 1;
- break;
- case "-o":
- operations = Integer.parseInt(args[i + 1]);
- break;
- case "-r":
- reconcile = true;
- --i;
- break;
- case "--loop-seed":
- seedGenerator = new DefaultRandom(Long.parseLong(args[i +
1]))::nextLong;
- }
- }
- while (count-- > 0)
- {
- if (!reconcile)
- {
- run(overrideSeed != null ? overrideSeed :
seedGenerator.getAsLong(), operations);
- }
- else
- {
- reconcile(overrideSeed != null ? overrideSeed :
seedGenerator.getAsLong(), operations);
- }
- }
- }
-
- @Test
- public void testOne()
+ protected static Verifier createVerifier(int keyCount)
{
- run(System.nanoTime());
+ return new StrictSerializabilityVerifier("", keyCount);
}
- private static void run(long seed)
+ protected static void run(long seed)
{
Duration timeout = Duration.ofMinutes(3);
try
@@ -605,7 +545,7 @@ public class BurnTest
}
}
- private static void run(long seed, int operations)
+ protected static void run(long seed, int operations)
{
logger.info("Seed: {}", seed);
Cluster.trace.trace("Seed: {}", seed);
@@ -624,13 +564,14 @@ public class BurnTest
List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START,
HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() *
3))),
- clients,
- nodes,
- 5 + random.nextInt(15),
- 5 + random.nextInt(15),
- operations,
- 10 + random.nextInt(30),
- new Factory(random).get());
+ clients,
+ nodes,
+ 5 + random.nextInt(15),
+ 5 + random.nextInt(15),
+ operations,
+ 10 + random.nextInt(30),
+ new Factory(random).get(),
+ InMemoryJournal::new);
}
catch (Throwable t)
{
@@ -639,7 +580,7 @@ public class BurnTest
}
}
- private static void reconcile(long seed, int operations)
+ protected static void reconcile(long seed, int operations)
{
logger.info("Seed: {}", seed);
Cluster.trace.trace("Seed: {}", seed);
@@ -658,12 +599,12 @@ public class BurnTest
List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
reconcile(seed, new TopologyFactory(rf, ranges(0,
HASH_RANGE_START, HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1,
rf), nodes.size() * 3))),
- clients,
- nodes,
- 5 + random.nextInt(15),
- 5 + random.nextInt(15),
- operations,
- 10 + random.nextInt(30));
+ clients,
+ nodes,
+ 5 + random.nextInt(15),
+ 5 + random.nextInt(15),
+ operations,
+ 10 + random.nextInt(30));
}
catch (Throwable t)
{
@@ -672,7 +613,7 @@ public class BurnTest
}
}
- private static List<Id> generateIds(boolean clients, int count)
+ public static List<Id> generateIds(boolean clients, int count)
{
List<Id> ids = new ArrayList<>();
for (int i = 1; i <= count ; ++i)
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java
b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 0585204e..1712b960 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -111,7 +111,7 @@ public abstract class IntHashKey implements RoutableKey
public static final class Key extends IntHashKey implements accord.api.Key
{
- private Key(int key)
+ public Key(int key)
{
super(key);
}
diff --git a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
b/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
index a6a8bc84..575ed6d1 100644
--- a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
@@ -157,7 +157,7 @@ public class PrefixedIntHashKey implements RoutableKey
public static final class Hash extends PrefixedIntRoutingKey
{
- private Hash(int prefix, int hash)
+ public Hash(int prefix, int hash)
{
super(prefix, hash);
}
diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
index 7f1f81b8..9ea735c4 100644
--- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import accord.api.Agent;
import accord.api.DataStore;
+import accord.api.Journal;
import accord.api.ProgressLog;
import accord.api.RemoteListeners;
import accord.api.RemoteListeners.Registration;
@@ -394,6 +395,12 @@ public class RemoteListenersTest
this.storeId = id;
}
+ @Override
+ public Journal.Loader loader()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override public boolean inStore() { return false; }
@Override public AsyncChain<Void> execute(PreLoadContext context,
Consumer<? super SafeCommandStore> consumer) { return null; }
@Override public <T> AsyncChain<T> submit(PreLoadContext context,
Function<? super SafeCommandStore, T> apply) { return null; }
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 4bccbd5e..93b792e0 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -46,11 +46,11 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
-import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.BarrierType;
+import accord.api.Journal;
import accord.api.LocalConfig;
import accord.api.MessageSink;
import accord.api.RoutingKey;
@@ -82,6 +82,7 @@ import accord.impl.progresslog.DefaultProgressLogs;
import accord.local.AgentExecutor;
import accord.local.Command;
import accord.local.CommandStore;
+import accord.local.CommandStores;
import accord.local.DurableBefore;
import accord.local.Node;
import accord.local.Node.Id;
@@ -450,7 +451,8 @@ public class Cluster
}
},
Runnable::run,
- nodeMap::set);
+ nodeMap::set,
+
InMemoryJournal::new);
if (!failures.isEmpty())
{
AssertionError error = new AssertionError("Unexpected errors
detected");
@@ -465,7 +467,7 @@ public class Cluster
Runnable checkFailures,
Consumer<Packet> responseSink,
Supplier<RandomSource>
randomSupplier, Supplier<LongSupplier> nowSupplierSupplier,
TopologyFactory topologyFactory,
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal,
- Consumer<Map<Id, Node>>
readySignal)
+ Consumer<Map<Id, Node>>
readySignal, Function<Node.Id, Journal> journalFactory)
{
Topology topology = topologyFactory.toTopology(nodes);
Map<Id, Node> nodeMap = new LinkedHashMap<>();
@@ -523,7 +525,7 @@ public class Cluster
BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges)
-> configRandomizer.onStale(id, sinceAtLeast, ranges);
AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id,
onStale);
executorMap.put(id, nodeExecutor);
- Journal journal = new Journal(id);
+ Journal journal = journalFactory.apply(id);
journalMap.put(id, journal);
BurnTestConfigurationService configService = new
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology,
nodeMap::get, topologyUpdates);
DelayedCommandStores.CacheLoadingChance isLoadedCheck = new
DelayedCommandStores.CacheLoadingChance()
@@ -596,7 +598,7 @@ public class Cluster
AsyncResult<?> startup =
AsyncChains.reduce(nodeMap.values().stream().map(Node::unsafeStart).collect(toList()),
(a, b) -> null).beginAsResult();
while (sinks.processPending());
- Assertions.assertTrue(startup.isDone());
+ Invariants.checkArgument(startup.isDone());
ClusterScheduler clusterScheduler = sinks.new ClusterScheduler(-1);
List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
@@ -626,14 +628,14 @@ public class Cluster
listStore.restoreFromSnapshot();
// we are simulating node restart, so its remote listeners
will also be gone
((DefaultRemoteListeners)nodeMap.get(id).remoteListeners()).clear();
- Journal journal = journalMap.get(id);
Int2ObjectHashMap<NavigableMap<TxnId, Command>>
beforeStores = copyCommands(stores);
for (CommandStore s : stores)
{
DelayedCommandStores.DelayedCommandStore store =
(DelayedCommandStores.DelayedCommandStore) s;
store.clearForTesting();
- journal.reconstructAll(store.loader(), store.id());
}
+ Journal journal = journalMap.get(id);
+ journal.replay(nodeMap.get(id).commandStores());
while (sinks.drain(pred));
CommandsForKey.enableLinearizabilityViolationsReporting();
Invariants.checkState(listStore.equals(prevData));
@@ -713,10 +715,10 @@ public class Cluster
Node node = nodeMap.get(id);
Journal journal = journalMap.get(node.id());
- CommandStore[] stores =
nodeMap.get(node.id()).commandStores().all();
+ CommandStores stores = nodeMap.get(node.id()).commandStores();
// run on node scheduler so doesn't run during replay
scheduled = node.scheduler().selfRecurring(() -> {
- journal.purge(j -> j < stores.length ? stores[j] : null);
+ journal.purge(stores);
schedule(clusterScheduler, rs, nodes, nodeMap, journalMap);
}, 0, SECONDS);
}
diff --git
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index e3f7d8be..17afe245 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
@@ -33,6 +32,7 @@ import com.google.common.collect.Iterables;
import accord.api.Agent;
import accord.api.DataStore;
+import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
@@ -65,6 +65,7 @@ import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.Cancellable;
+import static accord.api.Journal.CommandUpdate;
import static accord.utils.Invariants.Paranoia.LINEAR;
import static accord.utils.Invariants.ParanoiaCostFactor.HIGH;
@@ -153,7 +154,7 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
return;
// Journal will not have result persisted. This part is here for
test purposes and ensuring that we have strict object equality.
- Command reconstructed = journal.reconstruct(id, current.txnId());
+ Command reconstructed = journal.loadCommand(id, current.txnId(),
unsafeGetRedundantBefore(), durableBefore());
List<Difference<?>> diff =
ReflectionUtils.recursiveEquals(current, reconstructed);
Invariants.checkState(diff.isEmpty(), "Commands did not match:
expected %s, given %s, node %s, store %d, diff %s", current, reconstructed,
node, id(), new LazyToString(() -> String.join("\n", Iterables.transform(diff,
Object::toString))));
}
@@ -283,7 +284,7 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
Command before = safe.original();
Command after = safe.current();
- commandStore.journal.onExecute(commandStore.id(), before,
after, Objects.equals(context.primaryTxnId(), after.txnId()));
+ commandStore.journal.saveCommand(commandStore.id(), new
CommandUpdate(before, after), () -> {});
commandStore.validateRead(safe.current());
});
super.postExecute();
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
similarity index 66%
rename from accord-core/src/test/java/accord/impl/basic/Journal.java
rename to accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 2543e3a5..d48cbb1b 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -20,38 +20,41 @@ package accord.impl.basic;
import java.util.AbstractList;
import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
-import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
-import java.util.function.IntFunction;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableSortedMap;
+
+import accord.api.Journal;
import accord.api.Result;
import accord.impl.InMemoryCommandStore;
import accord.local.Cleanup;
import accord.local.Command;
import accord.local.CommandStore;
+import accord.local.CommandStores;
import accord.local.Commands;
import accord.local.CommonAttributes;
+import accord.local.DurableBefore;
import accord.local.Node;
+import accord.local.RedundantBefore;
import accord.local.StoreParticipants;
import accord.primitives.Ballot;
import accord.primitives.Known;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
import accord.primitives.SaveStatus;
import accord.primitives.Status;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.Invariants;
-import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.Int2ObjectHashMap;
import static accord.primitives.SaveStatus.NotDefined;
import static accord.primitives.SaveStatus.Stable;
@@ -59,24 +62,108 @@ import static accord.primitives.Status.Invalidated;
import static accord.primitives.Status.Truncated;
import static accord.utils.Invariants.illegalState;
-public class Journal
+public class InMemoryJournal implements Journal
{
- private final Long2ObjectHashMap<NavigableMap<TxnId, List<Diff>>>
diffsPerCommandStore = new Long2ObjectHashMap<>();
+ private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>>
diffsPerCommandStore = new Int2ObjectHashMap<>();
+ private final FieldStates fieldStates;
+
+ private static class FieldStates
+ {
+ RedundantBefore redundantBefore = RedundantBefore.EMPTY;
+ NavigableMap<TxnId, Ranges> bootstrapBeganAt =
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY);
+ NavigableMap<Timestamp, Ranges> safeToRead =
ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY);
+ CommandStores.RangesForEpoch.Snapshot rangesForEpoch = null;
+ }
private final Node.Id id;
- public Journal(Node.Id id)
+ public InMemoryJournal(Node.Id id)
{
this.id = id;
+ this.fieldStates = new FieldStates();
+ }
+
+ @Override
+ public Command loadCommand(int commandStoreId, TxnId txnId,
+ // TODO: currently unused!
+ RedundantBefore redundantBefore, DurableBefore
durableBefore)
+ {
+ NavigableMap<TxnId, List<Diff>> commandStore =
this.diffsPerCommandStore.get(commandStoreId);
+
+ if (commandStore == null)
+ return null;
+
+ List<Diff> saved =
this.diffsPerCommandStore.get(commandStoreId).get(txnId);
+ if (saved == null)
+ return null;
+
+ return reconstruct(saved);
+ }
+
+ @Override
+ public void saveCommand(int store, CommandUpdate diff, Runnable onFlush)
+ {
+ if (diff == null ||
+ diff.before == diff.after
+ || diff.after == null
+ || diff.after.saveStatus() == SaveStatus.Uninitialised)
+ return;
+
+ diffsPerCommandStore.computeIfAbsent(store, (k) -> new TreeMap<>())
+ .computeIfAbsent(diff.txnId, (k_) -> new
ArrayList<>())
+ .add(diff(diff.before, diff.after));
+
+ if (onFlush!= null)
+ onFlush.run();
+ }
+
+ @Override
+ public RedundantBefore loadRedundantBefore(int commandStoreId)
+ {
+ return fieldStates.redundantBefore;
}
- public void purge(IntFunction<CommandStore> storeSupplier)
+ @Override
+ public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
{
- for (Map.Entry<Long, NavigableMap<TxnId, List<Diff>>> e :
diffsPerCommandStore.entrySet())
+ return fieldStates.bootstrapBeganAt;
+ }
+
+ @Override
+ public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+ {
+ return fieldStates.safeToRead;
+ }
+
+ @Override
+ public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int
commandStoreId)
+ {
+ return fieldStates.rangesForEpoch;
+ }
+
+ public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush)
+ {
+ if (fieldUpdates.newRedundantBefore != null)
+ fieldStates.redundantBefore = fieldUpdates.newRedundantBefore;
+ if (fieldUpdates.newSafeToRead != null)
+ fieldStates.safeToRead = fieldUpdates.newSafeToRead;
+ if (fieldUpdates.newBootstrapBeganAt != null)
+ fieldStates.bootstrapBeganAt = fieldUpdates.newBootstrapBeganAt;
+ if (fieldUpdates.newRangesForEpoch != null)
+ fieldStates.rangesForEpoch = fieldUpdates.newRangesForEpoch;
+
+ if (onFlush!= null)
+ onFlush.run();
+ }
+
+ @Override
+ public void purge(CommandStores commandStores)
+ {
+ for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e :
diffsPerCommandStore.entrySet())
{
- int commandStoreId = e.getKey().intValue();
+ int commandStoreId = e.getKey();
Map<TxnId, List<Diff>> localJournal = e.getValue();
- CommandStore store = storeSupplier.apply(commandStoreId);
+ CommandStore store = commandStores.forId(commandStoreId);
if (store == null)
continue;
@@ -84,7 +171,7 @@ public class Journal
{
List<Diff> diffs = e2.getValue();
if (diffs.isEmpty()) continue;
- Command command = reconstruct(diffs, Reconstruct.Last).get(0);
+ Command command = reconstruct(diffs);
if (command.status() == Truncated || command.status() ==
Invalidated)
continue; // Already truncated
Cleanup cleanup = Cleanup.shouldCleanup(store.agent(),
command, store.unsafeGetRedundantBefore(), store.durableBefore());
@@ -110,32 +197,37 @@ public class Journal
}
}
- public void reconstructAll(InMemoryCommandStore.Loader loader, int
commandStoreId)
+ @Override
+ public void replay(CommandStores commandStores)
{
- // Nothing to do here, journal is empty for this command store
- if (!diffsPerCommandStore.containsKey(commandStoreId))
- return;
-
- // copy to avoid concurrent modification when appending to journal
- Map<TxnId, List<Diff>> diffs = new
TreeMap<>(diffsPerCommandStore.get(commandStoreId));
- for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
- e.setValue(new ArrayList<>(e.getValue()));
+ OnDone sync = new OnDone() {
+ public void success() {}
+ public void failure(Throwable t) { throw new
RuntimeException("Caught an exception during replay", t); }
+ };
- List<Command> toApply = new ArrayList<>();
- for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+ for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry :
diffsPerCommandStore.entrySet())
{
- if (e.getValue().isEmpty()) continue;
- Command command = reconstruct(commandStoreId, e.getKey(),
e.getValue());
- Invariants.checkState(command.saveStatus() !=
SaveStatus.Uninitialised,
- "Found uninitialized command in the log:
%s", command);
- if (command.saveStatus().compareTo(Stable) >= 0 &&
!command.hasBeen(Truncated))
- toApply.add(command);
- loader.load(command);
- }
+ int commandStoreId = diffEntry.getKey();
+ // copy to avoid concurrent modification when appending to journal
+ Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue());
+
+ InMemoryCommandStore commandStore = (InMemoryCommandStore)
commandStores.forId(commandStoreId);
+ Loader loader = commandStore.loader();
- toApply.sort(Comparator.comparing(Command::executeAt));
- for (Command command : toApply)
- loader.apply(command);
+ for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+ e.setValue(new ArrayList<>(e.getValue()));
+
+ for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+ {
+ if (e.getValue().isEmpty()) continue;
+ Command command = reconstruct(e.getValue());
+ Invariants.checkState(command.saveStatus() !=
SaveStatus.Uninitialised,
+ "Found uninitialized command in the log:
%s %s", diffEntry.getKey(), e.getValue());
+ loader.load(command, sync);
+ if (command.saveStatus().compareTo(Stable) >= 0 &&
!command.hasBeen(Truncated))
+ loader.apply(command, sync);
+ }
+ }
}
static class ErasedList extends AbstractList<Diff>
@@ -206,28 +298,10 @@ public class Journal
}
}
- private enum Reconstruct
- {
- Each,
- Last
- }
-
- public Command reconstruct(int commandStoreId, TxnId txnId)
- {
- return reconstruct(commandStoreId, txnId,
diffsPerCommandStore.get(commandStoreId).get(txnId));
- }
-
- public Command reconstruct(int commandStoreId, TxnId txnId, List<Diff>
diffs)
- {
- return reconstruct(diffs, Reconstruct.Last).get(0);
- }
-
- private List<Command> reconstruct(List<Diff> diffs, Reconstruct
reconstruct)
+ private Command reconstruct(List<Diff> diffs)
{
Invariants.checkState(diffs != null && !diffs.isEmpty());
- List<Command> results = new ArrayList<>();
-
TxnId txnId = null;
Timestamp executeAt = null;
Timestamp executesAtLeast = null;
@@ -255,13 +329,7 @@ public class Journal
if (diff.executesAtLeast != null)
executesAtLeast = diff.executesAtLeast.get();
if (diff.saveStatus != null)
- {
- Set<SaveStatus> allowed = new HashSet<>();
- allowed.add(SaveStatus.TruncatedApply);
- allowed.add(SaveStatus.TruncatedApplyWithOutcome);
-
saveStatus = diff.saveStatus.get();
- }
if (diff.durability != null)
durability = diff.durability.get();
@@ -302,75 +370,68 @@ public class Journal
result = null;
break;
}
+ }
- CommonAttributes.Mutable attrs = new
CommonAttributes.Mutable(txnId);
- if (partialTxn != null)
- attrs.partialTxn(partialTxn);
- if (durability != null)
- attrs.durability(durability);
- if (participants != null) attrs.setParticipants(participants);
- else attrs.setParticipants(StoreParticipants.empty(txnId));
-
- // TODO (desired): we can simplify this logic if, instead of
diffing, we will infer the diff from the status
- if (partialDeps != null &&
- (saveStatus.known.deps != Known.KnownDeps.NoDeps &&
- saveStatus.known.deps != Known.KnownDeps.DepsErased &&
- saveStatus.known.deps != Known.KnownDeps.DepsUnknown))
- attrs.partialDeps(partialDeps);
- Invariants.checkState(saveStatus != null,
- "Save status is null after applying %s",
diffs);
+ CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
+ if (partialTxn != null)
+ attrs.partialTxn(partialTxn);
+ if (durability != null)
+ attrs.durability(durability);
+ if (participants != null) attrs.setParticipants(participants);
+ else attrs.setParticipants(StoreParticipants.empty(txnId));
+
+ // TODO (desired): we can simplify this logic if, instead of diffing,
we will infer the diff from the status
+ if (partialDeps != null &&
+ (saveStatus.known.deps != Known.KnownDeps.NoDeps &&
+ saveStatus.known.deps != Known.KnownDeps.DepsErased &&
+ saveStatus.known.deps != Known.KnownDeps.DepsUnknown))
+ attrs.partialDeps(partialDeps);
+
+ try
+ {
- try
+ Command current;
+ switch (saveStatus.status)
{
- if (reconstruct == Reconstruct.Each ||
- (reconstruct == Reconstruct.Last && i == diffs.size() - 1))
- {
- Command current;
- switch (saveStatus.status)
- {
- case NotDefined:
- current = saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
- :
Command.NotDefined.notDefined(attrs, promised);
- break;
- case PreAccepted:
- current = Command.PreAccepted.preAccepted(attrs,
executeAt, promised);
- break;
- case AcceptedInvalidate:
- case Accepted:
- case PreCommitted:
- if (saveStatus == SaveStatus.AcceptedInvalidate)
- current =
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised,
acceptedOrCommitted);
- else
- current = Command.Accepted.accepted(attrs,
saveStatus, executeAt, promised, acceptedOrCommitted);
- break;
- case Committed:
- case Stable:
- current = Command.Committed.committed(attrs,
saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn);
- break;
- case PreApplied:
- case Applied:
- current = Command.Executed.executed(attrs,
saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes,
result);
- break;
- case Invalidated:
- case Truncated:
- current = truncated(attrs, saveStatus, executeAt,
executesAtLeast, writes, result);
- break;
- default:
- throw new IllegalStateException("Do not know " +
saveStatus.status + " " + saveStatus);
- }
-
- results.add(current);
- }
+ case NotDefined:
+ current = saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
+ :
Command.NotDefined.notDefined(attrs, promised);
+ break;
+ case PreAccepted:
+ current = Command.PreAccepted.preAccepted(attrs,
executeAt, promised);
+ break;
+ case AcceptedInvalidate:
+ case Accepted:
+ case PreCommitted:
+ if (saveStatus == SaveStatus.AcceptedInvalidate)
+ current =
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised,
acceptedOrCommitted);
+ else
+ current = Command.Accepted.accepted(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted);
+ break;
+ case Committed:
+ case Stable:
+ current = Command.Committed.committed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn);
+ break;
+ case PreApplied:
+ case Applied:
+ current = Command.Executed.executed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
+ break;
+ case Invalidated:
+ case Truncated:
+ current = truncated(attrs, saveStatus, executeAt,
executesAtLeast, writes, result);
+ break;
+ default:
+ throw new IllegalStateException("Do not know " +
saveStatus.status + " " + saveStatus);
}
- catch (Throwable t)
- {
- throw new RuntimeException("Can not reconstruct from diff:\n"
+ diffs.stream().map(o -> o.toString())
-
.collect(Collectors.joining("\n")),
- t);
- }
+ return current;
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException("Can not reconstruct from diff:\n" +
diffs.stream().map(o -> o.toString())
+
.collect(Collectors.joining("\n")),
+ t);
}
- return results;
}
private static Command.Truncated truncated(CommonAttributes.Mutable attrs,
SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes
writes, Result result)
@@ -392,25 +453,6 @@ public class Journal
}
}
- public void onExecute(int commandStoreId, Command before, Command after,
boolean isPrimary)
- {
- if (before == after
- || after == null
- || after.saveStatus() == SaveStatus.Uninitialised)
- return;
-
- Diff diff = diff(before, after);
- if (!isPrimary)
- diff = diff.asNonPrimary();
-
- if (diff != null)
- {
- diffsPerCommandStore.computeIfAbsent(commandStoreId, (k) -> new
TreeMap<>())
- .computeIfAbsent(after.txnId(), (k_) -> new
ArrayList<>())
- .add(diff);
- }
- }
-
private static class Diff
{
public final TxnId txnId;
@@ -467,12 +509,6 @@ public class Journal
this.result = result;
}
- // We allow only save status, and waitingOn to be updated by
non-primary transactions
- public Diff asNonPrimary()
- {
- return new Diff(txnId, null, null, saveStatus, null, null, null,
null, null, null, waitingOn, null, null);
- }
-
public boolean allNulls()
{
if (txnId != null) return false;
@@ -543,7 +579,7 @@ public class Journal
ifNotEqual(before, after, Command::participants,
true),
ifNotEqual(before, after, Command::partialTxn,
false),
ifNotEqual(before, after, Command::partialDeps,
false),
- ifNotEqual(before, after, Journal::getWaitingOn,
true),
+ ifNotEqual(before, after,
InMemoryJournal::getWaitingOn, true),
ifNotEqual(before, after, Command::writes, false),
ifNotEqual(before, after, Command::result,
false));
if (diff.allNulls())
@@ -620,5 +656,4 @@ public class Journal
throw new UnsupportedOperationException();
}
}
-
}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
new file mode 100644
index 00000000..69c799b8
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.NavigableMap;
+
+import accord.api.Journal;
+import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+/**
+ * Logging journal, a wrapper over journal for debugging / inspecting history
purposes.
+ */
+public class LoggingJournal implements Journal
+{
+ private final BufferedWriter log;
+ private final Journal delegate;
+
+ public LoggingJournal(Journal delegate, String path)
+ {
+ this.delegate = delegate;
+ File f = new File(path);
+ try
+ {
+ log = new BufferedWriter(new OutputStreamWriter(new
FileOutputStream(f)));
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private synchronized void log(String format, Object... objects)
+ {
+ try
+ {
+ log.write(String.format(format, objects));
+ log.flush();
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+ }
+
+ public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
+ {
+ return delegate.loadCommand(commandStoreId, txnId, redundantBefore,
durableBefore);
+ }
+
+ public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
+ {
+ log("%d: %s\n", store, update.after);
+ delegate.saveCommand(store, update, onFlush);
+ }
+
+ public void purge(CommandStores commandStores)
+ {
+ log("PURGE\n");
+ delegate.purge(commandStores);
+ }
+
+ public void replay(CommandStores commandStores)
+ {
+ delegate.replay(commandStores);
+ }
+
+ public RedundantBefore loadRedundantBefore(int commandStoreId)
+ {
+ return delegate.loadRedundantBefore(commandStoreId);
+ }
+
+ public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
+ {
+ return delegate.loadBootstrapBeganAt(commandStoreId);
+ }
+
+ public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+ {
+ return delegate.loadSafeToRead(commandStoreId);
+ }
+
+ public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int
commandStoreId)
+ {
+ return delegate.loadRangesForEpoch(commandStoreId);
+ }
+
+ public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush)
+ {
+ log("%d: %s", store, fieldUpdates);
+ delegate.saveStoreState(store, fieldUpdates, onFlush);
+ }
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
new file mode 100644
index 00000000..736a4f15
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import java.util.NavigableMap;
+
+import accord.api.Journal;
+import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+
+/**
+ * A simple version of journal that can be useful for debugging issues with an
implementation that checks command loading
+ * from SUT vs model.
+ */
+public class VerifyingJournal implements Journal
+{
+ private final Journal sut;
+ private final Journal model;
+
+ public VerifyingJournal(Journal model, Journal sut)
+ {
+ this.model = model;
+ this.sut = sut;
+ }
+
+ public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
+ {
+ Command sut = this.sut.loadCommand(commandStoreId, txnId,
redundantBefore, durableBefore);
+ Command model = this.model.loadCommand(commandStoreId, txnId,
redundantBefore, durableBefore);
+ Invariants.checkState(sut.equals(model));
+ return sut;
+ }
+
+ public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
+ {
+ model.saveCommand(store, update, null);
+ sut.saveCommand(store, update, onFlush);
+ }
+
+ public void purge(CommandStores commandStores)
+ {
+ model.purge(commandStores);
+ sut.purge(commandStores);
+ }
+
+ public void replay(CommandStores commandStores)
+ {
+ sut.replay(commandStores);
+ }
+
+ public RedundantBefore loadRedundantBefore(int commandStoreId)
+ {
+ return sut.loadRedundantBefore(commandStoreId);
+ }
+
+ public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
+ {
+ return sut.loadBootstrapBeganAt(commandStoreId);
+ }
+
+ public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+ {
+ return sut.loadSafeToRead(commandStoreId);
+ }
+
+ public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int
commandStoreId)
+ {
+ return sut.loadRangesForEpoch(commandStoreId);
+ }
+
+ public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush)
+ {
+ sut.saveStoreState(store, fieldUpdates, onFlush);
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java
b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index de886b99..f10ac792 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -38,9 +38,9 @@ import javax.annotation.Nonnull;
public class ListQuery implements Query
{
- final Id client;
- final long requestId;
- final boolean isEphemeralRead;
+ public final Id client;
+ public final long requestId;
+ public final boolean isEphemeralRead;
public ListQuery(Id client, long requestId, boolean isEphemeralRead)
{
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index 915442ad..b1ea3c7c 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -46,7 +46,7 @@ public class ListRead implements Read
private static final Logger logger =
LoggerFactory.getLogger(ListRead.class);
private final Function<? super CommandStore, AsyncExecutor> executor;
- private final boolean isEphemeralRead;
+ public final boolean isEphemeralRead;
public final Seekables<?, ?> userReadKeys; // those only to be returned to
user
public final Seekables<?, ?> keys; // those including necessary for writes
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java
b/accord-core/src/test/java/accord/impl/list/ListResult.java
index adfa1fe3..c752bd0b 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -42,7 +42,7 @@ public class ListResult implements Result, Reply
public final Keys responseKeys;
public final int[][] read; // equal in size to keys.size()
public final ListUpdate update;
- private final Status status;
+ public final Status status;
public ListResult(Status status, Id client, long requestId, TxnId txnId,
Seekables<?, ?> readKeys, Keys responseKeys, int[][] read, ListUpdate update)
{
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 50b2b97c..1123ab73 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import accord.api.Agent;
import accord.api.Data;
import accord.api.DataStore;
+import accord.api.Journal;
import accord.api.ProgressLog;
import accord.api.ProgressLog.BlockedUntil;
import accord.api.Query;
@@ -949,6 +950,11 @@ public class CommandsForKeyTest
return true;
}
+ public Journal.Loader loader()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public Agent agent()
{
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index ab24cf72..1fab5e1f 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -53,8 +53,8 @@ import accord.utils.RandomSource;
import accord.utils.SortedArrays.SortedArrayList;
import org.agrona.collections.IntHashSet;
-import static accord.burn.BurnTest.HASH_RANGE_END;
-import static accord.burn.BurnTest.HASH_RANGE_START;
+import static accord.burn.BurnTestBase.HASH_RANGE_END;
+import static accord.burn.BurnTestBase.HASH_RANGE_START;
// TODO (testing): add change replication factor
public class TopologyRandomizer
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]